Module phi.physics.diffuse
Functions to simulate diffusion processes on Field
objects.
Expand source code
"""
Functions to simulate diffusion processes on `phi.field.Field` objects.
"""
import warnings
from typing import Union
from phi import math
from phi.field import Grid, Field, laplace, solve_linear, jit_compile_linear
from phi.field._field import FieldType
from phi.field._grid import GridType
from phiml.math import copy_with, shape, Solve, Tensor, wrap, spatial
def explicit(field: FieldType,
diffusivity: Union[float, math.Tensor, Field],
dt: Union[float, math.Tensor],
substeps: int = 1) -> FieldType:
"""
Simulate a finite-time diffusion process of the form dF/dt = α · ΔF on a given `Field` FieldType with diffusion coefficient α.
Args:
field: CenteredGrid, StaggeredGrid or ConstantField
diffusivity: Diffusion per time. `diffusion_amount = diffusivity * dt`
Can be a number, `phiml.math.Tensor` or `phi.field.Field`.
If a channel dimension is present, it will be interpreted as non-isotropic diffusion.
dt: Time interval. `diffusion_amount = diffusivity * dt`
substeps: number of iterations to use (Default value = 1)
Returns:
Diffused field of same type as `field`.
"""
amount = diffusivity * (dt / substeps)
# --- CFL check if possible ---
amount_ = amount.values if isinstance(amount, Field) else wrap(amount)
if amount_.available:
cfl = math.max(amount_, spatial) / field.dx**2
if (cfl > .5).any:
warnings.warn(f"CFL condition violated (CFL = {float(cfl.max):.1f} > 0.5) in diffuse.explicit() with diffusivity={diffusivity}, dt={dt}, dx={field.dx}. Increase substeps or use diffuse.implicit() instead.", RuntimeWarning, stacklevel=2)
# --- diffusion ---
if isinstance(amount, Field):
amount = amount.at(field)
ext = field.extrapolation
for i in range(substeps):
delta = laplace(field, weights=amount) if 'vector' in shape(amount) else amount * laplace(field)
field = (field + delta.with_extrapolation(ext)).with_extrapolation(ext)
return field
def implicit(field: FieldType,
diffusivity: Union[float, math.Tensor, Field],
dt: Union[float, math.Tensor],
order: int = 1,
solve=Solve('CG')) -> FieldType:
"""
Diffusion by solving a linear system of equations.
Args:
order: Order of method, 1=first order. This translates to `substeps` for the explicit sharpening.
field: `phi.field.Field` to diffuse.
diffusivity: Diffusion per time. `diffusion_amount = diffusivity * dt`
dt: Time interval. `diffusion_amount = diffusivity * dt`
solve:
Returns:
Diffused field of same type as `field`.
"""
@jit_compile_linear
def sharpen(x):
return explicit(x, diffusivity, -dt, substeps=order)
if not solve.x0:
solve = copy_with(solve, x0=field)
return solve_linear(sharpen, y=field, solve=solve)
def finite_difference(grid: Grid,
diffusivity: Union[float, math.Tensor, Field],
order: int,
implicit: math.Solve) -> FieldType:
"""
Diffusion by using a finite difference scheme.
In contrast to `explicit` and `implicit` accuracy can be increased by using stencils of higher-order rather than calculating substeps.
This is controlled by the `scheme` passed.
Args:
grid: CenteredGrid or StaggeredGrid
diffusivity: Diffusion per time. `diffusion_amount = diffusivity * dt`
order: Spatial order of accuracy.
Higher orders entail larger stencils and more computation time but result in more accurate results assuming a large enough resolution.
Supported: 2 explicit, 4 explicit, 6 implicit (inherited from `phi.field.laplace()`).
implicit: When a `Solve` object is passed, performs an implicit operation with the specified solver and tolerances.
Otherwise, an explicit stencil is used.
Returns:
Diffused grid of same type as `grid`.
"""
diffusivity = diffusivity.at(grid) if isinstance(diffusivity, Field) else diffusivity
return diffusivity * laplace(grid, order=order, implicit=implicit).with_extrapolation(grid.extrapolation)
def fourier(field: GridType,
diffusivity: Union[float, math.Tensor],
dt: Union[float, math.Tensor]) -> FieldType:
"""
Exact diffusion of a periodic field in frequency space.
For non-periodic fields or non-constant diffusivity, use another diffusion function such as `explicit()`.
Args:
field:
diffusivity: Diffusion per time. `diffusion_amount = diffusivity * dt`
dt: Time interval. `diffusion_amount = diffusivity * dt`
Returns:
Diffused field of same type as `field`.
"""
assert isinstance(field, Grid), "Cannot diffuse field of type '%s'" % type(field)
assert field.extrapolation == math.extrapolation.PERIODIC, "Fourier diffusion can only be applied to periodic fields."
amount = diffusivity * dt
k = math.fftfreq(field.resolution)
k2 = math.vec_squared(k)
fft_laplace = -(2 * math.PI) ** 2 * k2
diffuse_kernel = math.exp(fft_laplace * amount)
result_k = math.fft(field.values) * diffuse_kernel
result_values = math.real(math.ifft(result_k))
return field.with_values(result_values)
Functions
def explicit(field: ~FieldType, diffusivity: Union[float, phiml.math._tensors.Tensor, phi.field._field.Field], dt: Union[phiml.math._tensors.Tensor, float], substeps: int = 1) ‑> ~FieldType
-
Simulate a finite-time diffusion process of the form dF/dt = α · ΔF on a given
Field
FieldType with diffusion coefficient α.Args
field
- CenteredGrid, StaggeredGrid or ConstantField
diffusivity
- Diffusion per time.
diffusion_amount = diffusivity * dt
Can be a number,phiml.math.Tensor
orField
. If a channel dimension is present, it will be interpreted as non-isotropic diffusion. dt
- Time interval.
diffusion_amount = diffusivity * dt
substeps
- number of iterations to use (Default value = 1)
Returns
Diffused field of same type as
field
.Expand source code
def explicit(field: FieldType, diffusivity: Union[float, math.Tensor, Field], dt: Union[float, math.Tensor], substeps: int = 1) -> FieldType: """ Simulate a finite-time diffusion process of the form dF/dt = α · ΔF on a given `Field` FieldType with diffusion coefficient α. Args: field: CenteredGrid, StaggeredGrid or ConstantField diffusivity: Diffusion per time. `diffusion_amount = diffusivity * dt` Can be a number, `phiml.math.Tensor` or `phi.field.Field`. If a channel dimension is present, it will be interpreted as non-isotropic diffusion. dt: Time interval. `diffusion_amount = diffusivity * dt` substeps: number of iterations to use (Default value = 1) Returns: Diffused field of same type as `field`. """ amount = diffusivity * (dt / substeps) # --- CFL check if possible --- amount_ = amount.values if isinstance(amount, Field) else wrap(amount) if amount_.available: cfl = math.max(amount_, spatial) / field.dx**2 if (cfl > .5).any: warnings.warn(f"CFL condition violated (CFL = {float(cfl.max):.1f} > 0.5) in diffuse.explicit() with diffusivity={diffusivity}, dt={dt}, dx={field.dx}. Increase substeps or use diffuse.implicit() instead.", RuntimeWarning, stacklevel=2) # --- diffusion --- if isinstance(amount, Field): amount = amount.at(field) ext = field.extrapolation for i in range(substeps): delta = laplace(field, weights=amount) if 'vector' in shape(amount) else amount * laplace(field) field = (field + delta.with_extrapolation(ext)).with_extrapolation(ext) return field
def finite_difference(grid: phi.field._grid.Grid, diffusivity: Union[float, phiml.math._tensors.Tensor, phi.field._field.Field], order: int, implicit: phiml.math._optimize.Solve) ‑> ~FieldType
-
Diffusion by using a finite difference scheme. In contrast to
explicit()
andimplicit()
accuracy can be increased by using stencils of higher-order rather than calculating substeps. This is controlled by thescheme
passed.Args
grid
- CenteredGrid or StaggeredGrid
diffusivity
- Diffusion per time.
diffusion_amount = diffusivity * dt
order
- Spatial order of accuracy.
Higher orders entail larger stencils and more computation time but result in more accurate results assuming a large enough resolution.
Supported: 2 explicit, 4 explicit, 6 implicit (inherited from
laplace()
). implicit
- When a
Solve
object is passed, performs an implicit operation with the specified solver and tolerances. Otherwise, an explicit stencil is used.
Returns
Diffused grid of same type as
grid
.Expand source code
def finite_difference(grid: Grid, diffusivity: Union[float, math.Tensor, Field], order: int, implicit: math.Solve) -> FieldType: """ Diffusion by using a finite difference scheme. In contrast to `explicit` and `implicit` accuracy can be increased by using stencils of higher-order rather than calculating substeps. This is controlled by the `scheme` passed. Args: grid: CenteredGrid or StaggeredGrid diffusivity: Diffusion per time. `diffusion_amount = diffusivity * dt` order: Spatial order of accuracy. Higher orders entail larger stencils and more computation time but result in more accurate results assuming a large enough resolution. Supported: 2 explicit, 4 explicit, 6 implicit (inherited from `phi.field.laplace()`). implicit: When a `Solve` object is passed, performs an implicit operation with the specified solver and tolerances. Otherwise, an explicit stencil is used. Returns: Diffused grid of same type as `grid`. """ diffusivity = diffusivity.at(grid) if isinstance(diffusivity, Field) else diffusivity return diffusivity * laplace(grid, order=order, implicit=implicit).with_extrapolation(grid.extrapolation)
def fourier(field: ~GridType, diffusivity: Union[phiml.math._tensors.Tensor, float], dt: Union[phiml.math._tensors.Tensor, float]) ‑> ~FieldType
-
Exact diffusion of a periodic field in frequency space.
For non-periodic fields or non-constant diffusivity, use another diffusion function such as
explicit()
.Args
- field:
diffusivity
- Diffusion per time.
diffusion_amount = diffusivity * dt
dt
- Time interval.
diffusion_amount = diffusivity * dt
Returns
Diffused field of same type as
field
.Expand source code
def fourier(field: GridType, diffusivity: Union[float, math.Tensor], dt: Union[float, math.Tensor]) -> FieldType: """ Exact diffusion of a periodic field in frequency space. For non-periodic fields or non-constant diffusivity, use another diffusion function such as `explicit()`. Args: field: diffusivity: Diffusion per time. `diffusion_amount = diffusivity * dt` dt: Time interval. `diffusion_amount = diffusivity * dt` Returns: Diffused field of same type as `field`. """ assert isinstance(field, Grid), "Cannot diffuse field of type '%s'" % type(field) assert field.extrapolation == math.extrapolation.PERIODIC, "Fourier diffusion can only be applied to periodic fields." amount = diffusivity * dt k = math.fftfreq(field.resolution) k2 = math.vec_squared(k) fft_laplace = -(2 * math.PI) ** 2 * k2 diffuse_kernel = math.exp(fft_laplace * amount) result_k = math.fft(field.values) * diffuse_kernel result_values = math.real(math.ifft(result_k)) return field.with_values(result_values)
def implicit(field: ~FieldType, diffusivity: Union[float, phiml.math._tensors.Tensor, phi.field._field.Field], dt: Union[phiml.math._tensors.Tensor, float], order: int = 1, solve=CG with tolerance None (rel), None (abs), max_iterations=1000) ‑> ~FieldType
-
Diffusion by solving a linear system of equations.
Args
order
- Order of method, 1=first order. This translates to
substeps
for the explicit sharpening. field
Field
to diffuse.diffusivity
- Diffusion per time.
diffusion_amount = diffusivity * dt
dt
- Time interval.
diffusion_amount = diffusivity * dt
solve:
Returns
Diffused field of same type as
field
.Expand source code
def implicit(field: FieldType, diffusivity: Union[float, math.Tensor, Field], dt: Union[float, math.Tensor], order: int = 1, solve=Solve('CG')) -> FieldType: """ Diffusion by solving a linear system of equations. Args: order: Order of method, 1=first order. This translates to `substeps` for the explicit sharpening. field: `phi.field.Field` to diffuse. diffusivity: Diffusion per time. `diffusion_amount = diffusivity * dt` dt: Time interval. `diffusion_amount = diffusivity * dt` solve: Returns: Diffused field of same type as `field`. """ @jit_compile_linear def sharpen(x): return explicit(x, diffusivity, -dt, substeps=order) if not solve.x0: solve = copy_with(solve, x0=field) return solve_linear(sharpen, y=field, solve=solve)