Module phi.math.backend
Low-level library wrappers for delegating vector operations.
Expand source code
"""
Low-level library wrappers for delegating vector operations.
"""
from ._backend import (
Backend, choose_backend, NoBackendFound,
ComputeDevice,
default_backend, set_global_default_backend, BACKENDS, context_backend, _DEFAULT,
get_precision, precision, set_global_precision,
convert,
PHI_LOGGER,
)
from ._numpy_backend import NumPyBackend as _NumPyBackend
from ._profile import Profile, get_current_profile, profile, profile_function
NUMPY = _NumPyBackend()
"""Default backend for NumPy arrays and SciPy objects."""
BACKENDS.append(NUMPY)
_DEFAULT.append(NUMPY)
__all__ = [key for key in globals().keys() if not key.startswith('_')]
__pdoc__ = {
'ComputeDevice.__init__': False,
'NoBackendFound.__init__': False,
'Profile.__init__': False,
}
Global variables
var NUMPY
-
Default backend for NumPy arrays and SciPy objects.
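For illustration, a brief sketch of calling the NUMPY backend instance directly; the methods used are part of the Backend interface documented below:
import numpy as np
from phi.math.backend import NUMPY

x = NUMPY.as_tensor(np.ones((2, 3)))
print(NUMPY.staticshape(x))   # (2, 3)
print(NUMPY.sum(x, axis=0))   # column sums as a NumPy array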
Functions
def choose_backend(*values, prefer_default=False) ‑> phi.math.backend._backend.Backend
-
Selects a suitable backend to handle the given values.
This function is used by most math functions operating on Tensor objects to delegate the actual computations.
Backends need to be registered to be available, e.g. via the global import phi.<backend> or detect_backends().
Args
*values
- Values for which to select a backend, e.g. native tensors or numbers.
prefer_default
- Whether to always select the default backend if it can work with values, see default_backend().
Returns
The selected Backend
Expand source code
def choose_backend(*values, prefer_default=False) -> Backend:
    """
    Selects a suitable backend to handle the given values.

    This function is used by most math functions operating on `Tensor` objects to delegate the actual computations.

    Backends need to be registered to be available, e.g. via the global import `phi.<backend>` or `phi.detect_backends()`.

    Args:
        *values:
        prefer_default: Whether to always select the default backend if it can work with `values`, see `default_backend()`.

    Returns:
        The selected `Backend`
    """
    # --- Default Backend has priority ---
    if _is_applicable(_DEFAULT[-1], values) and (prefer_default or _is_specific(_DEFAULT[-1], values)):
        return _DEFAULT[-1]
    # --- Filter out non-applicable ---
    backends = [backend for backend in BACKENDS if _is_applicable(backend, values)]
    if len(backends) == 0:
        raise NoBackendFound(f"No backend found for types {[type(v).__name__ for v in values]}; registered backends are {BACKENDS}")
    # --- Native tensors? ---
    for backend in backends:
        if _is_specific(backend, values):
            return backend
    return backends[0]
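A minimal usage sketch with plain NumPy arrays; since the NumPy backend ships with this module, it is the backend selected here:
import numpy as np
from phi.math.backend import choose_backend

a = np.ones((4, 4))
b = np.full((4, 4), 2.0)
backend = choose_backend(a, b)   # selects the NumPy backend for native NumPy arrays
result = backend.mul(a, b)       # delegate the actual computation to the backend
print(backend, result.dtype)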
def context_backend() ‑> phi.math.backend._backend.Backend
-
Returns the backend set by the inner-most surrounding with backend: block. If called outside a backend context, returns None.
Returns
Backend or None
Expand source code
def context_backend() -> Backend or None:
    """
    Returns the backend set by the inner-most surrounding `with backend:` block.
    If called outside a backend context, returns `None`.

    Returns:
        `Backend` or `None`
    """
    return _DEFAULT[-1] if len(_DEFAULT) > 1 else None
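A short sketch showing the difference between being inside and outside a backend context, using the bundled NumPy backend:
from phi.math.backend import NUMPY, context_backend

print(context_backend())      # None outside any `with backend:` block
with NUMPY:
    print(context_backend())  # the NumPy backend inside the block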
def convert(tensor, backend: phi.math.backend._backend.Backend = None, use_dlpack=True)
-
Convert a Tensor to the native format of backend. If the target backend can operate natively on tensor, returns tensor.
If both backends support DLPack and use_dlpack=True, uses zero-copy conversion via the DLPack library. Otherwise, tensor is intermediately converted to a NumPy array.
Warning: This operation breaks the automatic differentiation chain.
Args
tensor
- Native tensor belonging to any registered backend.
backend
- Target backend. If None, uses the current default backend, see default_backend().
Returns
Tensor belonging to backend.
Expand source code
def convert(tensor, backend: Backend = None, use_dlpack=True):
    """
    Convert a Tensor to the native format of `backend`.
    If the target backend can operate natively on `tensor`, returns `tensor`.

    If both backends support *DLPack* and `use_dlpack=True`, uses zero-copy conversion using the DLPack library.
    Else, intermediately converts `tensor` to a NumPy array.

    *Warning*: This operation breaks the automatic differentiation chain.

    Args:
        tensor: Native tensor belonging to any registered backend.
        backend: Target backend. If `None`, uses the current default backend, see `default_backend()`.

    Returns:
        Tensor belonging to `backend`.
    """
    backend = backend or default_backend()
    current_backend = choose_backend(tensor, prefer_default=False)
    if backend.is_tensor(tensor, True) or backend is current_backend:
        return tensor
    if use_dlpack and current_backend.supports(Backend.to_dlpack) and backend.supports(Backend.from_dlpack):
        capsule = current_backend.to_dlpack(tensor)
        return backend.from_dlpack(capsule)
    else:
        nparray = current_backend.numpy(tensor)
        return backend.as_tensor(nparray)
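A hedged sketch of converting a native tensor between backends. It assumes phi.torch is importable (PyTorch installed) and exposes a TORCH backend instance, which may differ between PhiFlow versions; any other registered backend can be substituted:
import numpy as np
from phi.math.backend import convert, NUMPY
from phi.torch import TORCH  # assumed backend instance name; requires PyTorch

x = np.random.rand(8, 8).astype(np.float32)
x_torch = convert(x, TORCH)        # zero-copy via DLPack where both backends support it
x_numpy = convert(x_torch, NUMPY)  # back to a NumPy array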
def default_backend() ‑> phi.math.backend._backend.Backend
-
The default backend is preferred by choose_backend().
The default backend can be set globally using set_global_default_backend() and locally using with backend:.
Returns
The current default Backend
Expand source code
def default_backend() -> Backend:
    """
    The default backend is preferred by `choose_backend()`.

    The default backend can be set globally using `set_global_default_backend()` and locally using `with backend:`.

    Returns:
        current default `Backend`
    """
    return _DEFAULT[-1]
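A short sketch of querying and setting the default backend; only the bundled NumPy backend is used, so no extra dependencies are assumed:
from phi.math.backend import NUMPY, default_backend, set_global_default_backend

print(default_backend())           # the current global default backend
set_global_default_backend(NUMPY)  # set the global default explicitly
with NUMPY:                        # override the default locally for this block
    print(default_backend())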
def get_current_profile() ‑> Optional[phi.math.backend._profile.Profile]
-
Returns the currently active Profile if one is active. Otherwise returns None.
Expand source code
def get_current_profile() -> Optional[Profile]:
    """ Returns the currently active `Profile` if one is active. Otherwise returns `None`. """
    return _PROFILE[-1] if _PROFILE else None
def get_precision() ‑> int
-
Gets the current target floating point precision in bits. The precision can be set globally using set_global_precision() or locally using with precision(p):.
Any Backend method may convert floating point values to this precision, even if the input had a different precision.
Returns
16 for half, 32 for single, 64 for double
Expand source code
def get_precision() -> int:
    """
    Gets the current target floating point precision in bits.
    The precision can be set globally using `set_global_precision()` or locally using `with precision(p):`.

    Any Backend method may convert floating point values to this precision, even if the input had a different precision.

    Returns:
        16 for half, 32 for single, 64 for double
    """
    return _PRECISION[-1]
def precision(floating_point_bits: int)
-
Sets the floating point precision for the local context.
Usage: with precision(p):
This overrides the global setting, see set_global_precision().
Args
floating_point_bits
- 16 for half, 32 for single, 64 for double
Expand source code
@contextmanager
def precision(floating_point_bits: int):
    """
    Sets the floating point precision for the local context.

    Usage: `with precision(p):`

    This overrides the global setting, see `set_global_precision()`.

    Args:
        floating_point_bits: 16 for half, 32 for single, 64 for double
    """
    _PRECISION.append(floating_point_bits)
    try:
        yield None
    finally:
        _PRECISION.pop(-1)
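A short sketch of the precision controls; 32 bits (single precision) is assumed to be the initial global setting:
from phi.math.backend import get_precision, precision, set_global_precision

print(get_precision())       # 32 unless the global precision was changed
with precision(64):
    print(get_precision())   # 64 inside the block
set_global_precision(16)     # half precision for subsequent operations
print(get_precision())       # 16
set_global_precision(32)     # restore single precision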
def profile(backends=None, trace=True, subtract_trace_time=True, save: str = None) ‑> phi.math.backend._profile.Profile
-
To be used in with statements: with math.backend.profile() as prof: ....
Creates a Profile for the code executed within the context by tracking calls to the backends and optionally tracing the call.
Args
backends
- List of backends to profile, None to profile all.
trace
- Whether to perform a full stack trace for each backend call. If true, groups backend calls by function.
subtract_trace_time
- If True, subtracts the time it took to trace the call stack from the event times.
save
- (Optional) File path to save the profile to. This will call Profile.save().
Returns
Created Profile
Expand source code
@contextmanager
def profile(backends=None, trace=True, subtract_trace_time=True, save: str or None = None) -> Profile:
    """
    To be used in `with` statements, `with math.backend.profile() as prof: ...`.

    Creates a `Profile` for the code executed within the context by tracking calls to the `backends` and optionally tracing the call.

    Args:
        backends: List of backends to profile, `None` to profile all.
        trace: Whether to perform a full stack trace for each backend call. If true, groups backend calls by function.
        subtract_trace_time: If True, subtracts the time it took to trace the call stack from the event times
        save: (Optional) File path to save the profile to. This will call `Profile.save()`.

    Returns:
        Created `Profile`
    """
    backends = BACKENDS if backends is None else backends
    prof = Profile(trace, backends, subtract_trace_time)
    restore_data = _start_profiling(prof, backends)
    try:
        yield prof
    finally:
        _stop_profiling(prof, *restore_data)
        if save is not None:
            prof.save(save)
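A minimal sketch that profiles a few direct NumPy-backend calls; the file name is illustrative, and Profile.save() is invoked on exit because save is given:
import numpy as np
from phi.math.backend import NUMPY, profile

with profile(save='numpy_profile.json') as prof:
    x = NUMPY.as_tensor(np.random.rand(64, 64))
    s = NUMPY.sum(NUMPY.mul(x, x), axis=0)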
def profile_function(fun: Callable, args: tuple = (), kwargs: dict = None, backends=None, trace=True, subtract_trace_time=True, retime=True, warmup=1, call_count=1) ‑> phi.math.backend._profile.Profile
-
Creates a Profile for the function fun(*args, **kwargs).
Args
fun
- Function to be profiled. In case retime=True, this function must perform the same operations each time it is called. Use warmup>0 to ensure that internal caching does not interfere with the operations.
args
- Arguments to be passed to fun.
kwargs
- Keyword arguments to be passed to fun.
backends
- List of backends to profile, None to profile all.
trace
- Whether to perform a full stack trace for each backend call. If true, groups backend calls by function.
subtract_trace_time
- If True, subtracts the time it took to trace the call stack from the event times. Has no effect if retime=True.
retime
- If true, calls fun another time without tracing the calls and updates the profile. This gives a much better indication of the true timing. See Profile.retime().
warmup
- Number of times to call fun before profiling it.
call_count
- How often to call the function (excluding retime and warmup). The times will be averaged over multiple runs if call_count > 1.
Returns
Created Profile for fun.
Expand source code
def profile_function(fun: Callable,
                     args: tuple or list = (),
                     kwargs: dict or None = None,
                     backends=None,
                     trace=True,
                     subtract_trace_time=True,
                     retime=True,
                     warmup=1,
                     call_count=1) -> Profile:
    """
    Creates a `Profile` for the function `fun(*args, **kwargs)`.

    Args:
        fun: Function to be profiled. In case `retime=True`, this function must perform the same operations each time it is called.
            Use `warmup>0` to ensure that internal caching does not interfere with the operations.
        args: Arguments to be passed to `fun`.
        kwargs: Keyword arguments to be passed to `fun`.
        backends: List of backends to profile, `None` to profile all.
        trace: Whether to perform a full stack trace for each backend call. If true, groups backend calls by function.
        subtract_trace_time: If True, subtracts the time it took to trace the call stack from the event times. Has no effect if `retime=True`.
        retime: If true, calls `fun` another time without tracing the calls and updates the profile.
            This gives a much better indication of the true timing. See `Profile.retime()`.
        warmup: Number of times to call `fun` before profiling it.
        call_count: How often to call the function (excluding retime and warmup). The times will be averaged over multiple runs if `call_count > 1`.

    Returns:
        Created `Profile` for `fun`.
    """
    kwargs = kwargs if isinstance(kwargs, dict) else {}
    for _ in range(warmup):
        fun(*args, **kwargs)
    with profile(backends=backends, trace=trace, subtract_trace_time=subtract_trace_time) as prof:
        fun(*args, **kwargs)
    if retime:
        with prof.retime():
            fun(*args, **kwargs)
    if call_count > 1:
        with prof._accumulate_average(call_count):
            for _ in range(call_count - 1):
                fun(*args, **kwargs)
    return prof
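A brief sketch profiling a small NumPy-backed function; the workload is illustrative. With retime=True (the default), the function must run the same operations on every call, which holds here:
import numpy as np
from phi.math.backend import NUMPY, profile_function

def step():
    x = NUMPY.as_tensor(np.random.rand(256, 256))
    return NUMPY.sum(NUMPY.mul(x, x), axis=0)

prof = profile_function(step, warmup=2, call_count=4)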
def set_global_default_backend(backend: phi.math.backend._backend.Backend)
-
Sets the given backend as default. This setting can be overridden using with backend:.
See default_backend(), choose_backend().
Args
backend
- Backend to set as default
Expand source code
def set_global_default_backend(backend: Backend):
    """
    Sets the given backend as default.
    This setting can be overridden using `with backend:`.

    See `default_backend()`, `choose_backend()`.

    Args:
        backend: `Backend` to set as default
    """
    assert isinstance(backend, Backend)
    _DEFAULT[0] = backend
def set_global_precision(floating_point_bits: int)
-
Sets the floating point precision of DYNAMIC_BACKEND which affects all registered backends.
If floating_point_bits is an integer, all floating point tensors created henceforth will be of the corresponding data type, float16, float32 or float64. Operations may also convert floating point values to this precision, even if the input had a different precision.
If floating_point_bits is None, new tensors will default to float32 unless specified otherwise. The output of math operations has the same precision as its inputs.
Args
floating_point_bits
- one of (16, 32, 64, None)
Expand source code
def set_global_precision(floating_point_bits: int):
    """
    Sets the floating point precision of DYNAMIC_BACKEND which affects all registered backends.

    If `floating_point_bits` is an integer, all floating point tensors created henceforth will be of the corresponding data type, float16, float32 or float64.
    Operations may also convert floating point values to this precision, even if the input had a different precision.

    If `floating_point_bits` is None, new tensors will default to float32 unless specified otherwise.
    The output of math operations has the same precision as its inputs.

    Args:
        floating_point_bits: one of (16, 32, 64, None)
    """
    _PRECISION[0] = floating_point_bits
Classes
class Backend (name: str, default_device: phi.math.backend._backend.ComputeDevice)
-
Backends delegate low-level operations to a compute library or emulate them.
The methods of Backend form a comprehensive list of available operations.
To support a compute library, subclass Backend and register it by adding it to BACKENDS.
Args
name
- Human-readable string
default_device
- ComputeDevice being used by default
Expand source code
class Backend: def __init__(self, name: str, default_device: ComputeDevice): """ Backends delegate low-level operations to a compute library or emulate them. The methods of `Backend` form a comprehensive list of available operations. To support a compute library, subclass `Backend` and register it by adding it to `BACKENDS`. Args: name: Human-readable string default_device: `ComputeDevice` being used by default """ self._name = name self._default_device = default_device def __enter__(self): _DEFAULT.append(self) def __exit__(self, exc_type, exc_val, exc_tb): _DEFAULT.pop(-1) @property def name(self) -> str: return self._name def supports(self, feature: str or Callable) -> bool: """ Tests if this backend supports the given feature. Features correspond to a method of this backend that must be implemented if the feature is supported. Possible features: * `sparse_coo_tensor` * `gradients Args: feature: `str` or unbound Backend method, e.g. `Backend.sparse_coo_tensor` Returns: Whether the feature is supported. """ feature = feature if isinstance(feature, str) else feature.__name__ if not hasattr(Backend, feature): raise ValueError(f"Not a valid feature: '{feature}'") backend_fun = getattr(Backend, feature) impl_fun = getattr(self.__class__, feature) return impl_fun is not backend_fun def prefers_channels_last(self) -> bool: raise NotImplementedError() @property def precision(self) -> int: """ Short for math.backend.get_precision() """ return get_precision() @property def float_type(self) -> DType: return DType(float, self.precision) @property def as_registered(self) -> 'Backend': from phi.math.backend import BACKENDS for backend in BACKENDS: if self.name in backend.name: return backend raise RuntimeError(f"Backend '{self}' is not visible.") @property def complex_type(self) -> DType: return DType(complex, max(64, self.precision)) def combine_types(self, *dtypes: DType) -> DType: return combine_types(*dtypes, fp_precision=self.precision) def auto_cast(self, *tensors) -> list: """ Determins the appropriate values type resulting from operations involving the tensors as input. This method is called by the default implementations of basic operators. Backends can override this method to prevent unnecessary casting. Args: *tensors: tensors to cast and to consider when determining the common data type Returns: tensors cast to a common data type """ dtypes = [self.dtype(t) for t in tensors] result_type = self.combine_types(*dtypes) if result_type.kind in (int, float, complex, bool): tensors = [self.cast(t, result_type) for t in tensors] return tensors def __str__(self): return self.name def __repr__(self): return self.name def list_devices(self, device_type: str or None = None) -> List[ComputeDevice]: """ Fetches information about all available compute devices this backend can use. Implementations: * NumPy: [`os.cpu_count`](https://docs.python.org/3/library/os.html#os.cpu_count) * PyTorch: [`torch.cuda.get_device_properties`](https://pytorch.org/docs/stable/cuda.html#torch.cuda.get_device_properties) * TensorFlow: `tensorflow.python.client.device_lib.list_local_devices` * Jax: [`jax.devices`](https://jax.readthedocs.io/en/latest/jax.html#jax.devices) See Also: `Backend.set_default_device()`. Args: device_type: (optional) Return only devices of this type, e.g. `'GPU'` or `'CPU'`. See `ComputeDevice.device_type`. Returns: `list` of all currently available devices. 
""" raise NotImplementedError() def get_default_device(self) -> ComputeDevice: return self._default_device def set_default_device(self, device: ComputeDevice or str) -> bool: """ Sets the device new tensors will be allocated on. This function will do nothing if the target device type is not available. See Also: `Backend.list_devices()`, `Backend.get_default_device()`. Args: device: `ComputeDevice` or device type as `str`, such as `'CPU'` or `'GPU'`. Returns: `bool` whether the device was successfully set. """ if isinstance(device, str): devices = self.list_devices(device) if not devices: warnings.warn(f"{self.name}: Cannot select '{device}' because no device of this type is available.", RuntimeWarning) return False device = devices[0] assert device.backend is self, f"Cannot set default device to {device.name} for backend {self.name} because the devices belongs to backend {device.backend.name}" self._default_device = device return True def seed(self, seed: int): raise NotImplementedError() def is_module(self, obj) -> bool: """ Tests if `obj` is of a type that is specific to this backend, e.g. a neural network. If `True`, this backend will be chosen for operations involving `obj`. See Also: `Backend.is_tensor()`. Args: obj: Object to test. """ raise NotImplementedError() def is_tensor(self, x, only_native=False): """ An object is considered a native tensor by a backend if no internal conversion is required by backend methods. An object is considered a tensor (nativer or otherwise) by a backend if it is not a struct (e.g. tuple, list) and all methods of the backend accept it as a tensor argument. If `True`, this backend will be chosen for operations involving `x`. See Also: `Backend.is_module()`. Args: x: object to check only_native: If True, only accepts true native tensor representations, not Python numbers or others that are also supported as tensors (Default value = False) Returns: bool: whether `x` is considered a tensor by this backend """ raise NotImplementedError() def as_tensor(self, x, convert_external=True): """ Converts a tensor-like object to the native tensor representation of this backend. If x is a native tensor of this backend, it is returned without modification. If x is a Python number (numbers.Number instance), `convert_numbers` decides whether to convert it unless the backend cannot handle Python numbers. *Note:* There may be objects that are considered tensors by this backend but are not native and thus, will be converted by this method. Args: x: tensor-like, e.g. list, tuple, Python number, tensor convert_external: if False and `x` is a Python number that is understood by this backend, this method returns the number as-is. This can help prevent type clashes like int32 vs int64. (Default value = True) Returns: tensor representation of `x` """ raise NotImplementedError() def is_available(self, tensor) -> bool: """ Tests if the value of the tensor is known and can be read at this point. If true, `numpy(tensor)` must return a valid NumPy representation of the value. Tensors are typically available when the backend operates in eager mode. Args: tensor: backend-compatible tensor Returns: bool """ raise NotImplementedError() def numpy(self, tensor) -> numpy.ndarray: """ Returns a NumPy representation of the given tensor. If `tensor` is already a NumPy array, it is returned without modification. This method raises an error if the value of the tensor is not known at this point, e.g. because it represents a node in a graph. 
Use `is_available(tensor)` to check if the value can be represented as a NumPy array. Args: tensor: backend-compatible tensor Returns: NumPy representation of the values stored in the tensor """ raise NotImplementedError() def to_dlpack(self, tensor): raise NotImplementedError() def from_dlpack(self, capsule): raise NotImplementedError() def copy(self, tensor, only_mutable=False): raise NotImplementedError() def call(self, f: Callable, *args, name=None): """ Calls `f(*args)` and returns the result. This method may be used to register internal calls with the profiler. Usage: choose_backend(key).call(custom_function, *args) """ return f(*args) def block_until_ready(self, values): pass def jit_compile(self, f: Callable) -> Callable: return NotImplemented def functional_gradient(self, f: Callable, wrt: tuple or list, get_output: bool): """ Args: f: Function to differentiate. wrt: Argument indices for which to compute the gradient. get_output: Whether the derivative function should return the output of `f` in addition to the gradient. Returns: A function `g` with the same arguments as `f`. If `get_output=True`, `g` returns a `tuple`containing the outputs of `f` followed by the gradients. """ raise NotImplementedError(self) def jacobian(self, f: Callable, wrt: tuple or list, get_output: bool): raise NotImplementedError(self) def hessian(self, f: Callable, wrt: tuple or list, get_output: bool, get_gradient: bool) -> tuple: """ First dimension of all inputs/outputs of `f` is assumed to be a batch dimension. Element-wise Hessians will be computed along the batch dimension. All other dimensions are parameter dimensions and will appear twice in the Hessian matrices. Args: f: Function whose first output is a scalar float or complex value. wrt: get_output: get_gradient: Returns: Function returning `(f(x), g(x), H(x))` or less depending on `get_output` and `get_gradient`. The result is always a `tuple` holding at most these three items. """ raise NotImplementedError(self) def custom_gradient(self, f: Callable, gradient: Callable) -> Callable: """ Creates a function based on `f` that uses a custom gradient for backprop. Args: f: Forward function. gradient: Function for backprop. Will be called as `gradient(*d_out)` to compute the gradient of `f`. Returns: Function with similar signature and return values as `f`. However, the returned function does not support keyword arguments. """ return NotImplemented def jit_compile_grad(self, f, wrt: tuple or list, get_output: bool): raise NotImplementedError() def jit_compile_hessian(self, f, wrt: tuple or list, get_output: bool, get_gradient: bool): raise NotImplementedError() def transpose(self, tensor, axes): raise NotImplementedError() def random_uniform(self, shape, low, high, dtype: DType or None): """ Float tensor of selected precision containing random values in the range [0, 1) """ raise NotImplementedError(self) def random_normal(self, shape, dtype: DType): """ Float tensor of selected precision containing random values sampled from a normal distribution with mean 0 and std 1. """ raise NotImplementedError(self) def stack(self, values, axis=0): raise NotImplementedError(self) def concat(self, values, axis): raise NotImplementedError(self) def pad(self, value, pad_width, mode: str = 'constant', constant_values=0): """ Pad a tensor with values as specified by `mode` and `constant_values`. If the mode is not supported, returns NotImplemented. 
Args: value: tensor pad_width: 2D tensor specifying the number of values padded to the edges of each axis in the form [[axis 0 lower, axis 0 upper], ...] including batch and component axes. mode: constant', 'boundary', 'periodic', 'symmetric', 'reflect' constant_values: used for out-of-bounds points if mode='constant' (Default value = 0) mode: str: (Default value = 'constant') Returns: padded tensor or NotImplemented """ raise NotImplementedError(self) def reshape(self, value, shape): raise NotImplementedError(self) def flip(self, value, axes: tuple or list): slices = tuple(slice(None, None, -1 if i in axes else None) for i in range(self.ndims(value))) return value[slices] def sum(self, value, axis=None, keepdims=False): raise NotImplementedError(self) def prod(self, value, axis=None): raise NotImplementedError(self) def divide_no_nan(self, x, y): """ Computes x/y but returns 0 if y=0. """ raise NotImplementedError(self) def where(self, condition, x=None, y=None): raise NotImplementedError(self) def nonzero(self, values): """ Args: values: Tensor with only spatial dimensions Returns: non-zero multi-indices as tensor of shape (nnz, vector) """ raise NotImplementedError(self) def mean(self, value, axis=None, keepdims=False): raise NotImplementedError(self) def range(self, start, limit=None, delta=1, dtype: DType = DType(int, 32)): raise NotImplementedError(self) def zeros(self, shape, dtype: DType = None): raise NotImplementedError(self) def zeros_like(self, tensor): raise NotImplementedError(self) def ones(self, shape, dtype: DType = None): raise NotImplementedError(self) def ones_like(self, tensor): raise NotImplementedError(self) def meshgrid(self, *coordinates): raise NotImplementedError(self) def linspace(self, start, stop, number): raise NotImplementedError(self) def tensordot(self, a, a_axes: tuple or list, b, b_axes: tuple or list): """ Multiply-sum-reduce a_axes of a with b_axes of b. """ raise NotImplementedError(self) def matmul(self, A, b): raise NotImplementedError(self) def einsum(self, equation, *tensors): raise NotImplementedError(self) def cumsum(self, x, axis: int): raise NotImplementedError(self) def while_loop(self, loop: Callable, values: tuple): """ ```python while any(values[0]): values = loop(*values) return values ``` This operation does not support backpropagation. Args: loop: Loop function, must return a `tuple` with entries equal to `values` in shape and data type. values: Initial values of loop variables. Returns: Loop variables upon loop completion. """ raise NotImplementedError(self) def abs(self, x): raise NotImplementedError(self) def sign(self, x): raise NotImplementedError(self) def round(self, x): raise NotImplementedError(self) def ceil(self, x): raise NotImplementedError(self) def floor(self, x): raise NotImplementedError(self) def max(self, x, axis=None, keepdims=False): raise NotImplementedError(self) def min(self, x, axis=None, keepdims=False): raise NotImplementedError(self) def maximum(self, a, b): raise NotImplementedError(self) def minimum(self, a, b): raise NotImplementedError(self) def clip(self, x, minimum, maximum): raise NotImplementedError(self) def sqrt(self, x): raise NotImplementedError(self) def exp(self, x): raise NotImplementedError(self) def conv(self, value, kernel, zero_padding=True): """ Convolve value with kernel. Depending on the tensor rank, the convolution is either 1D (rank=3), 2D (rank=4) or 3D (rank=5). Higher dimensions may not be supported. Args: value: tensor of shape (batch_size, in_channel, spatial...) 
kernel: tensor of shape (batch_size or 1, out_channel, in_channel, spatial...) zero_padding: If True, pads the edges of `value` with zeros so that the result has the same shape as `value`. Returns: Convolution result as tensor of shape (batch_size, out_channel, spatial...) """ raise NotImplementedError(self) def expand_dims(self, a, axis=0, number=1): raise NotImplementedError(self) def shape(self, tensor): """ Returns the shape of a tensor. The shape is iterable and implements `len()`. For non-eager tensors, undefined dimensions should return a placeholder value representing the size. See Also: `Backend.staticshape()`. Args: tensor: Native tensor compatible with this backend. Returns: Shape of `tensor` """ raise NotImplementedError(self) def staticshape(self, tensor) -> tuple: """ Evaluates the static shape of a native tensor. If the tensor is eager, the shape is a `tuple[int]`. For placeholder tensors, unknown dimensions are represented as `None`. See Also: `Backend.shape()`. Args: tensor: Native tensor compatible with this backend. Returns: `tuple` of sizes. Each size is an `int` if the size is defined, else `None`. """ raise NotImplementedError(self) def cast(self, x, dtype: DType): raise NotImplementedError(self) def to_float(self, x): """ Converts a tensor to floating point values with precision equal to the currently set default precision. See Also: `Backend.precision()`. If `x` is mutable and of the correct floating type, returns a copy of `x`. To convert float tensors to the backend precision but leave non-float tensors untouched, use `Backend.as_tensor()`. Args: x: tensor of bool, int or float Returns: Values of `x` as float tensor """ return self.cast(x, self.float_type) def to_int32(self, x): return self.cast(x, DType(int, 32)) def to_int64(self, x): return self.cast(x, DType(int, 64)) def to_complex(self, x): return self.cast(x, DType(complex, max(64, self.precision * 2))) def batched_gather_nd(self, values, indices): """ Gathers values from the tensor `values` at locations `indices`. The first dimension of `values` and `indices` is the batch dimension which must be either equal for both or one for either. Args: values: tensor of shape (batch, spatial..., channel) indices: int tensor of shape (batch, any..., multi_index) where the size of multi_index is values.rank - 2. Returns: Gathered values as tensor of shape (batch, any..., channel) """ raise NotImplementedError(self) def flatten(self, x): return self.reshape(x, (-1,)) def std(self, x, axis=None, keepdims=False): raise NotImplementedError(self) def boolean_mask(self, x, mask, axis=0): """ Args: x: tensor with any number of dimensions mask: 1D mask tensor axis: Axis index >= 0 """ raise NotImplementedError(self) def isfinite(self, x): raise NotImplementedError(self) def scatter(self, base_grid, indices, values, mode: str): """ Depending on `mode`, performs scatter_update or scatter_add. Args: base_grid: Tensor into which scatter values are inserted at indices. Tensor of shape (batch_size, spatial..., channels) indices: Tensor of shape (batch_size or 1, update_count, index_vector) values: Values to scatter at indices. Tensor of shape (batch_size or 1, update_count or 1, channels or 1) mode: One of ('update', 'add') Returns: Copy of base_grid with values at `indices` updated by `values`. 
""" raise NotImplementedError(self) def any(self, boolean_tensor, axis=None, keepdims=False): raise NotImplementedError(self) def all(self, boolean_tensor, axis=None, keepdims=False): raise NotImplementedError(self) def quantile(self, x, quantiles): """ Reduces the last / inner axis of x. Args: x: Tensor quantiles: List or 1D tensor of quantiles to compute. Returns: Tensor with shape (quantiles, *x.shape[:-1]) """ raise NotImplementedError(self) def fft(self, x, axes: tuple or list): """ Computes the n-dimensional FFT along all but the first and last dimensions. Args: x: tensor of dimension 3 or higher axes: Along which axes to perform the FFT Returns: Complex tensor `k` """ raise NotImplementedError(self) def ifft(self, k, axes: tuple or list): """ Computes the n-dimensional inverse FFT along all but the first and last dimensions. Args: k: tensor of dimension 3 or higher axes: Along which axes to perform the inverse FFT Returns: Complex tensor `x` """ raise NotImplementedError(self) def imag(self, x): raise NotImplementedError(self) def real(self, x): raise NotImplementedError(self) def conj(self, x): raise NotImplementedError(self) def sin(self, x): raise NotImplementedError(self) def arcsin(self, x): raise NotImplementedError(self) def cos(self, x): raise NotImplementedError(self) def arccos(self, x): raise NotImplementedError(self) def tan(self, x): raise NotImplementedError(self) def log(self, x): """ Natural logarithm """ raise NotImplementedError(self) def log2(self, x): raise NotImplementedError(self) def log10(self, x): raise NotImplementedError(self) def sigmoid(self, x): return 1 / (1 + self.exp(-x)) def dtype(self, array) -> DType: raise NotImplementedError(self) def tile(self, value, multiples): """ Repeats the tensor along each axis the number of times given by multiples. If `multiples` has more dimensions than `value`, these dimensions are added to `value` as outer dimensions. Args: value: tensor multiples: tuple or list of integers Returns: tile tensor """ raise NotImplementedError(self) def sparse_coo_tensor(self, indices: tuple or list, values, shape: tuple): """ Create a sparse matrix in coordinate list (COO) format. Optional feature. See Also: `Backend.csr_matrix()`, `Backend.csc_matrix()`. Args: indices: 2D tensor of shape `(2, n)` or tuple/list of two 1D tensors `(rows, cols)`. values: 1D values tensor matching `indices` shape: Shape of the sparse matrix Returns: Native representation of the sparse matrix """ raise NotImplementedError(self) def csr_matrix(self, column_indices, row_pointers, values, shape: tuple): """ Create a sparse matrix in compressed sparse row (CSR) format. Optional feature. See Also: `Backend.sparse_coo_tensor()`, `Backend.csc_matrix()`. Args: column_indices: Column indices corresponding to `values`, 1D tensor row_pointers: Indices in `values` where any row starts, 1D tensor of length `rows + 1` values: Non-zero values, 1D tensor shape: Shape of the full matrix Returns: Native representation of the sparse matrix """ raise NotImplementedError(self) def csc_matrix(self, column_pointers, row_indices, values, shape: tuple): """ Create a sparse matrix in compressed sparse column (CSC) format. Optional feature. See Also: `Backend.sparse_coo_tensor()`, `Backend.csr_matrix()`. Args: column_pointers: Indices in `values` where any column starts, 1D tensor of length `cols + 1` row_indices: Row indices corresponding to `values`. 
values: Non-zero values, 1D tensor shape: Shape of the full matrix Returns: Native representation of the sparse matrix """ raise NotImplementedError(self) def coordinates(self, tensor): """ Returns the coordinates and values of a tensor. Args: tensor: Sparse tensor Returns: coordinates: `tuple` of tensor holding the coordinate vectors, i.e. (row, col) for matrices. indices: Tensor holding the corresponding values """ raise NotImplementedError(self) def minimize(self, method: str, f, x0, atol, max_iter, trj: bool): if method == 'GD': return self._minimize_gradient_descent(f, x0, atol, max_iter, trj) from scipy.optimize import OptimizeResult, minimize from threading import Thread assert self.supports(Backend.functional_gradient) x0 = self.numpy(x0) assert x0.ndim == 2 # (batch, parameters) atol = self.numpy(atol) max_iter = self.numpy(max_iter) batch_size = x0.shape[0] fg = self.functional_gradient(f, [0], get_output=True) method_description = f"SciPy {method} with {self.name}" iterations = [0] * batch_size function_evaluations = [0] * batch_size xs = [None] * batch_size final_losses = [None] * batch_size converged = [False] * batch_size diverged = [False] * batch_size messages = [""] * batch_size f_inputs = [None] * batch_size f_b_losses = None f_b_losses_np = None f_grad_np = None f_input_available = Barrier(batch_size + 1) f_output_available = Barrier(batch_size + 1) finished = [False] * batch_size all_finished = False trajectories = [[] for _ in range(batch_size)] if trj else None threads = [] for b in range(batch_size): # Run each independent example as a scipy minimization in a new thread def b_thread(b=b): recent_b_losses = [] def b_fun(x: numpy.ndarray): function_evaluations[b] += 1 f_inputs[b] = self.as_tensor(x, convert_external=True) f_input_available.wait() f_output_available.wait() recent_b_losses.append(f_b_losses[b]) if final_losses[b] is None: # first evaluation final_losses[b] = f_b_losses[b] if trajectories is not None: trajectories[b].append(SolveResult(method_description, x0[b], f_b_losses[b], 0, 1, False, False, "")) return f_b_losses_np[b], f_grad_np[b] def callback(x, *args): # L-BFGS-B only passes x but the documentation says (x, state) iterations[b] += 1 loss = min(recent_b_losses) recent_b_losses.clear() final_losses[b] = loss if trajectories is not None: trajectories[b].append(SolveResult(method_description, x, loss, iterations[b], function_evaluations[b], False, False, "")) res = minimize(fun=b_fun, x0=x0[b], jac=True, method=method, tol=atol[b], options={'maxiter': max_iter[b]}, callback=callback) assert isinstance(res, OptimizeResult) # res.nit, res.nfev xs[b] = res.x converged[b] = res.success diverged[b] = res.status not in (0, 1) # 0=success messages[b] = res.message finished[b] = True while not all_finished: f_input_available.wait() f_output_available.wait() b_thread = Thread(target=b_thread) threads.append(b_thread) b_thread.start() while True: f_input_available.wait() if all(finished): all_finished = True f_output_available.wait() break _, f_b_losses, f_grad = fg(self.stack(f_inputs)) # Evaluate function and gradient f_b_losses_np = self.numpy(f_b_losses).astype(numpy.float64) f_grad_np = self.numpy(f_grad).astype(numpy.float64) f_output_available.wait() for b_thread in threads: b_thread.join() # make sure threads exit correctly if trj: max_trajectory_length = max([len(t) for t in trajectories]) last_points = [SolveResult(method_description, xs[b], final_losses[b], iterations[b], function_evaluations[b], converged[b], diverged[b], "") for b in 
range(batch_size)] trajectories = [t[:-1] + [last_point] * (max_trajectory_length - len(t) + 1) for t, last_point in zip(trajectories, last_points)] trajectory = [] for states in zip(*trajectories): x = self.stack([self.to_float(state.x) for state in states]) residual = self.stack([state.residual for state in states]) iterations = [state.iterations for state in states] function_evaluations = [state.function_evaluations for state in states] converged = [state.converged for state in states] diverged = [state.diverged for state in states] trajectory.append(SolveResult(method_description, x, residual, iterations, function_evaluations, converged, diverged, messages)) return trajectory else: x = self.stack(xs) residual = self.stack(final_losses) return SolveResult(method_description, x, residual, iterations, function_evaluations, converged, diverged, messages) def _minimize_gradient_descent(self, f, x0, atol, max_iter, trj: bool, step_size='adaptive'): assert self.supports(Backend.functional_gradient) assert len(self.staticshape(x0)) == 2 # (batch, parameters) batch_size = self.staticshape(x0)[0] fg = self.functional_gradient(f, [0], get_output=True) method = f"Gradient descent with {self.name}" iterations = self.zeros([batch_size], DType(int, 32)) function_evaluations = self.ones([batch_size], DType(int, 32)) adaptive_step_size = step_size == 'adaptive' if adaptive_step_size: step_size = self.zeros([batch_size]) + 0.1 _, loss, grad = fg(x0) # Evaluate function and gradient diverged = self.any(~self.isfinite(x0), axis=(1,)) converged = self.zeros([batch_size], DType(bool)) trajectory = [SolveResult(method, x0, loss, iterations, function_evaluations, converged, diverged, [""] * batch_size)] if trj else None continue_ = ~converged & ~diverged & (iterations < max_iter) def gd_step(continue_, x, loss, grad, iterations, function_evaluations, step_size, converged, diverged): prev_loss, prev_grad, prev_x = loss, grad, x continue_1 = self.to_int32(continue_) iterations += continue_1 if adaptive_step_size: for i in range(20): dx = - grad * self.expand_dims(step_size * self.to_float(continue_1), -1) next_x = x + dx predicted_loss_decrease = - self.sum(grad * dx, -1) # >= 0 _, next_loss, next_grad = fg(next_x); function_evaluations += continue_1 converged = converged | (self.sum(next_grad ** 2, axis=-1) < atol ** 2) PHI_LOGGER.debug(f"Gradient: {self.numpy(next_grad)} with step_size={self.numpy(step_size)}") actual_loss_decrease = loss - next_loss # we want > 0 # we want actual_loss_decrease to be at least half of predicted_loss_decrease act_pred = self.divide_no_nan(actual_loss_decrease, predicted_loss_decrease) PHI_LOGGER.debug(f"Actual/Predicted: {self.numpy(act_pred)}") step_size_fac = self.clip(self.log(1 + 1.71828182845 * self.exp((act_pred - 0.5) * 2.)), 0.1, 10) PHI_LOGGER.debug(f"step_size *= {self.numpy(step_size_fac)}") step_size *= step_size_fac if self.all((act_pred > 0.4) & (act_pred < 0.9) | converged | diverged): PHI_LOGGER.debug(f"GD minimization: Finished step_size adjustment after {i + 1} tries\n") break else: converged = converged | (abs(actual_loss_decrease) < predicted_loss_decrease) PHI_LOGGER.debug("Backend._minimize_gradient_descent(): No step size found!\n") diverged = diverged | (next_loss > loss) x, loss, grad = next_x, next_loss, next_grad else: x -= grad * self.expand_dims(step_size * self.to_float(continue_1), -1) _, loss, grad = fg(x); function_evaluations += continue_1 diverged = self.any(~self.isfinite(x), axis=(1,)) | (loss > prev_loss) converged = ~diverged & (prev_loss 
- loss < atol) if trj: trajectory.append(SolveResult(method, self.numpy(x), self.numpy(loss), self.numpy(iterations), self.numpy(function_evaluations), self.numpy(diverged), self.numpy(converged), [""] * batch_size)) continue_ = ~converged & ~diverged & (iterations < max_iter) return continue_, x, loss, grad, iterations, function_evaluations, step_size, converged, diverged not_converged, x, loss, grad, iterations, function_evaluations, step_size, converged, diverged = self.while_loop(gd_step, (continue_, x0, loss, grad, iterations, function_evaluations, step_size, converged, diverged)) if trj: trajectory.append(SolveResult(method, x, loss, iterations, function_evaluations + 1, converged, diverged, [""] * batch_size)) return trajectory else: return SolveResult(method, x, loss, iterations, function_evaluations, converged, diverged, [""] * batch_size) def linear_solve(self, method: str, lin, y, x0, rtol, atol, max_iter, trj: bool) -> SolveResult or List[SolveResult]: """ Solve the system of linear equations A · x = y. This method need not provide a gradient for the operation. Args: method: Which algorithm to use. One of `('auto', 'CG', 'CG-adaptive')`. lin: Linear operation. One of * sparse/dense matrix valid for all instances * tuple/list of sparse/dense matrices for varying matrices along batch, must have the same nonzero locations. * linear function A(x), must be called on all instances in parallel y: target result of A * x. 2nd order tensor (batch, vector) or list of vectors. x0: Initial guess of size (batch, parameters) rtol: Relative tolerance of size (batch,) atol: Absolute tolerance of size (batch,) max_iter: Maximum number of iterations of size (batch,) trj: Whether to record and return the optimization trajectory as a `List[SolveResult]`. Returns: result: `SolveResult` or `List[SolveResult]`, depending on `trj`. """ if method == 'auto': return self.conjugate_gradient_adaptive(lin, y, x0, rtol, atol, max_iter, trj) elif method == 'CG': return self.conjugate_gradient(lin, y, x0, rtol, atol, max_iter, trj) elif method == 'CG-adaptive': return self.conjugate_gradient_adaptive(lin, y, x0, rtol, atol, max_iter, trj) else: raise NotImplementedError(f"Method '{method}' not supported for linear solve.") def conjugate_gradient(self, lin, y, x0, rtol, atol, max_iter, trj: bool) -> SolveResult or List[SolveResult]: """ Standard conjugate gradient algorithm. Signature matches to `Backend.linear_solve()`. 
""" # Based on "An Introduction to the Conjugate Gradient Method Without the Agonizing Pain" by Jonathan Richard Shewchuk # symbols: dx=d, dy=q, step_size=alpha, residual_squared=delta, residual=r, y=b method = f"Φ-Flow CG ({self.name})" y = self.to_float(y) x0 = self.copy(self.to_float(x0), only_mutable=True) batch_size = self.staticshape(y)[0] tolerance_sq = self.maximum(rtol ** 2 * self.sum(y ** 2, -1), atol ** 2) x = x0 dx = residual = y - self.linear(lin, x) iterations = self.zeros([batch_size], DType(int, 32)) function_evaluations = self.ones([batch_size], DType(int, 32)) residual_squared = rsq0 = self.sum(residual ** 2, -1, keepdims=True) diverged = self.any(~self.isfinite(x), axis=(1,)) converged = self.all(residual_squared <= tolerance_sq, axis=(1,)) trajectory = [SolveResult(method, x, residual, iterations, function_evaluations, converged, diverged, "")] if trj else None continue_ = ~converged & ~diverged & (iterations < max_iter) def cg_loop_body(continue_, it_counter, x, dx, residual_squared, residual, iterations, function_evaluations, _converged, _diverged): continue_1 = self.to_int32(continue_) it_counter += 1; iterations += continue_1 with spatial_derivative_evaluation(1): dy = self.linear(lin, dx); function_evaluations += continue_1 dx_dy = self.sum(dx * dy, axis=-1, keepdims=True) step_size = self.divide_no_nan(residual_squared, dx_dy) step_size *= self.expand_dims(self.to_float(continue_1), -1) # this is not really necessary but ensures batch-independence x += step_size * dx # if it_counter % 50 == 0: # residual = y - self.linear(lin, x); function_evaluations += 1 # else: residual = residual - step_size * dy # in-place subtraction affects convergence residual_squared_old = residual_squared residual_squared = self.sum(residual ** 2, -1, keepdims=True) dx = residual + self.divide_no_nan(residual_squared, residual_squared_old) * dx diverged = self.any(residual_squared / rsq0 > 100, axis=(1,)) & (iterations >= 8) converged = self.all(residual_squared <= tolerance_sq, axis=(1,)) if trajectory is not None: trajectory.append(SolveResult(method, x, residual, iterations, function_evaluations, converged, diverged, "")) x = self.copy(x) iterations = self.copy(iterations) continue_ = ~converged & ~diverged & (iterations < max_iter) return continue_, it_counter, x, dx, residual_squared, residual, iterations, function_evaluations, converged, diverged _, _, x, _, _, residual, iterations, function_evaluations, converged, diverged = self.while_loop(cg_loop_body, (continue_, 0, x, dx, residual_squared, residual, iterations, function_evaluations, converged, diverged)) return trajectory if trj else SolveResult(method, x, residual, iterations, function_evaluations, converged, diverged, "") def conjugate_gradient_adaptive(self, lin, y, x0, rtol, atol, max_iter, trj: bool) -> SolveResult or List[SolveResult]: """ Conjugate gradient algorithm with adaptive step size. Signature matches to `Backend.linear_solve()`. """ # Based on the variant described in "Methods of Conjugate Gradients for Solving Linear Systems" by Magnus R. 
Hestenes and Eduard Stiefel # https://nvlpubs.nist.gov/nistpubs/jres/049/jresv49n6p409_A1b.pdf method = f"Φ-Flow CG-adaptive ({self.name})" y = self.to_float(y) x0 = self.copy(self.to_float(x0), only_mutable=True) batch_size = self.staticshape(y)[0] tolerance_sq = self.maximum(rtol ** 2 * self.sum(y ** 2, -1), atol ** 2) x = x0 dx = residual = y - self.linear(lin, x) dy = self.linear(lin, dx) iterations = self.zeros([batch_size], DType(int, 32)) function_evaluations = self.ones([batch_size], DType(int, 32)) residual_squared = rsq0 = self.sum(residual ** 2, -1, keepdims=True) diverged = self.any(~self.isfinite(x), axis=(1,)) converged = self.all(residual_squared <= tolerance_sq, axis=(1,)) trajectory = [SolveResult(method, x, residual, iterations, function_evaluations, converged, diverged, "")] if trj else None continue_ = ~converged & ~diverged & (iterations < max_iter) def acg_loop_body(continue_, it_counter, x, dx, dy, residual, iterations, function_evaluations, _converged, _diverged): continue_1 = self.to_int32(continue_) it_counter += 1 iterations += continue_1 dx_dy = self.sum(dx * dy, axis=-1, keepdims=True) step_size = self.divide_no_nan(self.sum(dx * residual, axis=-1, keepdims=True), dx_dy) step_size *= self.expand_dims(self.to_float(continue_1), -1) # this is not really necessary but ensures batch-independence x += step_size * dx # if it_counter % 50 == 0: # Not traceable since Python bool # residual = y - self.linear(lin, x); function_evaluations += 1 # else: residual = residual - step_size * dy # in-place subtraction affects convergence residual_squared = self.sum(residual ** 2, -1, keepdims=True) dx = residual - self.divide_no_nan(self.sum(residual * dy, axis=-1, keepdims=True) * dx, dx_dy) with spatial_derivative_evaluation(1): dy = self.linear(lin, dx); function_evaluations += continue_1 diverged = self.any(residual_squared / rsq0 > 100, axis=(1,)) & (iterations >= 8) converged = self.all(residual_squared <= tolerance_sq, axis=(1,)) if trajectory is not None: trajectory.append(SolveResult(method, x, residual, iterations, function_evaluations, converged, diverged, "")) x = self.copy(x) iterations = self.copy(iterations) continue_ = ~converged & ~diverged & (iterations < max_iter) return continue_, it_counter, x, dx, dy, residual, iterations, function_evaluations, converged, diverged _, _, x, _, _, residual, iterations, function_evaluations, converged, diverged = self.while_loop(acg_loop_body, (continue_, 0, x, dx, dy, residual, iterations, function_evaluations, converged, diverged)) return trajectory if trj else SolveResult(method, x, residual, iterations, function_evaluations, converged, diverged, "") def linear(self, lin, vector): if callable(lin): return lin(vector) elif isinstance(lin, (tuple, list)): for lin_i in lin: lin_shape = self.staticshape(lin_i) assert len(lin_shape) == 2 return self.stack([self.matmul(m, v) for m, v in zip(lin, self.unstack(vector))]) else: lin_shape = self.staticshape(lin) assert len(lin_shape) == 2, f"A must be a matrix but got shape {lin_shape}" return self.matmul(lin, vector) def gradients(self, y, xs: tuple or list, grad_y) -> tuple: raise NotImplementedError(self) def record_gradients(self, xs: tuple or list, persistent=False): raise NotImplementedError(self) def stop_gradient(self, value): raise NotImplementedError(self) def grid_sample(self, grid, coordinates, extrapolation: str): """ Interpolates a regular grid at the specified coordinates. 
Args: grid: Tensor of shape (batch, spatial..., channel) coordinates: Tensor of floating grid indices of shape (batch, instance..., vector). The last dimension must match `spatial_dims`. The first grid point of dimension i lies at position 0, the last at values.shape[i]-1. extrapolation: Values to use for coordinates outside the grid. One of `('undefined', 'zeros', 'boundary', 'periodic', 'symmetric', 'reflect')`. Returns: sampled values with linear interpolation """ return NotImplemented def variable(self, value): return NotImplemented def ndims(self, tensor): return len(self.staticshape(tensor)) def size(self, array): return self.prod(self.shape(array)) def multi_slice(self, tensor, slices: tuple): """ Args: tensor: value to slice slices: `tuple` of `slice`, `int`, or scalar integer tensors """ return tensor[slices] def batch_gather(self, tensor, batches): if isinstance(batches, int): batches = [batches] return tensor[batches, ...] def unstack(self, tensor, axis=0, keepdims=False) -> tuple: if axis < 0: axis += len(tensor.shape) if axis >= len(tensor.shape) or axis < 0: raise ValueError("Illegal axis value") result = [] for slice_idx in range(tensor.shape[axis]): if keepdims: component = tensor[tuple([slice(slice_idx, slice_idx + 1) if d == axis else slice(None) for d in range(len(tensor.shape))])] else: component = tensor[tuple([slice_idx if d == axis else slice(None) for d in range(len(tensor.shape))])] result.append(component) return tuple(result) def equal(self, x, y): """ Element-wise equality check """ raise NotImplementedError(self) def not_equal(self, x, y): return ~self.equal(x, y) def greater_than(self, x, y): x, y = self.auto_cast(x, y) return x > y def greater_or_equal(self, x, y): x, y = self.auto_cast(x, y) return x >= y def add(self, a, b): a, b = self.auto_cast(a, b) return a + b def sub(self, a, b): a, b = self.auto_cast(a, b) return a - b def mul(self, a, b): a, b = self.auto_cast(a, b) return a * b def div(self, numerator, denominator): numerator, denominator = self.auto_cast(numerator, denominator) return numerator / denominator def pow(self, base, exp): base, exp = self.auto_cast(base, exp) return base ** exp def mod(self, dividend, divisor): dividend, divisor = self.auto_cast(dividend, divisor) return dividend % divisor def and_(self, a, b): a, b = self.auto_cast(a, b) return a & b def or_(self, a, b): a, b = self.auto_cast(a, b) return a | b def xor(self, a, b): a, b = self.auto_cast(a, b) return a ^ b def floordiv(self, a, b): a, b = self.auto_cast(a, b) return a // b
Subclasses
- phi.jax._jax_backend.JaxBackend
- phi.math.backend._numpy_backend.NumPyBackend
- phi.tf._tf_backend.TFBackend
- phi.torch._torch_backend.TorchBackend
Instance variables
var as_registered : phi.math.backend._backend.Backend
-
Expand source code
@property
def as_registered(self) -> 'Backend':
    from phi.math.backend import BACKENDS
    for backend in BACKENDS:
        if self.name in backend.name:
            return backend
    raise RuntimeError(f"Backend '{self}' is not visible.")
var complex_type : phi.math.backend._dtype.DType
-
Expand source code
@property
def complex_type(self) -> DType:
    return DType(complex, max(64, self.precision))
var float_type : phi.math.backend._dtype.DType
-
Expand source code
@property
def float_type(self) -> DType:
    return DType(float, self.precision)
var name : str
-
Expand source code
@property
def name(self) -> str:
    return self._name
var precision : int
-
Short for math.backend.get_precision()
Expand source code
@property
def precision(self) -> int:
    """ Short for math.backend.get_precision() """
    return get_precision()
Methods
def abs(self, x)
-
Expand source code
def abs(self, x):
    raise NotImplementedError(self)
def add(self, a, b)
-
Expand source code
def add(self, a, b):
    a, b = self.auto_cast(a, b)
    return a + b
def all(self, boolean_tensor, axis=None, keepdims=False)
-
Expand source code
def all(self, boolean_tensor, axis=None, keepdims=False):
    raise NotImplementedError(self)
def and_(self, a, b)
-
Expand source code
def and_(self, a, b):
    a, b = self.auto_cast(a, b)
    return a & b
def any(self, boolean_tensor, axis=None, keepdims=False)
-
Expand source code
def any(self, boolean_tensor, axis=None, keepdims=False):
    raise NotImplementedError(self)
def arccos(self, x)
-
Expand source code
def arccos(self, x):
    raise NotImplementedError(self)
def arcsin(self, x)
-
Expand source code
def arcsin(self, x):
    raise NotImplementedError(self)
def as_tensor(self, x, convert_external=True)
-
Converts a tensor-like object to the native tensor representation of this backend. If x is a native tensor of this backend, it is returned without modification. If x is a Python number (numbers.Number instance), convert_external decides whether to convert it, unless the backend cannot handle Python numbers.
Note: There may be objects that are considered tensors by this backend but are not native and thus will be converted by this method.
Args
x
- tensor-like, e.g. list, tuple, Python number, tensor
convert_external
- if False and x is a Python number that is understood by this backend, this method returns the number as-is. This can help prevent type clashes like int32 vs int64. (Default value = True)
Returns
tensor representation of x
Expand source code
def as_tensor(self, x, convert_external=True):
    """
    Converts a tensor-like object to the native tensor representation of this backend.
    If x is a native tensor of this backend, it is returned without modification.
    If x is a Python number (numbers.Number instance), `convert_external` decides whether to convert it, unless the backend cannot handle Python numbers.

    *Note:* There may be objects that are considered tensors by this backend but are not native and thus will be converted by this method.

    Args:
        x: tensor-like, e.g. list, tuple, Python number, tensor
        convert_external: if False and `x` is a Python number that is understood by this backend, this method returns the number as-is.
            This can help prevent type clashes like int32 vs int64. (Default value = True)

    Returns:
        tensor representation of `x`
    """
    raise NotImplementedError()
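A small sketch with the NumPy backend, following the documented conversion rules:
import numpy as np
from phi.math.backend import NUMPY

t = NUMPY.as_tensor([1.0, 2.0, 3.0])            # list is converted to a native NumPy array
n = NUMPY.as_tensor(5, convert_external=False)  # Python number understood by NumPy, returned as-is
print(type(t), type(n))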
def auto_cast(self, *tensors) ‑> list
-
Determines the appropriate value type resulting from operations involving the tensors as input.
This method is called by the default implementations of basic operators. Backends can override this method to prevent unnecessary casting.
Args
*tensors
- tensors to cast and to consider when determining the common data type
Returns
tensors cast to a common data type
Expand source code
def auto_cast(self, *tensors) -> list: """ Determins the appropriate values type resulting from operations involving the tensors as input. This method is called by the default implementations of basic operators. Backends can override this method to prevent unnecessary casting. Args: *tensors: tensors to cast and to consider when determining the common data type Returns: tensors cast to a common data type """ dtypes = [self.dtype(t) for t in tensors] result_type = self.combine_types(*dtypes) if result_type.kind in (int, float, complex, bool): tensors = [self.cast(t, result_type) for t in tensors] return tensors
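Usage sketch (editor addition): mixed inputs are promoted to a common data type before arithmetic, assuming the NumPy backend's dtype and cast methods behave as documented.
```python
import numpy as np
from phi.math.backend import NUMPY

# Minimal sketch: an int32 and a float64 array are cast to a common float type.
a = np.array([1, 2, 3], dtype=np.int32)
b = np.array([0.5, 1.5, 2.5], dtype=np.float64)
a_cast, b_cast = NUMPY.auto_cast(a, b)
print(NUMPY.dtype(a_cast), NUMPY.dtype(b_cast))  # both float at the current precision
```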
def batch_gather(self, tensor, batches)
-
Expand source code
def batch_gather(self, tensor, batches): if isinstance(batches, int): batches = [batches] return tensor[batches, ...]
def batched_gather_nd(self, values, indices)
-
Gathers values from the tensor values at locations indices. The first dimension of values and indices is the batch dimension, which must either be equal for both or be 1 for one of them.
Args
values
- tensor of shape (batch, spatial…, channel)
indices
- int tensor of shape (batch, any…, multi_index) where the size of multi_index is values.rank - 2.
Returns
Gathered values as tensor of shape (batch, any…, channel)
Expand source code
def batched_gather_nd(self, values, indices): """ Gathers values from the tensor `values` at locations `indices`. The first dimension of `values` and `indices` is the batch dimension which must be either equal for both or one for either. Args: values: tensor of shape (batch, spatial..., channel) indices: int tensor of shape (batch, any..., multi_index) where the size of multi_index is values.rank - 2. Returns: Gathered values as tensor of shape (batch, any..., channel) """ raise NotImplementedError(self)
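Shape sketch (editor addition), assuming the NumPy backend implements this method:
```python
import numpy as np
from phi.math.backend import NUMPY

# Minimal sketch: batch=2, a 4x4 spatial grid, 3 channels, 2 gather points per batch entry.
values = np.arange(2 * 4 * 4 * 3, dtype=float).reshape(2, 4, 4, 3)  # (batch, spatial..., channel)
indices = np.array([[[0, 0], [3, 2]],
                    [[1, 1], [2, 3]]])                              # (batch, any..., multi_index=2)
gathered = NUMPY.batched_gather_nd(values, indices)
print(gathered.shape)  # expected (2, 2, 3): (batch, any..., channel)
```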
def block_until_ready(self, values)
-
Expand source code
def block_until_ready(self, values): pass
def boolean_mask(self, x, mask, axis=0)
-
Args
x
- tensor with any number of dimensions
mask
- 1D mask tensor
axis
- Axis index >= 0
Expand source code
def boolean_mask(self, x, mask, axis=0): """ Args: x: tensor with any number of dimensions mask: 1D mask tensor axis: Axis index >= 0 """ raise NotImplementedError(self)
def call(self, f: Callable, *args, name=None)
-
Calls f(*args) and returns the result. This method may be used to register internal calls with the profiler.
Usage:
choose_backend(key).call(custom_function, *args)
Expand source code
def call(self, f: Callable, *args, name=None): """ Calls `f(*args)` and returns the result. This method may be used to register internal calls with the profiler. Usage: choose_backend(key).call(custom_function, *args) """ return f(*args)
def cast(self, x, dtype: phi.math.backend._dtype.DType)
-
Expand source code
def cast(self, x, dtype: DType): raise NotImplementedError(self)
def ceil(self, x)
-
Expand source code
def ceil(self, x): raise NotImplementedError(self)
def clip(self, x, minimum, maximum)
-
Expand source code
def clip(self, x, minimum, maximum): raise NotImplementedError(self)
def combine_types(self, *dtypes: phi.math.backend._dtype.DType) ‑> phi.math.backend._dtype.DType
-
Expand source code
def combine_types(self, *dtypes: DType) -> DType: return combine_types(*dtypes, fp_precision=self.precision)
def concat(self, values, axis)
-
Expand source code
def concat(self, values, axis): raise NotImplementedError(self)
def conj(self, x)
-
Expand source code
def conj(self, x): raise NotImplementedError(self)
def conjugate_gradient(self, lin, y, x0, rtol, atol, max_iter, trj: bool) ‑> phi.math.backend._backend.SolveResult
-
Standard conjugate gradient algorithm. The signature matches Backend.linear_solve().
Expand source code
def conjugate_gradient(self, lin, y, x0, rtol, atol, max_iter, trj: bool) -> SolveResult or List[SolveResult]: """ Standard conjugate gradient algorithm. Signature matches to `Backend.linear_solve()`. """ # Based on "An Introduction to the Conjugate Gradient Method Without the Agonizing Pain" by Jonathan Richard Shewchuk # symbols: dx=d, dy=q, step_size=alpha, residual_squared=delta, residual=r, y=b method = f"Φ-Flow CG ({self.name})" y = self.to_float(y) x0 = self.copy(self.to_float(x0), only_mutable=True) batch_size = self.staticshape(y)[0] tolerance_sq = self.maximum(rtol ** 2 * self.sum(y ** 2, -1), atol ** 2) x = x0 dx = residual = y - self.linear(lin, x) iterations = self.zeros([batch_size], DType(int, 32)) function_evaluations = self.ones([batch_size], DType(int, 32)) residual_squared = rsq0 = self.sum(residual ** 2, -1, keepdims=True) diverged = self.any(~self.isfinite(x), axis=(1,)) converged = self.all(residual_squared <= tolerance_sq, axis=(1,)) trajectory = [SolveResult(method, x, residual, iterations, function_evaluations, converged, diverged, "")] if trj else None continue_ = ~converged & ~diverged & (iterations < max_iter) def cg_loop_body(continue_, it_counter, x, dx, residual_squared, residual, iterations, function_evaluations, _converged, _diverged): continue_1 = self.to_int32(continue_) it_counter += 1; iterations += continue_1 with spatial_derivative_evaluation(1): dy = self.linear(lin, dx); function_evaluations += continue_1 dx_dy = self.sum(dx * dy, axis=-1, keepdims=True) step_size = self.divide_no_nan(residual_squared, dx_dy) step_size *= self.expand_dims(self.to_float(continue_1), -1) # this is not really necessary but ensures batch-independence x += step_size * dx # if it_counter % 50 == 0: # residual = y - self.linear(lin, x); function_evaluations += 1 # else: residual = residual - step_size * dy # in-place subtraction affects convergence residual_squared_old = residual_squared residual_squared = self.sum(residual ** 2, -1, keepdims=True) dx = residual + self.divide_no_nan(residual_squared, residual_squared_old) * dx diverged = self.any(residual_squared / rsq0 > 100, axis=(1,)) & (iterations >= 8) converged = self.all(residual_squared <= tolerance_sq, axis=(1,)) if trajectory is not None: trajectory.append(SolveResult(method, x, residual, iterations, function_evaluations, converged, diverged, "")) x = self.copy(x) iterations = self.copy(iterations) continue_ = ~converged & ~diverged & (iterations < max_iter) return continue_, it_counter, x, dx, residual_squared, residual, iterations, function_evaluations, converged, diverged _, _, x, _, _, residual, iterations, function_evaluations, converged, diverged = self.while_loop(cg_loop_body, (continue_, 0, x, dx, residual_squared, residual, iterations, function_evaluations, converged, diverged)) return trajectory if trj else SolveResult(method, x, residual, iterations, function_evaluations, converged, diverged, "")
def conjugate_gradient_adaptive(self, lin, y, x0, rtol, atol, max_iter, trj: bool) ‑> phi.math.backend._backend.SolveResult
-
Conjugate gradient algorithm with adaptive step size. The signature matches Backend.linear_solve().
Expand source code
def conjugate_gradient_adaptive(self, lin, y, x0, rtol, atol, max_iter, trj: bool) -> SolveResult or List[SolveResult]: """ Conjugate gradient algorithm with adaptive step size. Signature matches to `Backend.linear_solve()`. """ # Based on the variant described in "Methods of Conjugate Gradients for Solving Linear Systems" by Magnus R. Hestenes and Eduard Stiefel # https://nvlpubs.nist.gov/nistpubs/jres/049/jresv49n6p409_A1b.pdf method = f"Φ-Flow CG-adaptive ({self.name})" y = self.to_float(y) x0 = self.copy(self.to_float(x0), only_mutable=True) batch_size = self.staticshape(y)[0] tolerance_sq = self.maximum(rtol ** 2 * self.sum(y ** 2, -1), atol ** 2) x = x0 dx = residual = y - self.linear(lin, x) dy = self.linear(lin, dx) iterations = self.zeros([batch_size], DType(int, 32)) function_evaluations = self.ones([batch_size], DType(int, 32)) residual_squared = rsq0 = self.sum(residual ** 2, -1, keepdims=True) diverged = self.any(~self.isfinite(x), axis=(1,)) converged = self.all(residual_squared <= tolerance_sq, axis=(1,)) trajectory = [SolveResult(method, x, residual, iterations, function_evaluations, converged, diverged, "")] if trj else None continue_ = ~converged & ~diverged & (iterations < max_iter) def acg_loop_body(continue_, it_counter, x, dx, dy, residual, iterations, function_evaluations, _converged, _diverged): continue_1 = self.to_int32(continue_) it_counter += 1 iterations += continue_1 dx_dy = self.sum(dx * dy, axis=-1, keepdims=True) step_size = self.divide_no_nan(self.sum(dx * residual, axis=-1, keepdims=True), dx_dy) step_size *= self.expand_dims(self.to_float(continue_1), -1) # this is not really necessary but ensures batch-independence x += step_size * dx # if it_counter % 50 == 0: # Not traceable since Python bool # residual = y - self.linear(lin, x); function_evaluations += 1 # else: residual = residual - step_size * dy # in-place subtraction affects convergence residual_squared = self.sum(residual ** 2, -1, keepdims=True) dx = residual - self.divide_no_nan(self.sum(residual * dy, axis=-1, keepdims=True) * dx, dx_dy) with spatial_derivative_evaluation(1): dy = self.linear(lin, dx); function_evaluations += continue_1 diverged = self.any(residual_squared / rsq0 > 100, axis=(1,)) & (iterations >= 8) converged = self.all(residual_squared <= tolerance_sq, axis=(1,)) if trajectory is not None: trajectory.append(SolveResult(method, x, residual, iterations, function_evaluations, converged, diverged, "")) x = self.copy(x) iterations = self.copy(iterations) continue_ = ~converged & ~diverged & (iterations < max_iter) return continue_, it_counter, x, dx, dy, residual, iterations, function_evaluations, converged, diverged _, _, x, _, _, residual, iterations, function_evaluations, converged, diverged = self.while_loop(acg_loop_body, (continue_, 0, x, dx, dy, residual, iterations, function_evaluations, converged, diverged)) return trajectory if trj else SolveResult(method, x, residual, iterations, function_evaluations, converged, diverged, "")
def conv(self, value, kernel, zero_padding=True)
-
Convolve value with kernel. Depending on the tensor rank, the convolution is either 1D (rank=3), 2D (rank=4) or 3D (rank=5). Higher dimensions may not be supported.
Args
value
- tensor of shape (batch_size, in_channel, spatial…)
kernel
- tensor of shape (batch_size or 1, out_channel, in_channel, spatial…)
zero_padding
- If True, pads the edges of value with zeros so that the result has the same shape as value.
Returns
Convolution result as tensor of shape (batch_size, out_channel, spatial…)
Expand source code
def conv(self, value, kernel, zero_padding=True): """ Convolve value with kernel. Depending on the tensor rank, the convolution is either 1D (rank=3), 2D (rank=4) or 3D (rank=5). Higher dimensions may not be supported. Args: value: tensor of shape (batch_size, in_channel, spatial...) kernel: tensor of shape (batch_size or 1, out_channel, in_channel, spatial...) zero_padding: If True, pads the edges of `value` with zeros so that the result has the same shape as `value`. Returns: Convolution result as tensor of shape (batch_size, out_channel, spatial...) """ raise NotImplementedError(self)
def coordinates(self, tensor)
-
Returns the coordinates and values of a tensor.
Args
tensor
- Sparse tensor
Returns
coordinates
- tuple of tensors holding the coordinate vectors, i.e. (row, col) for matrices.
indices
- Tensor holding the corresponding values
Expand source code
def coordinates(self, tensor): """ Returns the coordinates and values of a tensor. Args: tensor: Sparse tensor Returns: coordinates: `tuple` of tensor holding the coordinate vectors, i.e. (row, col) for matrices. indices: Tensor holding the corresponding values """ raise NotImplementedError(self)
def copy(self, tensor, only_mutable=False)
-
Expand source code
def copy(self, tensor, only_mutable=False): raise NotImplementedError()
def cos(self, x)
-
Expand source code
def cos(self, x): raise NotImplementedError(self)
def csc_matrix(self, column_pointers, row_indices, values, shape: tuple)
-
Create a sparse matrix in compressed sparse column (CSC) format.
Optional feature.
See Also:
Backend.sparse_coo_tensor()
,Backend.csr_matrix()
.Args
column_pointers
- Indices in
values
where any column starts, 1D tensor of lengthcols + 1
row_indices
- Row indices corresponding to
values
. values
- Non-zero values, 1D tensor
shape
- Shape of the full matrix
Returns
Native representation of the sparse matrix
Expand source code
def csc_matrix(self, column_pointers, row_indices, values, shape: tuple): """ Create a sparse matrix in compressed sparse column (CSC) format. Optional feature. See Also: `Backend.sparse_coo_tensor()`, `Backend.csr_matrix()`. Args: column_pointers: Indices in `values` where any column starts, 1D tensor of length `cols + 1` row_indices: Row indices corresponding to `values`. values: Non-zero values, 1D tensor shape: Shape of the full matrix Returns: Native representation of the sparse matrix """ raise NotImplementedError(self)
def csr_matrix(self, column_indices, row_pointers, values, shape: tuple)
-
Create a sparse matrix in compressed sparse row (CSR) format.
Optional feature.
See Also:
Backend.sparse_coo_tensor()
,Backend.csc_matrix()
.Args
column_indices
- Column indices corresponding to
values
, 1D tensor row_pointers
- Indices in
values
where any row starts, 1D tensor of lengthrows + 1
values
- Non-zero values, 1D tensor
shape
- Shape of the full matrix
Returns
Native representation of the sparse matrix
Expand source code
def csr_matrix(self, column_indices, row_pointers, values, shape: tuple): """ Create a sparse matrix in compressed sparse row (CSR) format. Optional feature. See Also: `Backend.sparse_coo_tensor()`, `Backend.csc_matrix()`. Args: column_indices: Column indices corresponding to `values`, 1D tensor row_pointers: Indices in `values` where any row starts, 1D tensor of length `rows + 1` values: Non-zero values, 1D tensor shape: Shape of the full matrix Returns: Native representation of the sparse matrix """ raise NotImplementedError(self)
def cumsum(self, x, axis: int)
-
Expand source code
def cumsum(self, x, axis: int): raise NotImplementedError(self)
def custom_gradient(self, f: Callable, gradient: Callable) ‑> Callable
-
Creates a function based on
f
that uses a custom gradient for backprop.Args
f
- Forward function.
gradient
- Function for backprop. Will be called as gradient(*d_out) to compute the gradient of f.
Returns
Function with similar signature and return values as f. However, the returned function does not support keyword arguments.
Expand source code
def custom_gradient(self, f: Callable, gradient: Callable) -> Callable: """ Creates a function based on `f` that uses a custom gradient for backprop. Args: f: Forward function. gradient: Function for backprop. Will be called as `gradient(*d_out)` to compute the gradient of `f`. Returns: Function with similar signature and return values as `f`. However, the returned function does not support keyword arguments. """ return NotImplemented
def div(self, numerator, denominator)
-
Expand source code
def div(self, numerator, denominator): numerator, denominator = self.auto_cast(numerator, denominator) return numerator / denominator
def divide_no_nan(self, x, y)
-
Computes x/y but returns 0 if y=0.
Expand source code
def divide_no_nan(self, x, y): """ Computes x/y but returns 0 if y=0. """ raise NotImplementedError(self)
def dtype(self, array) ‑> phi.math.backend._dtype.DType
-
Expand source code
def dtype(self, array) -> DType: raise NotImplementedError(self)
def einsum(self, equation, *tensors)
-
Expand source code
def einsum(self, equation, *tensors): raise NotImplementedError(self)
def equal(self, x, y)
-
Element-wise equality check
Expand source code
def equal(self, x, y): """ Element-wise equality check """ raise NotImplementedError(self)
def exp(self, x)
-
Expand source code
def exp(self, x): raise NotImplementedError(self)
def expand_dims(self, a, axis=0, number=1)
-
Expand source code
def expand_dims(self, a, axis=0, number=1): raise NotImplementedError(self)
def fft(self, x, axes: tuple)
-
Computes the n-dimensional FFT along the given axes.
Args
x
- tensor of dimension 3 or higher
axes
- Along which axes to perform the FFT
Returns
Complex tensor
k
Expand source code
def fft(self, x, axes: tuple or list): """ Computes the n-dimensional FFT along all but the first and last dimensions. Args: x: tensor of dimension 3 or higher axes: Along which axes to perform the FFT Returns: Complex tensor `k` """ raise NotImplementedError(self)
def flatten(self, x)
-
Expand source code
def flatten(self, x): return self.reshape(x, (-1,))
def flip(self, value, axes: tuple)
-
Expand source code
def flip(self, value, axes: tuple or list): slices = tuple(slice(None, None, -1 if i in axes else None) for i in range(self.ndims(value))) return value[slices]
def floor(self, x)
-
Expand source code
def floor(self, x): raise NotImplementedError(self)
def floordiv(self, a, b)
-
Expand source code
def floordiv(self, a, b): a, b = self.auto_cast(a, b) return a // b
def from_dlpack(self, capsule)
-
Expand source code
def from_dlpack(self, capsule): raise NotImplementedError()
def functional_gradient(self, f: Callable, wrt: tuple, get_output: bool)
-
Args
f
- Function to differentiate.
wrt
- Argument indices for which to compute the gradient.
get_output
- Whether the derivative function should return the output of
f
in addition to the gradient.
Returns
A function g with the same arguments as f. If get_output=True, g returns a tuple containing the outputs of f followed by the gradients.
Expand source code
def functional_gradient(self, f: Callable, wrt: tuple or list, get_output: bool): """ Args: f: Function to differentiate. wrt: Argument indices for which to compute the gradient. get_output: Whether the derivative function should return the output of `f` in addition to the gradient. Returns: A function `g` with the same arguments as `f`. If `get_output=True`, `g` returns a `tuple`containing the outputs of `f` followed by the gradients. """ raise NotImplementedError(self)
def get_default_device(self) ‑> phi.math.backend._backend.ComputeDevice
-
Expand source code
def get_default_device(self) -> ComputeDevice: return self._default_device
def gradients(self, y, xs: tuple, grad_y) ‑> tuple
-
Expand source code
def gradients(self, y, xs: tuple or list, grad_y) -> tuple: raise NotImplementedError(self)
def greater_or_equal(self, x, y)
-
Expand source code
def greater_or_equal(self, x, y): x, y = self.auto_cast(x, y) return x >= y
def greater_than(self, x, y)
-
Expand source code
def greater_than(self, x, y): x, y = self.auto_cast(x, y) return x > y
def grid_sample(self, grid, coordinates, extrapolation: str)
-
Interpolates a regular grid at the specified coordinates.
Args
grid
- Tensor of shape (batch, spatial…, channel)
coordinates
- Tensor of floating grid indices of shape (batch, instance…, vector). The last dimension must match spatial_dims. The first grid point of dimension i lies at position 0, the last at values.shape[i]-1.
extrapolation
- Values to use for coordinates outside the grid. One of ('undefined', 'zeros', 'boundary', 'periodic', 'symmetric', 'reflect').
Returns
sampled values with linear interpolation
Expand source code
def grid_sample(self, grid, coordinates, extrapolation: str): """ Interpolates a regular grid at the specified coordinates. Args: grid: Tensor of shape (batch, spatial..., channel) coordinates: Tensor of floating grid indices of shape (batch, instance..., vector). The last dimension must match `spatial_dims`. The first grid point of dimension i lies at position 0, the last at values.shape[i]-1. extrapolation: Values to use for coordinates outside the grid. One of `('undefined', 'zeros', 'boundary', 'periodic', 'symmetric', 'reflect')`. Returns: sampled values with linear interpolation """ return NotImplemented
def hessian(self, f: Callable, wrt: tuple, get_output: bool, get_gradient: bool) ‑> tuple
-
First dimension of all inputs/outputs of f is assumed to be a batch dimension. Element-wise Hessians will be computed along the batch dimension. All other dimensions are parameter dimensions and will appear twice in the Hessian matrices.
Args
f
- Function whose first output is a scalar float or complex value.
wrt: get_output: get_gradient:
Returns
Function returning (f(x), g(x), H(x)) or less, depending on get_output and get_gradient. The result is always a tuple holding at most these three items.
Expand source code
def hessian(self, f: Callable, wrt: tuple or list, get_output: bool, get_gradient: bool) -> tuple: """ First dimension of all inputs/outputs of `f` is assumed to be a batch dimension. Element-wise Hessians will be computed along the batch dimension. All other dimensions are parameter dimensions and will appear twice in the Hessian matrices. Args: f: Function whose first output is a scalar float or complex value. wrt: get_output: get_gradient: Returns: Function returning `(f(x), g(x), H(x))` or less depending on `get_output` and `get_gradient`. The result is always a `tuple` holding at most these three items. """ raise NotImplementedError(self)
def ifft(self, k, axes: tuple)
-
Computes the n-dimensional inverse FFT along the given axes.
Args
k
- tensor of dimension 3 or higher
axes
- Along which axes to perform the inverse FFT
Returns
Complex tensor
x
Expand source code
def ifft(self, k, axes: tuple or list): """ Computes the n-dimensional inverse FFT along all but the first and last dimensions. Args: k: tensor of dimension 3 or higher axes: Along which axes to perform the inverse FFT Returns: Complex tensor `x` """ raise NotImplementedError(self)
def imag(self, x)
-
Expand source code
def imag(self, x): raise NotImplementedError(self)
def is_available(self, tensor) ‑> bool
-
Tests if the value of the tensor is known and can be read at this point. If true, numpy(tensor) must return a valid NumPy representation of the value.
Tensors are typically available when the backend operates in eager mode.
Args
tensor
- backend-compatible tensor
Returns
bool
Expand source code
def is_available(self, tensor) -> bool: """ Tests if the value of the tensor is known and can be read at this point. If true, `numpy(tensor)` must return a valid NumPy representation of the value. Tensors are typically available when the backend operates in eager mode. Args: tensor: backend-compatible tensor Returns: bool """ raise NotImplementedError()
def is_module(self, obj) ‑> bool
-
Tests if
obj
is of a type that is specific to this backend, e.g. a neural network. IfTrue
, this backend will be chosen for operations involvingobj
.See Also:
Backend.is_tensor()
.Args
obj
- Object to test.
Expand source code
def is_module(self, obj) -> bool: """ Tests if `obj` is of a type that is specific to this backend, e.g. a neural network. If `True`, this backend will be chosen for operations involving `obj`. See Also: `Backend.is_tensor()`. Args: obj: Object to test. """ raise NotImplementedError()
def is_tensor(self, x, only_native=False)
-
An object is considered a native tensor by a backend if no internal conversion is required by backend methods. An object is considered a tensor (native or otherwise) by a backend if it is not a struct (e.g. tuple, list) and all methods of the backend accept it as a tensor argument.
If
True
, this backend will be chosen for operations involvingx
.See Also:
Backend.is_module()
.Args
x
- object to check
only_native
- If True, only accepts true native tensor representations, not Python numbers or others that are also supported as tensors (Default value = False)
Returns
bool
- whether
x
is considered a tensor by this backend
Expand source code
def is_tensor(self, x, only_native=False): """ An object is considered a native tensor by a backend if no internal conversion is required by backend methods. An object is considered a tensor (nativer or otherwise) by a backend if it is not a struct (e.g. tuple, list) and all methods of the backend accept it as a tensor argument. If `True`, this backend will be chosen for operations involving `x`. See Also: `Backend.is_module()`. Args: x: object to check only_native: If True, only accepts true native tensor representations, not Python numbers or others that are also supported as tensors (Default value = False) Returns: bool: whether `x` is considered a tensor by this backend """ raise NotImplementedError()
def isfinite(self, x)
-
Expand source code
def isfinite(self, x): raise NotImplementedError(self)
def jacobian(self, f: Callable, wrt: tuple, get_output: bool)
-
Expand source code
def jacobian(self, f: Callable, wrt: tuple or list, get_output: bool): raise NotImplementedError(self)
def jit_compile(self, f: Callable) ‑> Callable
-
Expand source code
def jit_compile(self, f: Callable) -> Callable: return NotImplemented
def jit_compile_grad(self, f, wrt: tuple, get_output: bool)
-
Expand source code
def jit_compile_grad(self, f, wrt: tuple or list, get_output: bool): raise NotImplementedError()
def jit_compile_hessian(self, f, wrt: tuple, get_output: bool, get_gradient: bool)
-
Expand source code
def jit_compile_hessian(self, f, wrt: tuple or list, get_output: bool, get_gradient: bool): raise NotImplementedError()
def linear(self, lin, vector)
-
Expand source code
def linear(self, lin, vector): if callable(lin): return lin(vector) elif isinstance(lin, (tuple, list)): for lin_i in lin: lin_shape = self.staticshape(lin_i) assert len(lin_shape) == 2 return self.stack([self.matmul(m, v) for m, v in zip(lin, self.unstack(vector))]) else: lin_shape = self.staticshape(lin) assert len(lin_shape) == 2, f"A must be a matrix but got shape {lin_shape}" return self.matmul(lin, vector)
def linear_solve(self, method: str, lin, y, x0, rtol, atol, max_iter, trj: bool) ‑> phi.math.backend._backend.SolveResult
-
Solve the system of linear equations A · x = y. This method need not provide a gradient for the operation.
Args
method
- Which algorithm to use. One of
('auto', 'CG', 'CG-adaptive')
. lin
- Linear operation. One of:
  * sparse/dense matrix valid for all instances
  * tuple/list of sparse/dense matrices for varying matrices along the batch; all must have the same non-zero locations
  * linear function A(x), which must be called on all instances in parallel
y
- target result of A * x. 2nd order tensor (batch, vector) or list of vectors.
x0
- Initial guess of size (batch, parameters)
rtol
- Relative tolerance of size (batch,)
atol
- Absolute tolerance of size (batch,)
max_iter
- Maximum number of iterations of size (batch,)
trj
- Whether to record and return the optimization trajectory as a
List[SolveResult]
.
Returns
result
SolveResult
orList[SolveResult]
, depending ontrj
.
Expand source code
def linear_solve(self, method: str, lin, y, x0, rtol, atol, max_iter, trj: bool) -> SolveResult or List[SolveResult]: """ Solve the system of linear equations A · x = y. This method need not provide a gradient for the operation. Args: method: Which algorithm to use. One of `('auto', 'CG', 'CG-adaptive')`. lin: Linear operation. One of * sparse/dense matrix valid for all instances * tuple/list of sparse/dense matrices for varying matrices along batch, must have the same nonzero locations. * linear function A(x), must be called on all instances in parallel y: target result of A * x. 2nd order tensor (batch, vector) or list of vectors. x0: Initial guess of size (batch, parameters) rtol: Relative tolerance of size (batch,) atol: Absolute tolerance of size (batch,) max_iter: Maximum number of iterations of size (batch,) trj: Whether to record and return the optimization trajectory as a `List[SolveResult]`. Returns: result: `SolveResult` or `List[SolveResult]`, depending on `trj`. """ if method == 'auto': return self.conjugate_gradient_adaptive(lin, y, x0, rtol, atol, max_iter, trj) elif method == 'CG': return self.conjugate_gradient(lin, y, x0, rtol, atol, max_iter, trj) elif method == 'CG-adaptive': return self.conjugate_gradient_adaptive(lin, y, x0, rtol, atol, max_iter, trj) else: raise NotImplementedError(f"Method '{method}' not supported for linear solve.")
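Usage sketch (editor addition): a single-batch solve with the built-in CG path, assuming SolveResult exposes x and converged as attributes and that the NumPy backend supports the dense-matrix branch.
```python
import numpy as np
from phi.math.backend import NUMPY

# Minimal sketch: solve A·x = y for one batch entry.
A = np.array([[4., 1.], [1., 3.]])     # symmetric positive-definite matrix
y = np.array([[1., 2.]])               # (batch, vector)
x0 = np.zeros_like(y)
result = NUMPY.linear_solve('CG', A, y, x0,
                            rtol=np.array([1e-5]), atol=np.array([1e-5]),
                            max_iter=np.array([1000]), trj=False)
print(result.x, result.converged)
```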
def linspace(self, start, stop, number)
-
Expand source code
def linspace(self, start, stop, number): raise NotImplementedError(self)
def list_devices(self, device_type: str = None) ‑> List[phi.math.backend._backend.ComputeDevice]
-
Fetches information about all available compute devices this backend can use.
Implementations:
- NumPy:
os.cpu_count
- PyTorch:
torch.cuda.get_device_properties
- TensorFlow:
tensorflow.python.client.device_lib.list_local_devices
- Jax:
jax.devices
See Also:
Backend.set_default_device()
.Args
device_type
- (optional) Return only devices of this type, e.g.
'GPU'
or'CPU'
. SeeComputeDevice.device_type
.
Returns
list
of all currently available devices.Expand source code
def list_devices(self, device_type: str or None = None) -> List[ComputeDevice]: """ Fetches information about all available compute devices this backend can use. Implementations: * NumPy: [`os.cpu_count`](https://docs.python.org/3/library/os.html#os.cpu_count) * PyTorch: [`torch.cuda.get_device_properties`](https://pytorch.org/docs/stable/cuda.html#torch.cuda.get_device_properties) * TensorFlow: `tensorflow.python.client.device_lib.list_local_devices` * Jax: [`jax.devices`](https://jax.readthedocs.io/en/latest/jax.html#jax.devices) See Also: `Backend.set_default_device()`. Args: device_type: (optional) Return only devices of this type, e.g. `'GPU'` or `'CPU'`. See `ComputeDevice.device_type`. Returns: `list` of all currently available devices. """ raise NotImplementedError()
def log(self, x)
-
Natural logarithm
Expand source code
def log(self, x): """ Natural logarithm """ raise NotImplementedError(self)
def log10(self, x)
-
Expand source code
def log10(self, x): raise NotImplementedError(self)
def log2(self, x)
-
Expand source code
def log2(self, x): raise NotImplementedError(self)
def matmul(self, A, b)
-
Expand source code
def matmul(self, A, b): raise NotImplementedError(self)
def max(self, x, axis=None, keepdims=False)
-
Expand source code
def max(self, x, axis=None, keepdims=False): raise NotImplementedError(self)
def maximum(self, a, b)
-
Expand source code
def maximum(self, a, b): raise NotImplementedError(self)
def mean(self, value, axis=None, keepdims=False)
-
Expand source code
def mean(self, value, axis=None, keepdims=False): raise NotImplementedError(self)
def meshgrid(self, *coordinates)
-
Expand source code
def meshgrid(self, *coordinates): raise NotImplementedError(self)
def min(self, x, axis=None, keepdims=False)
-
Expand source code
def min(self, x, axis=None, keepdims=False): raise NotImplementedError(self)
def minimize(self, method: str, f, x0, atol, max_iter, trj: bool)
-
Expand source code
def minimize(self, method: str, f, x0, atol, max_iter, trj: bool): if method == 'GD': return self._minimize_gradient_descent(f, x0, atol, max_iter, trj) from scipy.optimize import OptimizeResult, minimize from threading import Thread assert self.supports(Backend.functional_gradient) x0 = self.numpy(x0) assert x0.ndim == 2 # (batch, parameters) atol = self.numpy(atol) max_iter = self.numpy(max_iter) batch_size = x0.shape[0] fg = self.functional_gradient(f, [0], get_output=True) method_description = f"SciPy {method} with {self.name}" iterations = [0] * batch_size function_evaluations = [0] * batch_size xs = [None] * batch_size final_losses = [None] * batch_size converged = [False] * batch_size diverged = [False] * batch_size messages = [""] * batch_size f_inputs = [None] * batch_size f_b_losses = None f_b_losses_np = None f_grad_np = None f_input_available = Barrier(batch_size + 1) f_output_available = Barrier(batch_size + 1) finished = [False] * batch_size all_finished = False trajectories = [[] for _ in range(batch_size)] if trj else None threads = [] for b in range(batch_size): # Run each independent example as a scipy minimization in a new thread def b_thread(b=b): recent_b_losses = [] def b_fun(x: numpy.ndarray): function_evaluations[b] += 1 f_inputs[b] = self.as_tensor(x, convert_external=True) f_input_available.wait() f_output_available.wait() recent_b_losses.append(f_b_losses[b]) if final_losses[b] is None: # first evaluation final_losses[b] = f_b_losses[b] if trajectories is not None: trajectories[b].append(SolveResult(method_description, x0[b], f_b_losses[b], 0, 1, False, False, "")) return f_b_losses_np[b], f_grad_np[b] def callback(x, *args): # L-BFGS-B only passes x but the documentation says (x, state) iterations[b] += 1 loss = min(recent_b_losses) recent_b_losses.clear() final_losses[b] = loss if trajectories is not None: trajectories[b].append(SolveResult(method_description, x, loss, iterations[b], function_evaluations[b], False, False, "")) res = minimize(fun=b_fun, x0=x0[b], jac=True, method=method, tol=atol[b], options={'maxiter': max_iter[b]}, callback=callback) assert isinstance(res, OptimizeResult) # res.nit, res.nfev xs[b] = res.x converged[b] = res.success diverged[b] = res.status not in (0, 1) # 0=success messages[b] = res.message finished[b] = True while not all_finished: f_input_available.wait() f_output_available.wait() b_thread = Thread(target=b_thread) threads.append(b_thread) b_thread.start() while True: f_input_available.wait() if all(finished): all_finished = True f_output_available.wait() break _, f_b_losses, f_grad = fg(self.stack(f_inputs)) # Evaluate function and gradient f_b_losses_np = self.numpy(f_b_losses).astype(numpy.float64) f_grad_np = self.numpy(f_grad).astype(numpy.float64) f_output_available.wait() for b_thread in threads: b_thread.join() # make sure threads exit correctly if trj: max_trajectory_length = max([len(t) for t in trajectories]) last_points = [SolveResult(method_description, xs[b], final_losses[b], iterations[b], function_evaluations[b], converged[b], diverged[b], "") for b in range(batch_size)] trajectories = [t[:-1] + [last_point] * (max_trajectory_length - len(t) + 1) for t, last_point in zip(trajectories, last_points)] trajectory = [] for states in zip(*trajectories): x = self.stack([self.to_float(state.x) for state in states]) residual = self.stack([state.residual for state in states]) iterations = [state.iterations for state in states] function_evaluations = [state.function_evaluations for state in states] converged = 
[state.converged for state in states] diverged = [state.diverged for state in states] trajectory.append(SolveResult(method_description, x, residual, iterations, function_evaluations, converged, diverged, messages)) return trajectory else: x = self.stack(xs) residual = self.stack(final_losses) return SolveResult(method_description, x, residual, iterations, function_evaluations, converged, diverged, messages)
def minimum(self, a, b)
-
Expand source code
def minimum(self, a, b): raise NotImplementedError(self)
def mod(self, dividend, divisor)
-
Expand source code
def mod(self, dividend, divisor): dividend, divisor = self.auto_cast(dividend, divisor) return dividend % divisor
def mul(self, a, b)
-
Expand source code
def mul(self, a, b): a, b = self.auto_cast(a, b) return a * b
def multi_slice(self, tensor, slices: tuple)
-
Args
tensor
- value to slice
slices
tuple
ofslice
,int
, or scalar integer tensors
Expand source code
def multi_slice(self, tensor, slices: tuple): """ Args: tensor: value to slice slices: `tuple` of `slice`, `int`, or scalar integer tensors """ return tensor[slices]
def ndims(self, tensor)
-
Expand source code
def ndims(self, tensor): return len(self.staticshape(tensor))
def nonzero(self, values)
-
Args
values
- Tensor with only spatial dimensions
Returns
non-zero multi-indices as tensor of shape (nnz, vector)
Expand source code
def nonzero(self, values): """ Args: values: Tensor with only spatial dimensions Returns: non-zero multi-indices as tensor of shape (nnz, vector) """ raise NotImplementedError(self)
def not_equal(self, x, y)
-
Expand source code
def not_equal(self, x, y): return ~self.equal(x, y)
def numpy(self, tensor) ‑> numpy.ndarray
-
Returns a NumPy representation of the given tensor. If tensor is already a NumPy array, it is returned without modification.
This method raises an error if the value of the tensor is not known at this point, e.g. because it represents a node in a graph. Use is_available(tensor) to check if the value can be represented as a NumPy array.
tensor
- backend-compatible tensor
Returns
NumPy representation of the values stored in the tensor
Expand source code
def numpy(self, tensor) -> numpy.ndarray: """ Returns a NumPy representation of the given tensor. If `tensor` is already a NumPy array, it is returned without modification. This method raises an error if the value of the tensor is not known at this point, e.g. because it represents a node in a graph. Use `is_available(tensor)` to check if the value can be represented as a NumPy array. Args: tensor: backend-compatible tensor Returns: NumPy representation of the values stored in the tensor """ raise NotImplementedError()
def ones(self, shape, dtype: phi.math.backend._dtype.DType = None)
-
Expand source code
def ones(self, shape, dtype: DType = None): raise NotImplementedError(self)
def ones_like(self, tensor)
-
Expand source code
def ones_like(self, tensor): raise NotImplementedError(self)
def or_(self, a, b)
-
Expand source code
def or_(self, a, b): a, b = self.auto_cast(a, b) return a | b
def pad(self, value, pad_width, mode: str = 'constant', constant_values=0)
-
Pad a tensor with values as specified by mode and constant_values.
If the mode is not supported, returns NotImplemented.
Args
value
- tensor
pad_width
- 2D tensor specifying the number of values padded to the edges of each axis in the form [[axis 0 lower, axis 0 upper], …] including batch and component axes.
mode
- One of 'constant', 'boundary', 'periodic', 'symmetric', 'reflect' (Default value = 'constant')
constant_values
- used for out-of-bounds points if mode='constant' (Default value = 0)
Returns
padded tensor or NotImplemented
Expand source code
def pad(self, value, pad_width, mode: str = 'constant', constant_values=0): """ Pad a tensor with values as specified by `mode` and `constant_values`. If the mode is not supported, returns NotImplemented. Args: value: tensor pad_width: 2D tensor specifying the number of values padded to the edges of each axis in the form [[axis 0 lower, axis 0 upper], ...] including batch and component axes. mode: constant', 'boundary', 'periodic', 'symmetric', 'reflect' constant_values: used for out-of-bounds points if mode='constant' (Default value = 0) mode: str: (Default value = 'constant') Returns: padded tensor or NotImplemented """ raise NotImplementedError(self)
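Usage sketch (editor addition): pad only the spatial axis of a (batch, spatial, channel) tensor, assuming the NumPy backend supports the 'constant' mode.
```python
import numpy as np
from phi.math.backend import NUMPY

# Minimal sketch: one cell of zero padding on each side of the spatial axis.
value = np.reshape(np.arange(4, dtype=float), (1, 4, 1))
padded = NUMPY.pad(value, [[0, 0], [1, 1], [0, 0]], mode='constant', constant_values=0)
print(padded.shape)  # expected (1, 6, 1)
```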
def pow(self, base, exp)
-
Expand source code
def pow(self, base, exp): base, exp = self.auto_cast(base, exp) return base ** exp
def prefers_channels_last(self) ‑> bool
-
Expand source code
def prefers_channels_last(self) -> bool: raise NotImplementedError()
def prod(self, value, axis=None)
-
Expand source code
def prod(self, value, axis=None): raise NotImplementedError(self)
def quantile(self, x, quantiles)
-
Reduces the last / inner axis of x.
Args
x
- Tensor
quantiles
- List or 1D tensor of quantiles to compute.
Returns
Tensor with shape (quantiles, *x.shape[:-1])
Expand source code
def quantile(self, x, quantiles): """ Reduces the last / inner axis of x. Args: x: Tensor quantiles: List or 1D tensor of quantiles to compute. Returns: Tensor with shape (quantiles, *x.shape[:-1]) """ raise NotImplementedError(self)
def random_normal(self, shape, dtype: phi.math.backend._dtype.DType)
-
Float tensor of selected precision containing random values sampled from a normal distribution with mean 0 and std 1.
Expand source code
def random_normal(self, shape, dtype: DType): """ Float tensor of selected precision containing random values sampled from a normal distribution with mean 0 and std 1. """ raise NotImplementedError(self)
def random_uniform(self, shape, low, high, dtype: phi.math.backend._dtype.DType)
-
Float tensor of selected precision containing random values in the range [low, high)
Expand source code
def random_uniform(self, shape, low, high, dtype: DType or None): """ Float tensor of selected precision containing random values in the range [0, 1) """ raise NotImplementedError(self)
def range(self, start, limit=None, delta=1, dtype: phi.math.backend._dtype.DType = int32)
-
Expand source code
def range(self, start, limit=None, delta=1, dtype: DType = DType(int, 32)): raise NotImplementedError(self)
def real(self, x)
-
Expand source code
def real(self, x): raise NotImplementedError(self)
def record_gradients(self, xs: tuple, persistent=False)
-
Expand source code
def record_gradients(self, xs: tuple or list, persistent=False): raise NotImplementedError(self)
def reshape(self, value, shape)
-
Expand source code
def reshape(self, value, shape): raise NotImplementedError(self)
def round(self, x)
-
Expand source code
def round(self, x): raise NotImplementedError(self)
def scatter(self, base_grid, indices, values, mode: str)
-
Depending on
mode
, performs scatter_update or scatter_add.Args
base_grid
- Tensor into which scatter values are inserted at indices. Tensor of shape (batch_size, spatial…, channels)
indices
- Tensor of shape (batch_size or 1, update_count, index_vector)
values
- Values to scatter at indices. Tensor of shape (batch_size or 1, update_count or 1, channels or 1)
mode
- One of ('update', 'add')
Returns
Copy of base_grid with values at
indices
updated byvalues
.Expand source code
def scatter(self, base_grid, indices, values, mode: str): """ Depending on `mode`, performs scatter_update or scatter_add. Args: base_grid: Tensor into which scatter values are inserted at indices. Tensor of shape (batch_size, spatial..., channels) indices: Tensor of shape (batch_size or 1, update_count, index_vector) values: Values to scatter at indices. Tensor of shape (batch_size or 1, update_count or 1, channels or 1) mode: One of ('update', 'add') Returns: Copy of base_grid with values at `indices` updated by `values`. """ raise NotImplementedError(self)
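Usage sketch (editor addition), assuming the NumPy backend implements scatter with the documented shapes:
```python
import numpy as np
from phi.math.backend import NUMPY

# Minimal sketch: scatter-add two values into a length-5 grid (batch 1, one channel).
base_grid = np.zeros((1, 5, 1))
indices = np.array([[[1], [3]]])        # (batch, update_count, index_vector)
values = np.array([[[10.], [20.]]])     # (batch, update_count, channels)
result = NUMPY.scatter(base_grid, indices, values, mode='add')
print(result[0, :, 0])  # expected [ 0. 10.  0. 20.  0.]
```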
def seed(self, seed: int)
-
Expand source code
def seed(self, seed: int): raise NotImplementedError()
def set_default_device(self, device: phi.math.backend._backend.ComputeDevice) ‑> bool
-
Sets the device new tensors will be allocated on. This function will do nothing if the target device type is not available.
See Also:
Backend.list_devices()
,Backend.get_default_device()
.Args
device
ComputeDevice
or device type asstr
, such as'CPU'
or'GPU'
.
Returns
bool
whether the device was successfully set.Expand source code
def set_default_device(self, device: ComputeDevice or str) -> bool: """ Sets the device new tensors will be allocated on. This function will do nothing if the target device type is not available. See Also: `Backend.list_devices()`, `Backend.get_default_device()`. Args: device: `ComputeDevice` or device type as `str`, such as `'CPU'` or `'GPU'`. Returns: `bool` whether the device was successfully set. """ if isinstance(device, str): devices = self.list_devices(device) if not devices: warnings.warn(f"{self.name}: Cannot select '{device}' because no device of this type is available.", RuntimeWarning) return False device = devices[0] assert device.backend is self, f"Cannot set default device to {device.name} for backend {self.name} because the devices belongs to backend {device.backend.name}" self._default_device = device return True
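Usage sketch (editor addition): listing devices and selecting one by type, under the assumption that the NumPy backend reports at least one CPU device.
```python
from phi.math.backend import NUMPY

# Minimal sketch: inspect available CPU devices and select one for new allocations.
for device in NUMPY.list_devices('CPU'):
    print(device)
print(NUMPY.set_default_device('CPU'))  # True if a CPU device was found
```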
def shape(self, tensor)
-
Returns the shape of a tensor. The shape is iterable and implements len(). For non-eager tensors, undefined dimensions should return a placeholder value representing the size.
See Also: Backend.staticshape().
Args
tensor
- Native tensor compatible with this backend.
Returns
Shape of
tensor
Expand source code
def shape(self, tensor): """ Returns the shape of a tensor. The shape is iterable and implements `len()`. For non-eager tensors, undefined dimensions should return a placeholder value representing the size. See Also: `Backend.staticshape()`. Args: tensor: Native tensor compatible with this backend. Returns: Shape of `tensor` """ raise NotImplementedError(self)
def sigmoid(self, x)
-
Expand source code
def sigmoid(self, x): return 1 / (1 + self.exp(-x))
def sign(self, x)
-
Expand source code
def sign(self, x): raise NotImplementedError(self)
def sin(self, x)
-
Expand source code
def sin(self, x): raise NotImplementedError(self)
def size(self, array)
-
Expand source code
def size(self, array): return self.prod(self.shape(array))
def sparse_coo_tensor(self, indices: tuple, values, shape: tuple)
-
Create a sparse matrix in coordinate list (COO) format.
Optional feature.
See Also:
Backend.csr_matrix()
,Backend.csc_matrix()
.Args
indices
- 2D tensor of shape
(2, n)
or tuple/list of two 1D tensors(rows, cols)
. values
- 1D values tensor matching
indices
shape
- Shape of the sparse matrix
Returns
Native representation of the sparse matrix
Expand source code
def sparse_coo_tensor(self, indices: tuple or list, values, shape: tuple): """ Create a sparse matrix in coordinate list (COO) format. Optional feature. See Also: `Backend.csr_matrix()`, `Backend.csc_matrix()`. Args: indices: 2D tensor of shape `(2, n)` or tuple/list of two 1D tensors `(rows, cols)`. values: 1D values tensor matching `indices` shape: Shape of the sparse matrix Returns: Native representation of the sparse matrix """ raise NotImplementedError(self)
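Usage sketch (editor addition): building a small COO matrix, assuming the NumPy backend returns a SciPy sparse object.
```python
import numpy as np
from phi.math.backend import NUMPY

# Minimal sketch: a 3x3 matrix with two non-zero entries in COO format.
rows = np.array([0, 2])
cols = np.array([1, 2])
vals = np.array([1.0, 5.0])
matrix = NUMPY.sparse_coo_tensor((rows, cols), vals, shape=(3, 3))
print(matrix)  # with the NumPy backend this is expected to be a SciPy COO matrix
```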
def sqrt(self, x)
-
Expand source code
def sqrt(self, x): raise NotImplementedError(self)
def stack(self, values, axis=0)
-
Expand source code
def stack(self, values, axis=0): raise NotImplementedError(self)
def staticshape(self, tensor) ‑> tuple
-
Evaluates the static shape of a native tensor. If the tensor is eager, the shape is a tuple[int]. For placeholder tensors, unknown dimensions are represented as None.
See Also: Backend.shape().
Args
tensor
- Native tensor compatible with this backend.
Returns
tuple of sizes. Each size is an int if the size is defined, else None.
Expand source code
def staticshape(self, tensor) -> tuple: """ Evaluates the static shape of a native tensor. If the tensor is eager, the shape is a `tuple[int]`. For placeholder tensors, unknown dimensions are represented as `None`. See Also: `Backend.shape()`. Args: tensor: Native tensor compatible with this backend. Returns: `tuple` of sizes. Each size is an `int` if the size is defined, else `None`. """ raise NotImplementedError(self)
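Usage sketch (editor addition): for eager NumPy arrays, shape and staticshape should agree and contain only known sizes.
```python
import numpy as np
from phi.math.backend import NUMPY

# Minimal sketch: both queries return (2, 3) for an eager array.
x = np.zeros((2, 3))
print(NUMPY.shape(x))        # (2, 3)
print(NUMPY.staticshape(x))  # (2, 3); placeholder tensors would show None for unknown sizes
```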
def std(self, x, axis=None, keepdims=False)
-
Expand source code
def std(self, x, axis=None, keepdims=False): raise NotImplementedError(self)
def stop_gradient(self, value)
-
Expand source code
def stop_gradient(self, value): raise NotImplementedError(self)
def sub(self, a, b)
-
Expand source code
def sub(self, a, b): a, b = self.auto_cast(a, b) return a - b
def sum(self, value, axis=None, keepdims=False)
-
Expand source code
def sum(self, value, axis=None, keepdims=False): raise NotImplementedError(self)
def supports(self, feature: str) ‑> bool
-
Tests if this backend supports the given feature. Features correspond to a method of this backend that must be implemented if the feature is supported.
Possible features:
- sparse_coo_tensor
- gradients
Args
feature
- str or unbound Backend method, e.g. Backend.sparse_coo_tensor()
Returns
Whether the feature is supported.
Expand source code
def supports(self, feature: str or Callable) -> bool: """ Tests if this backend supports the given feature. Features correspond to a method of this backend that must be implemented if the feature is supported. Possible features: * `sparse_coo_tensor` * `gradients Args: feature: `str` or unbound Backend method, e.g. `Backend.sparse_coo_tensor` Returns: Whether the feature is supported. """ feature = feature if isinstance(feature, str) else feature.__name__ if not hasattr(Backend, feature): raise ValueError(f"Not a valid feature: '{feature}'") backend_fun = getattr(Backend, feature) impl_fun = getattr(self.__class__, feature) return impl_fun is not backend_fun
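Usage sketch (editor addition): querying optional features before relying on them, following the signature shown above.
```python
from phi.math.backend import NUMPY, Backend

# Minimal sketch: features can be passed as unbound methods or by name.
print(NUMPY.supports(Backend.sparse_coo_tensor))  # True if the NumPy backend overrides it
print(NUMPY.supports('conjugate_gradient'))       # False if only the base implementation exists
```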
def tan(self, x)
-
Expand source code
def tan(self, x): raise NotImplementedError(self)
def tensordot(self, a, a_axes: tuple, b, b_axes: tuple)
-
Multiply-sum-reduce a_axes of a with b_axes of b.
Expand source code
def tensordot(self, a, a_axes: tuple or list, b, b_axes: tuple or list): """ Multiply-sum-reduce a_axes of a with b_axes of b. """ raise NotImplementedError(self)
def tile(self, value, multiples)
-
Repeats the tensor along each axis the number of times given by multiples. If
multiples
has more dimensions thanvalue
, these dimensions are added tovalue
as outer dimensions.Args
value
- tensor
multiples
- tuple or list of integers
Returns
tile tensor
Expand source code
def tile(self, value, multiples): """ Repeats the tensor along each axis the number of times given by multiples. If `multiples` has more dimensions than `value`, these dimensions are added to `value` as outer dimensions. Args: value: tensor multiples: tuple or list of integers Returns: tile tensor """ raise NotImplementedError(self)
def to_complex(self, x)
-
Expand source code
def to_complex(self, x): return self.cast(x, DType(complex, max(64, self.precision * 2)))
def to_dlpack(self, tensor)
-
Expand source code
def to_dlpack(self, tensor): raise NotImplementedError()
def to_float(self, x)
-
Converts a tensor to floating point values with precision equal to the currently set default precision.
See Also: Backend.precision.
If x is mutable and of the correct floating type, returns a copy of x.
To convert float tensors to the backend precision but leave non-float tensors untouched, use Backend.as_tensor().
Args
x
- tensor of bool, int or float
Returns
Values of x as float tensor
Expand source code
def to_float(self, x): """ Converts a tensor to floating point values with precision equal to the currently set default precision. See Also: `Backend.precision()`. If `x` is mutable and of the correct floating type, returns a copy of `x`. To convert float tensors to the backend precision but leave non-float tensors untouched, use `Backend.as_tensor()`. Args: x: tensor of bool, int or float Returns: Values of `x` as float tensor """ return self.cast(x, self.float_type)
def to_int32(self, x)
-
Expand source code
def to_int32(self, x): return self.cast(x, DType(int, 32))
def to_int64(self, x)
-
Expand source code
def to_int64(self, x): return self.cast(x, DType(int, 64))
def transpose(self, tensor, axes)
-
Expand source code
def transpose(self, tensor, axes): raise NotImplementedError()
def unstack(self, tensor, axis=0, keepdims=False) ‑> tuple
-
Expand source code
def unstack(self, tensor, axis=0, keepdims=False) -> tuple: if axis < 0: axis += len(tensor.shape) if axis >= len(tensor.shape) or axis < 0: raise ValueError("Illegal axis value") result = [] for slice_idx in range(tensor.shape[axis]): if keepdims: component = tensor[tuple([slice(slice_idx, slice_idx + 1) if d == axis else slice(None) for d in range(len(tensor.shape))])] else: component = tensor[tuple([slice_idx if d == axis else slice(None) for d in range(len(tensor.shape))])] result.append(component) return tuple(result)
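Usage sketch (editor addition): the default implementation shown above slices along the given axis.
```python
import numpy as np
from phi.math.backend import NUMPY

# Minimal sketch: split a (2, 3) array along axis 0 into two (3,) slices.
parts = NUMPY.unstack(np.arange(6).reshape(2, 3), axis=0)
print([p.shape for p in parts])  # expected [(3,), (3,)]
```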
def variable(self, value)
-
Expand source code
def variable(self, value): return NotImplemented
def where(self, condition, x=None, y=None)
-
Expand source code
def where(self, condition, x=None, y=None): raise NotImplementedError(self)
def while_loop(self, loop: Callable, values: tuple)
-
while any(values[0]):
    values = loop(*values)
return values
This operation does not support backpropagation.
Args
loop
- Loop function, must return a
tuple
with entries equal tovalues
in shape and data type. values
- Initial values of loop variables.
Returns
Loop variables upon loop completion.
Expand source code
def while_loop(self, loop: Callable, values: tuple): """ ```python while any(values[0]): values = loop(*values) return values ``` This operation does not support backpropagation. Args: loop: Loop function, must return a `tuple` with entries equal to `values` in shape and data type. values: Initial values of loop variables. Returns: Loop variables upon loop completion. """ raise NotImplementedError(self)
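Usage sketch (editor addition), assuming the NumPy backend implements while_loop as a plain Python loop over the documented condition.
```python
import numpy as np
from phi.math.backend import NUMPY

# Minimal sketch: count up to 5; the loop runs while any(values[0]) is truthy.
def loop(continue_, counter):
    counter = counter + 1
    return counter < 5, counter

_, final_counter = NUMPY.while_loop(loop, (np.array([True]), np.array([0])))
print(final_counter)  # expected [5]
```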
def xor(self, a, b)
-
Expand source code
def xor(self, a, b): a, b = self.auto_cast(a, b) return a ^ b
def zeros(self, shape, dtype: phi.math.backend._dtype.DType = None)
-
Expand source code
def zeros(self, shape, dtype: DType = None): raise NotImplementedError(self)
def zeros_like(self, tensor)
-
Expand source code
def zeros_like(self, tensor): raise NotImplementedError(self)
class ComputeDevice
-
A physical device that can be selected to perform backend computations.
Expand source code
class ComputeDevice: """ A physical device that can be selected to perform backend computations. """ def __init__(self, backend: 'Backend', name: str, device_type: str, memory: int, processor_count: int, description: str, ref=None): self.name: str = name """ Name of the compute device. CPUs are typically called `'CPU'`. """ self.device_type: str = device_type """ Type of device such as `'CPU'`, `'GPU'` or `'TPU'`. """ self.memory: int = memory """ Maximum memory of the device that can be allocated (in bytes). -1 for n/a. """ self.processor_count: int = processor_count """ Number of CPU cores or GPU multiprocessors. -1 for n/a. """ self.description: str = description """ Further information about the device such as driver version. """ self.ref = ref """ (Optional) Reference to the internal device representation. """ self.backend: 'Backend' = backend """ Backend that this device belongs to. Different backends represent the same device with different objects. """ def __repr__(self): mem = f"{(self.memory / 1024 ** 2):.0f} MB" if self.memory > 0 else "memory: n/a" pro = f"{self.processor_count} processors" if self.processor_count > 0 else "processors: n/a" ref = f" '{self.ref}'" if isinstance(self.ref, str) else "" descr = self.description.replace('\n', ' ') if len(descr) > 30: descr = descr[:28] + "..." return f"{self.backend} device '{self.name}' ({self.device_type}{ref}) | {mem} | {pro} | {descr}"
Instance variables
var backend
-
Backend that this device belongs to. Different backends represent the same device with different objects.
var description
-
Further information about the device such as driver version.
var device_type
-
Type of device such as
'CPU'
,'GPU'
or'TPU'
. var memory
-
Maximum memory of the device that can be allocated (in bytes). -1 for n/a.
var name
-
Name of the compute device. CPUs are typically called
'CPU'
. var processor_count
-
Number of CPU cores or GPU multiprocessors. -1 for n/a.
var ref
-
(Optional) Reference to the internal device representation.
class NoBackendFound
-
Thrown by
choose_backend()
if no backend can handle the given values.Expand source code
class NoBackendFound(Exception): """ Thrown by `choose_backend` if no backend can handle the given values. """ def __init__(self, msg): Exception.__init__(self, msg)
Ancestors
- builtins.Exception
- builtins.BaseException
class Profile
-
Stores information about calls to backends and their timing.
Profile may be created through profile() or profile_function().
Profiles can be printed or saved to disc.
Expand source code
class Profile: """ Stores information about calls to backends and their timing. Profile may be created through `profile()` or `profile_function()`. Profiles can be printed or saved to disc. """ def __init__(self, trace: bool, backends: tuple or list, subtract_trace_time: bool): self._start = perf_counter() self._stop = None self._root = ExtCall(None, "", 0, "", "", "", -1) self._last_ext_call = self._root self._messages = [] self._trace = trace self._backend_calls = [] self._retime_index = -1 self._accumulating = False self._backends = backends self._subtract_trace_time = subtract_trace_time self._total_trace_time = 0 def _add_call(self, backend_call: BackendCall, args: tuple, kwargs: dict, result): if self._retime_index >= 0: prev_call = self._backend_calls[self._retime_index] assert prev_call._function_name == backend_call._function_name if self._accumulating: prev_call._start += backend_call._start prev_call._stop += backend_call._stop else: prev_call._start = backend_call._start prev_call._stop = backend_call._stop self._retime_index = (self._retime_index + 1) % len(self._backend_calls) else: self._backend_calls.append(backend_call) args = {i: arg for i, arg in enumerate(args)} args.update(kwargs) backend_call.add_arg("Inputs", _format_values(args, backend_call._backend)) if isinstance(result, (tuple, list)): backend_call.add_arg("Outputs", _format_values({i: res for i, res in enumerate(result)}, backend_call._backend)) else: backend_call.add_arg("Outputs", _format_values({0: result}, backend_call._backend)) if self._trace: stack = inspect.stack()[2:] call = self._last_ext_call.common_call(stack) for i in range(call._level, len(stack)): stack_frame = stack[len(stack) - i - 1] name = ExtCall.determine_name(stack_frame) # if len(stack) - i > 1 else "" sub_call = ExtCall(call, name, i + 1, stack_frame.function, stack_frame.code_context, stack_frame.filename, stack_frame.lineno) call.add(sub_call) call = sub_call call.add(backend_call) self._last_ext_call = call if self._subtract_trace_time: delta_trace_time = perf_counter() - backend_call._stop backend_call._start -= self._total_trace_time backend_call._stop -= self._total_trace_time self._total_trace_time += delta_trace_time def _finish(self): self._stop = perf_counter() self._children_to_properties() @property def duration(self) -> float: """ Total time passed from creation of the profile to the end of the last operation. """ return self._stop - self._start if self._stop is not None else None def print(self, min_duration=1e-3, code_col=80, code_len=50): """ Prints this profile to the console. Args: min_duration: Hides elements with less time spent on backend calls than `min_duration` (seconds) code_col: Formatting option for where the context code is printed. code_len: Formatting option for cropping the context code """ print(f"Profile: {self.duration:.4f} seconds total. Skipping elements shorter than {1000 * min_duration:.2f} ms") if self._messages: print("External profiling:") for message in self._messages: print(f" {message}") print() self._root.print(min_duration=min_duration, code_col=code_col, code_len=code_len) def save(self, json_file: str): """ Saves this profile to disc using the *trace event format* described at https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/edit This file can be viewed with external applications such as Google chrome. 
Args: json_file: filename """ data = [ {'name': "process_name", 'ph': 'M', 'pid': 0, 'tid': 0, "args": {"name": "0 Python calls"}}, {'name': "process_name", 'ph': 'M', 'pid': 1, 'tid': 1, "args": {"name": "1 Operations"}}, ] + [ {'name': "thread_name", 'ph': 'M', 'pid': 1, 'tid': i + 1, "args": {"name": backend.name}} for i, backend in enumerate(self._backends) ] if self._trace: if len(self._root._children) > 0: data.extend(self._root.trace_json_events()) else: data.extend(sum([call.trace_json_events(()) for call in self._backend_calls], [])) with open(json_file, 'w') as file: json.dump(data, file) save_trace = save def _children_to_properties(self): children = self._root.children_to_properties() for name, child in children.items(): setattr(self, name, child) def add_external_message(self, message: str): """ Stores an external message in this profile. External messages are printed in `Profile.print()`. """ self._messages.append(message) @contextmanager def retime(self): """ To be used in `with` statements, `with prof.retime(): ...`. Updates this profile by running the same operations again but without tracing. This gives a much better indication of the true timing. The code within the `with` block must perform the same operations as the code that created this profile. *Warning:* Internal caching may reduce the number of operations after the first time a function is called. To prevent this, run the function before profiling it, see `warmup` in `profile_function()`. """ self._retime_index = 0 restore_data = _start_profiling(self, self._backends) try: yield None finally: _stop_profiling(self, *restore_data) assert self._retime_index == 0, f"Number of calls during retime did not match original profile, originally {len(self._backend_calls)}, now {self._retime_index}, " self._retime_index = -1 @contextmanager def _accumulate_average(self, n): self._retime_index = 0 self._accumulating = True restore_data = _start_profiling(self, self._backends) try: yield None finally: _stop_profiling(self, *restore_data) assert self._retime_index == 0, f"Number of calls during retime did not match original profile, originally {len(self._backend_calls)}, now {self._retime_index}, " self._retime_index = -1 for call in self._backend_calls: call._start /= n call._stop /= n self._accumulating = False
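A minimal usage sketch, not taken from the documentation above: it assumes that profile() can be used as a with context yielding a Profile (as the retime() example with prof.retime(): ... suggests) and that the backend exposes a mul method; both names are assumptions for illustration only.
import numpy as np
from phi.math.backend import profile, choose_backend

a = np.random.rand(256, 256).astype(np.float32)

# Assumption: profile() is a context manager that yields the active Profile.
with profile() as prof:
    backend = choose_backend(a)   # resolves to the (profiled) NumPy backend
    b = backend.mul(a, 2.0)       # `mul` is an assumed Backend method name

prof.print()                      # console summary of the recorded backend calls
prof.save('run_profile.json')     # trace event file, e.g. for chrome://tracing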
Instance variables
var duration : float
-
Total time passed from creation of the profile to the end of the last operation.
Expand source code
@property
def duration(self) -> float:
    """ Total time passed from creation of the profile to the end of the last operation. """
    return self._stop - self._start if self._stop is not None else None
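For example, once a profile has finished (prof being a finished Profile; the property is None while the profile is still recording):
print(f"Profiled {prof.duration:.4f} s in total")  # None until the profile has finished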
Methods
def add_external_message(self, message: str)
-
Stores an external message in this profile. External messages are printed in Profile.print().
Expand source code
def add_external_message(self, message: str):
    """ Stores an external message in this profile. External messages are printed in `Profile.print()`. """
    self._messages.append(message)
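For example, a timing measured outside of the backends (the message text here is made up) can be attached to an existing Profile prof so that it shows up under "External profiling:" when the profile is printed:
prof.add_external_message("data loading: 1.2 s")
prof.print()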
def print(self, min_duration=0.001, code_col=80, code_len=50)
-
Prints this profile to the console.
Args
min_duration
- Hides elements with less time spent on backend calls than min_duration (seconds).
code_col
- Formatting option for where the context code is printed.
code_len
- Formatting option for cropping the context code.
Expand source code
def print(self, min_duration=1e-3, code_col=80, code_len=50):
    """
    Prints this profile to the console.

    Args:
        min_duration: Hides elements with less time spent on backend calls than `min_duration` (seconds)
        code_col: Formatting option for where the context code is printed.
        code_len: Formatting option for cropping the context code
    """
    print(f"Profile: {self.duration:.4f} seconds total. Skipping elements shorter than {1000 * min_duration:.2f} ms")
    if self._messages:
        print("External profiling:")
        for message in self._messages:
            print(f" {message}")
        print()
    self._root.print(min_duration=min_duration, code_col=code_col, code_len=code_len)
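For instance, to include even very short backend calls and give the context code more room (prof being an existing Profile):
prof.print(min_duration=0, code_col=100, code_len=80)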
def retime(self)
-
To be used in with statements, with prof.retime(): ...
Updates this profile by running the same operations again but without tracing. This gives a much better indication of the true timing.
The code within the with block must perform the same operations as the code that created this profile.
Warning: Internal caching may reduce the number of operations after the first time a function is called. To prevent this, run the function before profiling it, see warmup in profile_function().
Expand source code
@contextmanager
def retime(self):
    """
    To be used in `with` statements, `with prof.retime(): ...`.

    Updates this profile by running the same operations again but without tracing.
    This gives a much better indication of the true timing.

    The code within the `with` block must perform the same operations as the code that created this profile.

    *Warning:* Internal caching may reduce the number of operations after the first time a function is called.
    To prevent this, run the function before profiling it, see `warmup` in `profile_function()`.
    """
    self._retime_index = 0
    restore_data = _start_profiling(self, self._backends)
    try:
        yield None
    finally:
        _stop_profiling(self, *restore_data)
        assert self._retime_index == 0, f"Number of calls during retime did not match original profile, originally {len(self._backend_calls)}, now {self._retime_index}, "
        self._retime_index = -1
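A sketch of the intended pattern, assuming step() stands for some user function that issues the same backend calls on every invocation and that profile() yields a Profile (both are illustrative assumptions):
step(state)                  # warm-up run so internal caching is settled
with profile() as prof:
    step(state)              # traced run, includes tracing overhead
with prof.retime():
    step(state)              # identical operations, re-timed without tracing
prof.print()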
def save(self, json_file: str)
-
Saves this profile to disc using the trace event format described at https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/edit
This file can be viewed with external applications such as Google Chrome.
Args
json_file
- filename
Expand source code
def save(self, json_file: str):
    """
    Saves this profile to disc using the *trace event format* described at
    https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/edit

    This file can be viewed with external applications such as Google chrome.

    Args:
        json_file: filename
    """
    data = [
        {'name': "process_name", 'ph': 'M', 'pid': 0, 'tid': 0, "args": {"name": "0 Python calls"}},
        {'name': "process_name", 'ph': 'M', 'pid': 1, 'tid': 1, "args": {"name": "1 Operations"}},
    ] + [
        {'name': "thread_name", 'ph': 'M', 'pid': 1, 'tid': i + 1, "args": {"name": backend.name}}
        for i, backend in enumerate(self._backends)
    ]
    if self._trace:
        if len(self._root._children) > 0:
            data.extend(self._root.trace_json_events())
    else:
        data.extend(sum([call.trace_json_events(()) for call in self._backend_calls], []))
    with open(json_file, 'w') as file:
        json.dump(data, file)
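The saved file uses the Chrome trace event format, so it can be inspected with chrome://tracing or other viewers that accept that format (prof being a finished Profile):
prof.save('profile.json')
# Open chrome://tracing in Google Chrome and load profile.json to inspect the timeline.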
def save_trace(self, json_file: str)
-
Saves this profile to disc using the trace event format described at https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/edit
This file can be viewed with external applications such as Google Chrome.
Args
json_file
- filename
Expand source code
def save(self, json_file: str):
    """
    Saves this profile to disc using the *trace event format* described at
    https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/edit

    This file can be viewed with external applications such as Google chrome.

    Args:
        json_file: filename
    """
    data = [
        {'name': "process_name", 'ph': 'M', 'pid': 0, 'tid': 0, "args": {"name": "0 Python calls"}},
        {'name': "process_name", 'ph': 'M', 'pid': 1, 'tid': 1, "args": {"name": "1 Operations"}},
    ] + [
        {'name': "thread_name", 'ph': 'M', 'pid': 1, 'tid': i + 1, "args": {"name": backend.name}}
        for i, backend in enumerate(self._backends)
    ]
    if self._trace:
        if len(self._root._children) > 0:
            data.extend(self._root.trace_json_events())
    else:
        data.extend(sum([call.trace_json_events(()) for call in self._backend_calls], []))
    with open(json_file, 'w') as file:
        json.dump(data, file)