Source code for pennylane.concurrency.executors.native.multiproc
# Copyright 2018-2025 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
r"""
.. currentmodule:: pennylane.concurrency.executors.native.multiproc
This module provides abstractions around the Python ``multiprocessing`` library, with support for function execution using multiple processes.
"""
import inspect
from collections.abc import Callable, Sequence
from multiprocessing import get_context
from typing import Any, Optional
from ..base import ExecBackendConfig
from .api import PyNativeExec
[docs]
class MPPoolExec(PyNativeExec):
    r"""
    Python standard library executor class backed by ``multiprocessing.Pool``.

    This executor wraps Python standard library `multiprocessing.Pool <https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool>`_ interface, and provides support for execution using multiple processes.

    Args:
        max_workers: the maximum number of concurrent units (processes) to use
        persist: allow the executor backend to persist between executions. True avoids
            potentially costly set-up and tear-down, where supported.
            Explicit calls to ``shutdown`` will set this to False.
        **kwargs: Keyword arguments to pass-through to the executor backend.
    """

    @classmethod
    def _exec_backend(cls):
        """Return the ``Pool`` factory of the ``"spawn"`` multiprocessing context."""
        return get_context("spawn").Pool

    def __init__(self, max_workers: Optional[int] = None, persist: bool = False, **kwargs):
        """Initialise the base executor, then record how to drive the ``Pool`` backend.

        Args:
            max_workers: forwarded unchanged to the parent initialiser.
            persist: forwarded unchanged to the parent initialiser.
            **kwargs: forwarded unchanged to the parent initialiser.
        """
        super().__init__(max_workers=max_workers, persist=persist, **kwargs)
        # The config names the ``Pool`` methods to use for each operation.
        # NOTE(review): the exact meaning of ``submit_unpack``, ``map_unpack`` and
        # ``blocking`` is defined by ``ExecBackendConfig`` -- confirm there.
        self._cfg = ExecBackendConfig(
            submit_fn="apply",
            map_fn="map",
            starmap_fn="starmap",
            shutdown_fn="close",
            submit_unpack=False,
            map_unpack=False,
            blocking=True,
        )

    def map(self, fn: Callable, *args: Sequence[Any], **kwargs):
        """Map ``fn`` over the given iterables, offloading to ``starmap`` when needed.

        If ``fn`` declares more than one parameter, the iterables in ``args`` are
        zipped together and the call is routed through ``self.starmap``. Otherwise
        the call is delegated to the parent class ``map``.

        Args:
            fn: the callable to apply.
            *args: one iterable per positional argument of ``fn``.
            **kwargs: forwarded unchanged to ``starmap`` or to the parent ``map``.

        Returns:
            Whatever ``self.starmap`` or the parent ``map`` returns.

        Raises:
            ValueError: if the ``starmap`` offload fails for any reason; the
                original exception is chained as the cause.
        """
        try:
            n_params = len(inspect.signature(fn).parameters)
        except (TypeError, ValueError):
            # ``inspect.signature`` cannot introspect every callable (some builtins
            # and C-implemented types have no retrievable signature). Rather than
            # failing before dispatch, infer the arity from the number of iterables.
            n_params = len(args)

        if n_params > 1:  # pragma: no cover
            try:
                # attempt offloading to starmap
                return self.starmap(fn, zip(*args), **kwargs)
            except Exception as e:
                # NOTE(review): this wraps every failure of the starmap call in a
                # ValueError, presumably including errors raised by ``fn`` itself.
                raise ValueError(
                    "Python's `multiprocessing.Pool` does not support `map` calls with multiple arguments. "
                    "Consider a different backend, or use `starmap` instead."
                ) from e
        return super().map(fn, *args, **kwargs)
_modules/pennylane/concurrency/executors/native/multiproc
Download Python script
Download Notebook
View on GitHub