Parallel Computation in ΦML

ΦML offers a powerful, flexible and easy-to-use framework for parallel computation based on dataclasses.

Prior knowledge of Python's dataclasses module and @cached_property is recommended. If you are not familiar with dataclasses, please refer to the official documentation or the Dataclasses tutorial on Real Python. For @cached_property, please refer to the Cached Properties documentation.

In [1]:
%%capture
!pip install phiml

Parallelization with @parallel_property and parallel_compute

Parallelization is always performed over one or more ΦML dims. Take the following example:

In [2]:
from phiml.math import Tensor, tensor, map

def expensive_to_compute(x: float) -> float:
    import time
    time.sleep(1)  # Simulate an expensive computation
    return 2 * x

data = tensor([1.0, 2.0, 3.0], "data:b")

We have an expensive computation that we want to perform on each element of the data tensor. Without parallelization, it looks like this:

In [3]:
map(expensive_to_compute, data)
Out[3]:
(2.000, 4.000, 6.000) along dataᵇ

Let's first refactor this computation into a dataclass, expressing the computation as a @cached_property:

In [4]:
from dataclasses import dataclass
from functools import cached_property

@dataclass(frozen=True)
class SequentialComputation:
    data: Tensor

    @cached_property
    def result(self) -> Tensor:
        return map(expensive_to_compute, self.data)

SequentialComputation(data).result
Out[4]:
(2.000, 4.000, 6.000) along dataᵇ

The dataclass offers a convenient way of storing the result for future use. Now let's parallelize it!

In [5]:
from typing import Union
from phiml.dataclasses import parallel_property, parallel_compute

@dataclass(frozen=True)
class ParallelComputation:
    data: Tensor

    @parallel_property
    def result(self) -> Union[Tensor, float]:
        return expensive_to_compute(float(self.data))

As you can see, we have replaced @cached_property with @parallel_property. The property will now be evaluated on each element of data individually and in parallel. We can trigger the parallel computation using the parallel_compute function.

Note that multiprocessing is used under the hood, so we need to protect the entry point of the program using if __name__ == "__main__":. To avoid issues with Jupyter notebooks, we recommend declaring the worker code in a separate module outside the notebook.

In [6]:
from parallel_compute import ParallelComputation  # identical to above cell, but can be imported by workers

if __name__ == "__main__":
    computation = ParallelComputation(data)
    parallel_compute(computation, [ParallelComputation.result], max_workers=3)
    print(computation.result)

Under the hood, ΦML unstacked the data tensor and distributed the computation of result over 3 worker processes. The results were then stacked back together into a single tensor.
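The unstack, distribute and stack steps described above can be pictured with a standard-library sketch. It does not use ΦML and is not its implementation: a plain list stands in for the tensor, and a thread pool stands in for the worker processes so that the snippet also runs inside a notebook. The shortened sleep time is likewise an assumption made for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def expensive_to_compute(x: float) -> float:
    time.sleep(0.1)  # Simulate an expensive computation (shortened here)
    return 2 * x


# "Unstack": one work item per element along the parallelized dim.
elements = [1.0, 2.0, 3.0]

# "Distribute": each element is handed to one of three workers.
# Threads are used as a stand-in for processes in this sketch.
with ThreadPoolExecutor(max_workers=3) as pool:
    # pool.map returns results in input order, regardless of completion order.
    results = list(pool.map(expensive_to_compute, elements))

# "Stack": the per-element results form one collection again.
print(results)  # [2.0, 4.0, 6.0]
```

Because the three sleeps overlap, the whole computation takes roughly as long as a single element rather than three times as long.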

Property Dependencies

In more complex situations, the property to evaluate may depend on other properties. ΦML will automatically resolve the dependencies and compute them in the correct order.

In [7]:
@dataclass(frozen=True)
class ParallelDepComputation:
    data: Tensor

    @cached_property
    def tmp_result(self) -> Union[Tensor, float]:
        return expensive_to_compute(float(self.data))

    @cached_property
    def result(self) -> Union[Tensor, float]:
        return self.tmp_result + 1

Note that we can use @cached_property instead of @parallel_property for all properties. The only difference is that @parallel_property offers additional customization options and does not prevent accidental evaluation outside of parallel_compute.

In [8]:
from parallel_compute import ParallelDepComputation  # identical to above cell, but can be imported by workers
if __name__ == "__main__":
    computation = ParallelDepComputation(data)
    parallel_compute(computation, [ParallelDepComputation.result], max_workers=3)
    print(computation.result)

The workers automatically computed tmp_result first, as it is a dependency of result. Note, however, that tmp_result is not stored in the main process, as it was not requested.

In [9]:
computation.__dict__
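Inspecting __dict__ is meaningful because functools.cached_property stores each computed value in the instance's __dict__. The following standard-library sketch (the class name Chain is hypothetical, and ΦML is not involved) shows an ordinary single-process evaluation, in which the dependency is cached alongside the requested property. This contrasts with the parallel case described above, where only the requested property is kept in the main process.

```python
from dataclasses import dataclass
from functools import cached_property


@dataclass(frozen=True)
class Chain:  # hypothetical stand-in for the dataclasses in this notebook
    x: float

    @cached_property
    def tmp_result(self) -> float:
        return 2 * self.x

    @cached_property
    def result(self) -> float:
        return self.tmp_result + 1


chain = Chain(3.0)
before = dict(chain.__dict__)  # only the dataclass field is present
value = chain.result           # evaluates tmp_result first, then result
after = dict(chain.__dict__)   # both cached values were added

print(before)  # {'x': 3.0}
print(after)   # {'x': 3.0, 'tmp_result': 6.0, 'result': 7.0}
```

cached_property writes to __dict__ directly, which is why it also works on a frozen dataclass.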

Non-parallelizable Properties

Some properties may not be parallelizable, e.g., because they access values across the parallelization dim. We can mark required dims in the @parallel_property decorator to prevent ΦML from attempting to parallelize them.

Say we want to compute the mean of expensive_to_compute over all elements of data:

In [10]:
import os
from phiml import mean, batch

@dataclass(frozen=True)
class ParallelMeanComputation:
    data: Tensor

    @parallel_property
    def individual_result(self) -> Union[Tensor, float]:
        print(f"Computing individual_result pid={os.getpid()}")
        return expensive_to_compute(float(self.data))

    @parallel_property(requires=batch)
    def mean(self) -> Union[Tensor, float]:
        print(f"Computing mean, pid={os.getpid()}")
        return mean(self.individual_result, batch)

In [11]:
from parallel_compute import ParallelMeanComputation  # identical to above cell, but can be imported by workers
if __name__ == "__main__":
    print(f"Host pid={os.getpid()}")
    computation = ParallelMeanComputation(data)
    parallel_compute(computation, [ParallelMeanComputation.mean], max_workers=3)
    print(computation.mean)

The computation is now split into two stages: First, individual_result is computed in parallel on each element of data. Then, the results are gathered in the host process and the mean is computed over all elements. This is achieved by building a dependency graph of all involved properties under the hood and determining the optimal execution strategy.

Note that the notebook output only captures the print statements from the main process. You can see the print statements from the worker processes when running the code in a script.
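To make the idea of staging more concrete, here is a small sketch that orders the two properties with the standard library's graphlib and groups them by where they would run. The dictionary layout and the grouping rule are assumptions made for illustration. They are not ΦML's internal data structures, and the real scheduler may plan differently.

```python
from graphlib import TopologicalSorter

# Hypothetical description of the properties above:
# name -> (dependencies, whether the full batch dim is required)
properties = {
    "individual_result": ((), False),
    "mean": (("individual_result",), True),
}

# Order the properties so that every dependency precedes its dependents.
graph = {name: deps for name, (deps, _) in properties.items()}
order = list(TopologicalSorter(graph).static_order())

# Group consecutive properties that run in the same place into one stage.
stages = []
for name in order:
    where = "host" if properties[name][1] else "workers"
    if stages and stages[-1][0] == where:
        stages[-1][1].append(name)
    else:
        stages.append((where, [name]))

print(stages)  # [('workers', ['individual_result']), ('host', ['mean'])]
```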

As a final example, let's look at the three-stage computation of computing the normalized (mean-subtracted) result. Here, both individual_result and normalized_result can be parallelized, while mean cannot.

In [12]:
@dataclass(frozen=True)
class ParallelNormComputation:
    data: Tensor

    @parallel_property
    def individual_result(self) -> Union[Tensor, float]:
        print(f"Computing individual_result pid={os.getpid()}")
        return expensive_to_compute(float(self.data))

    @parallel_property(requires=batch)
    def mean(self) -> Union[Tensor, float]:
        print(f"Computing mean, pid={os.getpid()}")
        return mean(self.individual_result, batch)

    @parallel_property
    def normalized_result(self) -> Union[Tensor, float]:
        print(f"Computing normalized_result pid={os.getpid()}")
        return self.individual_result - self.mean

In [13]:
from parallel_compute import ParallelNormComputation
if __name__ == "__main__":
    print(f"Host pid={os.getpid()}")
    computation = ParallelNormComputation(data)
    parallel_compute(computation, [ParallelNormComputation.normalized_result], max_workers=3)
    print(computation.normalized_result)
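For orientation, the same three stages can be written out by hand with the standard library. This is only a sketch of the staging pattern under simplifying assumptions: a list replaces the tensor, threads replace the worker processes, and the simulated delay is omitted. It shows what each stage computes for the data used in this notebook, not how ΦML executes it.

```python
from concurrent.futures import ThreadPoolExecutor
from statistics import fmean

data = [1.0, 2.0, 3.0]


def individual_result(x: float) -> float:
    return 2 * x  # same arithmetic as expensive_to_compute, without the sleep


with ThreadPoolExecutor(max_workers=3) as pool:
    # Stage 1 (parallel): one task per element.
    individual = list(pool.map(individual_result, data))

    # Stage 2 (host): the mean needs all elements, so it runs in one place.
    mean_value = fmean(individual)

    # Stage 3 (parallel): each element only needs its own value and the mean.
    normalized = list(pool.map(lambda r: r - mean_value, individual))

print(individual)  # [2.0, 4.0, 6.0]
print(mean_value)  # 4.0
print(normalized)  # [-2.0, 0.0, 2.0]
```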

Caching Properties on Disk

For large data, it may be beneficial to cache intermediate results on disk instead of in memory. This can be achieved by setting the memory_limit and cache_dir arguments of parallel_compute. Storing intermediate results on disk also reduces the amount of data transferred between the main process and the workers, which can be a bottleneck for large data. See this example.
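Why disk caching reduces transfer can be illustrated with a generic standard-library sketch: a worker writes its large result to a file and hands back only the short path. The helper names store and load, the file name and the pickle format are all assumptions for this illustration. They say nothing about how ΦML lays out its cache_dir.

```python
import pickle
import tempfile
from pathlib import Path


def store(value, cache_dir: Path, name: str) -> Path:
    """Hypothetical helper: write a value to disk and return its path."""
    path = cache_dir / f"{name}.pkl"
    with path.open("wb") as f:
        pickle.dump(value, f)
    return path


def load(path: Path):
    """Hypothetical helper: read a previously stored value back from disk."""
    with path.open("rb") as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as tmp:
    cache_dir = Path(tmp)
    big_result = list(range(100_000))  # stands in for a large intermediate

    # A worker would return this small path instead of the payload itself.
    handle = store(big_result, cache_dir, "individual_result_0")

    # Whoever needs the value later loads it on demand.
    restored = load(handle)
    roundtrip_ok = restored == big_result

print(roundtrip_ok)  # True
```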