Source code for pennylane.concurrency.executors.external.mpi
# Copyright 2018-2025 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
r"""
.. currentmodule:: pennylane.concurrency.executors.external.mpi
Contains concurrent executor abstractions for task-based workloads backed by mpi4py.
"""
from collections.abc import Callable, Sequence
from typing import Any, Optional
from ..base import ExecBackendConfig, ExtExec
# pylint: disable=import-outside-toplevel
[docs]
class MPIPoolExec(ExtExec): # pragma: no cover
r"""
MPIPoolExecutor abstraction class executor.
This executor wraps the `mpi4py.futures.MPIPoolExecutor <https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html#mpipoolexecutor>`_ class, and provides support for execution using multiple processes launched using MPI.
For an example script ``my_script.py``, and an installed mpi4py library with the active MPI environment, the executor can be used as follows:
.. code-block:: console
$ mpirun -n 4 -m mpi4py.futures my_script.py
See `mpi4py.futures - Command line <https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html#command-line>`_ for additional details on launching jobs.
.. note::
All calls to the executor are synchronous, and do not currently support the use of futures as a return object.
Args:
*args: non keyword arguments to pass through to the executor backend.
**kwargs: keyword arguments to pass through to the executor backend.
"""
def __init__(self, max_workers: Optional[int] = None, persist: bool = False, **kwargs):
if persist:
raise RuntimeError("The MPIPoolExec backend does not currently support persistence.")
super().__init__(max_workers=max_workers, **kwargs)
# Imports will initialise the MPI environment.
# Handle set-up upon object creation only.
from mpi4py import MPI
self._size = max_workers
self._comm = MPI.COMM_WORLD
self._cfg = ExecBackendConfig(
submit_fn="submit",
map_fn="map",
starmap_fn="starmap",
shutdown_fn="shutdown",
submit_unpack=True,
map_unpack=True,
blocking=True,
)
def __call__(self, dispatch: str, fn: Callable, *args, **kwargs):
r"""
dispatch: the named method to pass the function parameters
fn: the callable function to run on the executor backend
args: the arguments to pass to ``fn``
kwargs: the keyword arguments to pass to ``fn``
"""
kwargs.update({"use_pkl5": True})
return super().__call__(
dispatch,
fn,
*args,
**kwargs,
)
@property
def size(self):
return self._size
[docs]
def submit(self, fn: Callable, *args, **kwargs):
with self._exec_backend()(max_workers=self.size, use_pkl5=True) as executor:
output_f = executor.submit(fn, *args, **kwargs)
return output_f.result()
[docs]
def map(self, fn: Callable, *args: Sequence[Any], **kwargs):
chunksize = max(len(args) // self._size, 1)
with self._exec_backend()(max_workers=self.size, use_pkl5=True) as executor:
output_f = executor.map(fn, *args, chunksize=chunksize, **kwargs)
return list(output_f)
[docs]
def starmap(self, fn: Callable, args: Sequence[tuple], **kwargs):
chunksize = max(len(args) // self._size, 1)
with self._exec_backend()(max_workers=self.size, use_pkl5=True) as executor:
output_f = executor.starmap(fn, args, chunksize=chunksize, **kwargs)
return list(output_f)
def __del__(self):
self.shutdown()
@classmethod
def _exec_backend(cls):
from mpi4py.futures import MPIPoolExecutor
return MPIPoolExecutor
# pylint: disable=import-outside-toplevel
[docs]
class MPICommExec(ExtExec): # pragma: no cover
r"""
MPICommExecutor abstraction class functor. To be used if dynamic process spawning
required by MPIPoolExec is unsupported by the MPI implementation.
This executor wraps the `mpi4py.futures.MPICommExecutor <https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html#mpicommexecutor>`_ class, and provides support for execution using multiple processes launched using MPI.
For an example script ``my_script.py``, and an installed mpi4py library with the active MPI environment, the executor can be used as follows:
.. code-block:: console
$ mpirun -n 4 -m mpi4py.futures my_script.py
See `mpi4py.futures - Command line <https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html#command-line>`_ for additional details on launching jobs.
.. note::
All calls to the executor are synchronous, and do not currently support the use of futures as a return object.
"""
def __init__(self, max_workers=None, persist: bool = False, **kwargs):
if persist:
raise RuntimeError("The MPIPoolExec backend does not currently support persistence.")
super().__init__(max_workers=max_workers, **kwargs)
from mpi4py import MPI # Required to call MPI_Init
self._comm = MPI.COMM_WORLD
self._size = MPI.COMM_WORLD.Get_size()
self._cfg = ExecBackendConfig(
submit_fn="submit",
map_fn="map",
starmap_fn="starmap",
shutdown_fn="shutdown",
submit_unpack=True,
map_unpack=True,
blocking=False,
)
def __call__(self, dispatch: str, fn: Callable, *args, **kwargs):
r"""
dispatch: the named method to pass the function parameters
fn: the callable function to run on the executor backend
args: the arguments to pass to ``fn``
kwargs: the keyword arguments to pass to ``fn``
"""
kwargs.update({"use_pkl5": True})
return super().__call__(
dispatch,
fn,
*args,
**kwargs,
)
[docs]
def submit(self, fn: Callable, *args, **kwargs):
with self._exec_backend()(max_workers=self.size, use_pkl5=True) as executor:
output_f = executor.submit(fn, *args, **kwargs)
return output_f.result()
[docs]
def map(self, fn: Callable, *args: Sequence[Any], **kwargs):
chunksize = max(len(args) // self._size, 1)
with self._exec_backend()(max_workers=self.size, use_pkl5=True) as executor:
output_f = executor.map(fn, *args, chunksize=chunksize, **kwargs)
return list(output_f)
[docs]
def starmap(self, fn: Callable, args: Sequence[tuple], **kwargs):
chunksize = max(len(args) // self._size, 1)
with self._exec_backend()(max_workers=self.size, use_pkl5=True) as executor:
output_f = executor.starmap(fn, args, chunksize=chunksize, **kwargs)
return list(output_f)
@property
def size(self):
return self._size
def __del__(self):
self.shutdown()
@classmethod
def _exec_backend(cls):
from mpi4py.futures import MPICommExecutor
return MPICommExecutor
_modules/pennylane/concurrency/executors/external/mpi
Download Python script
Download Notebook
View on GitHub