Parallel Computation in ΦML¶

ΦML offers a powerful, flexible and easy-to-use framework for parallel computation based on dataclasses.

Prior knowledge of Python's dataclasses module and @cached_property is recommended. If you are not familiar with dataclasses, please refer to the official documentation or the Dataclasses tutorial on Real Python. For @cached_property, please refer to the Cached Properties documentation.
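
As a minimal refresher (independent of ΦML), the following sketch shows a frozen dataclass with a cached property; the Circle class is made up purely for illustration.

from dataclasses import dataclass
from functools import cached_property

@dataclass(frozen=True)
class Circle:
    radius: float

    @cached_property
    def area(self) -> float:
        # Computed on first access, then stored on the instance and reused.
        return 3.141592653589793 * self.radius ** 2

print(Circle(2.0).area)  # ≈ 12.566; subsequent accesses return the cached value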

In [1]:
%%capture
!pip install phiml

Parallelization with @parallel_property and parallel_compute¶

Parallelization is always performed over one or more ΦML dims. Take the following example:

In [2]:
from phiml.math import Tensor, tensor, map

def expensive_to_compute(x: float) -> float:
    import time
    time.sleep(1)  # Simulate an expensive computation
    return 2 * x

data = tensor([1.0, 2.0, 3.0], "data:b")

We have an expensive computation that we want to perform on each element of the data tensor. Without parallelization, this looks as follows:

In [3]:
map(expensive_to_compute, data)
Out[3]:
(2.000, 4.000, 6.000) along dataᵇ float64

Let's first refactor this computation into a dataclass, expressing the computation as a @cached_property:

In [4]:
from dataclasses import dataclass
from functools import cached_property

@dataclass(frozen=True)
class SequentialComputation:
    data: Tensor

    @cached_property
    def result(self) -> Tensor:
        return map(expensive_to_compute, self.data)

SequentialComputation(data).result
Out[4]:
(2.000, 4.000, 6.000) along dataᵇ float64

The dataclass offers a convenient way of storing the result for future use. Now let's parallelize it!

In [5]:
from typing import Union
from phiml.dataclasses import parallel_property, parallel_compute

@dataclass(frozen=True)
class ParallelComputation:
    data: Tensor

    @parallel_property
    def result(self) -> Union[Tensor, float]:
        return expensive_to_compute(float(self.data))

As you can see, we have replaced @cached_property with @parallel_property. The property will now be evaluated on each element of data individually and in parallel. We can trigger the parallel computation using the parallel_compute function.

Note that multiprocessing is used under the hood, so we need to protect the entry point of the program using if __name__ == "__main__":. To avoid issues with Jupyter notebooks, we recommend declaring the worker code in a separate module outside the notebook.
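
For reference, the parallel_compute.py module imported below could contain nothing more than the definitions from the cells above, so that the worker processes can import them. A sketch of such a module, mirroring the earlier cells:

# parallel_compute.py -- importable by the worker processes
from dataclasses import dataclass
from typing import Union

from phiml.math import Tensor
from phiml.dataclasses import parallel_property

def expensive_to_compute(x: float) -> float:
    import time
    time.sleep(1)  # Simulate an expensive computation
    return 2 * x

@dataclass(frozen=True)
class ParallelComputation:
    data: Tensor

    @parallel_property
    def result(self) -> Union[Tensor, float]:
        return expensive_to_compute(float(self.data))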

In [6]:
from parallel_compute import ParallelComputation  # identical to above cell, but can be imported by workers

if __name__ == "__main__":
    computation = ParallelComputation(data)
    parallel_compute(computation, [ParallelComputation.result], max_workers=3)
    print(computation.result)
(2.000, 4.000, 6.000) along dataᵇ float64

Under the hood, ΦML unstacked the data tensor and distributed the computation of result over 3 worker processes. The results were then stacked back together into a single tensor.
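
Conceptually, this is similar to the hand-written sketch below, which uses Python's concurrent.futures directly. This is only an analogy for what parallel_compute automates, not ΦML's actual implementation, and it assumes that expensive_to_compute is importable by the worker processes (e.g. from the parallel_compute module):

from concurrent.futures import ProcessPoolExecutor
from phiml.math import Tensor, batch, stack, tensor, unstack

def manual_parallel(data: Tensor) -> Tensor:
    elements = [float(t) for t in unstack(data, 'data')]  # slice along the parallelization dim
    with ProcessPoolExecutor(max_workers=3) as pool:  # distribute the per-element work
        results = list(pool.map(expensive_to_compute, elements))
    return stack([tensor(r) for r in results], batch('data'))  # reassemble along the same dim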

Property Dependencies¶

In more complex situations, the property to evaluate may depend on other properties. ΦML will automatically resolve the dependencies and compute them in the correct order.

In [7]:
@dataclass(frozen=True)
class ParallelDepComputation:
    data: Tensor

    @cached_property
    def tmp_result(self) -> Union[Tensor, float]:
        return expensive_to_compute(float(self.data))

    @cached_property
    def result(self) -> Union[Tensor, float]:
        return self.tmp_result + 1

Note that we can use @cached_property instead of @parallel_property for all properties. The only difference is that @parallel_property offers additional customization options; it does not prevent accidental evaluation outside of parallel_compute.

In [8]:
from parallel_compute import ParallelDepComputation  # identical to above cell, but can be imported by workers
if __name__ == "__main__":
    computation = ParallelDepComputation(data)
    parallel_compute(computation, [ParallelDepComputation.result], max_workers=3)
    print(computation.result)
(3.000, 5.000, 7.000) along dataᵇ float64

The workers automatically computed tmp_result first, as it is a dependency of result. Note, however, that tmp_result is not stored in the main process, as it was not requested.

In [9]:
computation.__dict__
Out[9]:
{'data': (1.000, 2.000, 3.000) along dataᵇ,
 'result': (3.000, 5.000, 7.000) along dataᵇ float64}
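
If tmp_result should also be available in the main process, it can be requested explicitly. Assuming parallel_compute accepts multiple properties in the list (as the list argument suggests), this might look like:

if __name__ == "__main__":
    computation = ParallelDepComputation(data)
    # Requesting both properties also transfers tmp_result back to the main process.
    parallel_compute(computation, [ParallelDepComputation.tmp_result, ParallelDepComputation.result], max_workers=3)
    print(computation.tmp_result)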

Non-parallelizable Properties¶

Some properties may not be parallelizable, e.g., because they access values across the parallelization dim. We can mark required dims in the @parallel_property decorator to prevent ΦML from attempting to parallelize them.

Say we want to compute the mean of expensive_to_compute over all elements of data:

In [10]:
import os
from phiml import mean, batch

@dataclass(frozen=True)
class ParallelMeanComputation:
    data: Tensor

    @parallel_property
    def individual_result(self) -> Union[Tensor, float]:
        print(f"Computing individual_result pid={os.getpid()}")
        return expensive_to_compute(float(self.data))

    @parallel_property(requires=batch)
    def mean(self) -> Union[Tensor, float]:
        print(f"Computing mean, pid={os.getpid()}")
        return mean(self.individual_result, batch)
In [11]:
from parallel_compute import ParallelMeanComputation  # identical to above cell, but can be imported by workers
if __name__ == "__main__":
    print(f"Host pid={os.getpid()}")
    computation = ParallelMeanComputation(data)
    parallel_compute(computation, [ParallelMeanComputation.mean], max_workers=3)
    print(computation.mean)
Host pid=3383
Computing individual_result pid=3444
Computing individual_result pid=3446
Computing individual_result pid=3445
Computing mean, pid=3383
float64 4.0

The computation is now split into two stages: First, individual_result is computed in parallel on each element of data. Then, the results are gathered in the host process and the mean is computed over all elements. This is achieved by building a dependency graph of all involved properties under the hood and determining the optimal execution strategy.

Note that the print statements from the worker processes may appear out of order or not at all in the notebook, since the workers run in separate processes that write to the same output stream. The output is easier to follow when running the code as a script.

As a final example, let's look at a three-stage computation: the normalized (mean-subtracted) result. Here, both individual_result and normalized_result can be parallelized, while mean cannot.

In [12]:
@dataclass(frozen=True)
class ParallelNormComputation:
    data: Tensor

    @parallel_property
    def individual_result(self) -> Union[Tensor, float]:
        print(f"Computing individual_result pid={os.getpid()}")
        return expensive_to_compute(float(self.data))

    @parallel_property(requires=batch)
    def mean(self) -> Union[Tensor, float]:
        print(f"Computing mean, pid={os.getpid()}")
        return mean(self.individual_result, batch)

    @parallel_property
    def normalized_result(self) -> Union[Tensor, float]:
        print(f"Computing normalized_result pid={os.getpid()}")
        return self.individual_result - self.mean
In [13]:
from parallel_compute import ParallelNormComputation
if __name__ == "__main__":
    print(f"Host pid={os.getpid()}")
    computation = ParallelNormComputation(data)
    parallel_compute(computation, [ParallelNormComputation.normalized_result], max_workers=3)
    print(computation.normalized_result)
Host pid=3383
Computing individual_result pid=3462
Computing individual_result pid=3461
Computing individual_result pid=3463
Computing normalized_result pid=3462
Computing normalized_result pid=3461
Computing normalized_result pid=3463
Computing mean, pid=3383
(-2.000, 0.000, 2.000) along dataᵇ

Caching Properties on Disk¶

For large data, it may be beneficial to cache intermediate results on disk instead of in memory. This can be achieved by setting the memory_limit and cache_dir arguments of parallel_compute. Storing intermediate results on disk also reduces the amount of data transferred between the main process and the workers, which can be a bottleneck for large data. See this example.
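
A call with disk caching enabled might look like the sketch below. The argument names memory_limit and cache_dir are taken from the text above, but the memory_limit value (interpreted here as a rough byte budget) and the cache directory are placeholders; check the API documentation for the exact semantics and defaults.

if __name__ == "__main__":
    computation = ParallelNormComputation(data)
    parallel_compute(
        computation,
        [ParallelNormComputation.normalized_result],
        max_workers=3,
        memory_limit=2e9,              # assumed: in-memory budget before intermediate results are spilled to disk
        cache_dir="/tmp/phiml_cache",  # assumed: directory for on-disk intermediate results
    )
    print(computation.normalized_result)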