# Copyright 2022-2023 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
r"""
This module contains the :class:`~.LightningGPU` class, a PennyLane simulator device that
interfaces with the NVIDIA cuQuantum cuStateVec simulator library for GPU-enabled calculations.
"""
from ctypes.util import find_library
from importlib import util as imp_util
from itertools import product
from pathlib import Path
from typing import List, Union
from warnings import warn
import numpy as np
import pennylane as qml
from pennylane import BasisState, DeviceError, QuantumFunctionError, Rot, StatePrep, math
from pennylane.measurements import Expectation, State
from pennylane.ops.op_math import Adjoint
from pennylane.wires import Wires
from pennylane_lightning.core._serialize import QuantumScriptSerializer, global_phase_diagonal
from pennylane_lightning.core._version import __version__
# pylint: disable=import-error, no-name-in-module, ungrouped-imports
from pennylane_lightning.core.lightning_base import LightningBase
try:
from pennylane_lightning.lightning_gpu_ops import (
DevPool,
MeasurementsC64,
MeasurementsC128,
StateVectorC64,
StateVectorC128,
backend_info,
get_gpu_arch,
is_gpu_supported,
)
from pennylane_lightning.lightning_gpu_ops.algorithms import (
AdjointJacobianC64,
AdjointJacobianC128,
create_ops_listC64,
create_ops_listC128,
)
try:
# pylint: disable=no-name-in-module
from pennylane_lightning.lightning_gpu_ops import (
DevTag,
MeasurementsMPIC64,
MeasurementsMPIC128,
MPIManager,
StateVectorMPIC64,
StateVectorMPIC128,
)
from pennylane_lightning.lightning_gpu_ops.algorithmsMPI import (
AdjointJacobianMPIC64,
AdjointJacobianMPIC128,
create_ops_listMPIC64,
create_ops_listMPIC128,
)
MPI_SUPPORT = True
except ImportError:
MPI_SUPPORT = False
if find_library("custatevec") is None and not imp_util.find_spec(
"cuquantum"
): # pragma: no cover
raise ImportError(
"custatevec libraries not found. Please pip install the appropriate custatevec library in a virtual environment."
)
if not DevPool.getTotalDevices(): # pragma: no cover
raise ValueError("No supported CUDA-capable device found")
if not is_gpu_supported(): # pragma: no cover
raise ValueError(f"CUDA device is an unsupported version: {get_gpu_arch()}")
LGPU_CPP_BINARY_AVAILABLE = True
except (ImportError, ValueError) as e:
warn(str(e), UserWarning)
backend_info = None
LGPU_CPP_BINARY_AVAILABLE = False
def _gpu_dtype(dtype, mpi=False):
"""Return the state-vector class matching the requested precision (and MPI mode)."""
if dtype not in [np.complex128, np.complex64]: # pragma: no cover
raise ValueError(f"Data type is not supported for state-vector computation: {dtype}")
if mpi:
return StateVectorMPIC128 if dtype == np.complex128 else StateVectorMPIC64
return StateVectorC128 if dtype == np.complex128 else StateVectorC64
def _adj_dtype(use_csingle, mpi=False):
"""Return the adjoint-Jacobian class matching the requested precision (and MPI mode)."""
if mpi:
return AdjointJacobianMPIC64 if use_csingle else AdjointJacobianMPIC128
return AdjointJacobianC64 if use_csingle else AdjointJacobianC128
def _mebibytesToBytes(mebibytes):
"""Convert a size in mebibytes (MiB) to bytes."""
return mebibytes * 1024 * 1024
allowed_operations = {
"Identity",
"BasisState",
"QubitStateVector",
"StatePrep",
"QubitUnitary",
"ControlledQubitUnitary",
"MultiControlledX",
"DiagonalQubitUnitary",
"PauliX",
"PauliY",
"PauliZ",
"MultiRZ",
"GlobalPhase",
"C(GlobalPhase)",
"Hadamard",
"S",
"Adjoint(S)",
"T",
"Adjoint(T)",
"SX",
"Adjoint(SX)",
"CNOT",
"SWAP",
"ISWAP",
"PSWAP",
"Adjoint(ISWAP)",
"SISWAP",
"Adjoint(SISWAP)",
"SQISW",
"CSWAP",
"Toffoli",
"CY",
"CZ",
"PhaseShift",
"ControlledPhaseShift",
"RX",
"RY",
"RZ",
"Rot",
"CRX",
"CRY",
"CRZ",
"CRot",
"IsingXX",
"IsingYY",
"IsingZZ",
"IsingXY",
"SingleExcitation",
"SingleExcitationPlus",
"SingleExcitationMinus",
"DoubleExcitation",
"DoubleExcitationPlus",
"DoubleExcitationMinus",
"QubitCarry",
"QubitSum",
"OrbitalRotation",
"QFT",
"ECR",
"BlockEncode",
}
allowed_observables = {
"PauliX",
"PauliY",
"PauliZ",
"Hadamard",
"SparseHamiltonian",
"Hamiltonian",
"LinearCombination",
"Hermitian",
"Identity",
"Sum",
"Prod",
"SProd",
}
gate_cache_needs_hash = (
qml.BlockEncode,
qml.ControlledQubitUnitary,
qml.DiagonalQubitUnitary,
qml.MultiControlledX,
qml.OrbitalRotation,
qml.PSWAP,
qml.QubitUnitary,
)
class LightningGPU(LightningBase): # pylint: disable=too-many-instance-attributes
"""PennyLane Lightning GPU device.
A GPU-backed Lightning device using NVIDIA cuQuantum SDK.
Use of this device requires pre-built binaries or compilation from source. Check out the
:doc:`/lightning_gpu/installation` guide for more details.
Args:
wires (int): the number of wires to initialize the device with
mpi (bool): enable MPI support. MPI support is enabled if ``mpi`` is set to ``True``.
mpi_buf_size (int): size of GPU memory (in MiB) set aside for MPI operation; its default value is 64 MiB.
sync (bool): immediately sync with the host state vector after applying operations
c_dtype: Datatype for the state-vector representation. Must be one of ``np.complex64`` or ``np.complex128``.
shots (int): How many times the circuit should be evaluated (or sampled) to estimate
the expectation values. Defaults to ``None`` if not specified. Setting
to ``None`` results in computing statistics like expectation values and
variances analytically.
batch_obs (Union[bool, int]): determines whether to batch observables over multiple GPUs within the same node; see ``adjoint_jacobian`` for the accepted values
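**Example** (a minimal usage sketch, assuming a CUDA-capable GPU and the compiled ``lightning_gpu_ops`` binaries are available):
>>> import pennylane as qml
>>> dev = qml.device("lightning.gpu", wires=2)
>>> @qml.qnode(dev)
... def circuit(x):
...     qml.RX(x, wires=0)
...     qml.CNOT(wires=[0, 1])
...     return qml.expval(qml.PauliZ(0))
>>> circuit(0.5)
0.8775825618903728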
"""
name = "Lightning GPU PennyLane plugin"
short_name = "lightning.gpu"
operations = allowed_operations
observables = allowed_observables
_backend_info = backend_info
config = Path(__file__).parent / "lightning_gpu.toml"
_CPP_BINARY_AVAILABLE = LGPU_CPP_BINARY_AVAILABLE
def __init__(
self,
wires,
*,
mpi: bool = False,
mpi_buf_size: int = 0,
sync=False,
c_dtype=np.complex128,
shots=None,
batch_obs: Union[bool, int] = False,
): # pylint: disable=too-many-arguments
if c_dtype is np.complex64:
self.use_csingle = True
elif c_dtype is np.complex128:
self.use_csingle = False
else:
raise TypeError(f"Unsupported complex type: {c_dtype}")
super().__init__(wires, shots=shots, c_dtype=c_dtype)
self._dp = DevPool()
if not mpi:
self._mpi = False
self._num_local_wires = self.num_wires
self._gpu_state = _gpu_dtype(c_dtype)(self._num_local_wires)
else:
self._mpi = True
self._mpi_init_helper(self.num_wires)
if mpi_buf_size < 0:
raise TypeError(f"Unsupported mpi_buf_size value: {mpi_buf_size}")
if mpi_buf_size:
if mpi_buf_size & (mpi_buf_size - 1):
raise TypeError(
f"Unsupported mpi_buf_size value: {mpi_buf_size}. mpi_buf_size should be power of 2."
)
# Memory size in bytes
sv_memsize = np.dtype(c_dtype).itemsize * (1 << self._num_local_wires)
if _mebibytesToBytes(mpi_buf_size) > sv_memsize:
w_msg = "The MPI buffer size is larger than the local state vector size."
warn(
w_msg,
RuntimeWarning,
)
self._gpu_state = _gpu_dtype(c_dtype, mpi)(
self._mpi_manager,
self._devtag,
mpi_buf_size,
self._num_global_wires,
self._num_local_wires,
)
self._sync = sync
self._batch_obs = batch_obs
self._create_basis_state(0)
def _mpi_init_helper(self, num_wires):
"""Set up MPI checks."""
if not MPI_SUPPORT:
raise ImportError("MPI related APIs are not found.")
# initialize MPIManager and config check in the MPIManager ctor
self._mpi_manager = MPIManager()
# check that the number of GPUs per node is at least
# the number of processes per node
numDevices = self._dp.getTotalDevices()
numProcsNode = self._mpi_manager.getSizeNode()
if numDevices < numProcsNode:
raise ValueError(
"Number of devices should be larger than or equal to the number of processes on each node."
)
# check if the process number is larger than number of statevector elements
if self._mpi_manager.getSize() > (1 << (num_wires - 1)):
raise ValueError(
"Number of processes should be smaller than the number of statevector elements."
)
# set the number of global and local wires
commSize = self._mpi_manager.getSize()
self._num_global_wires = commSize.bit_length() - 1
self._num_local_wires = num_wires - self._num_global_wires
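# e.g. (an illustrative sketch): 4 processes on a 10-wire device give
# num_global_wires = 4.bit_length() - 1 = 2 and num_local_wires = 10 - 2 = 8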
# set GPU device
rank = self._mpi_manager.getRank()
deviceid = rank % numProcsNode
self._dp.setDeviceID(deviceid)
self._devtag = DevTag(deviceid)
@staticmethod
def _asarray(arr, dtype=None):
arr = np.asarray(arr) # arr is not copied
if arr.dtype.kind not in ["f", "c"]:
return arr
if not dtype:
dtype = arr.dtype
return arr
# pylint: disable=missing-function-docstring
def reset(self):
"""Reset the device"""
super().reset()
# init the state vector to |00..0>
self._gpu_state.resetGPU(False) # Sync reset
@property
def state(self):
# pylint: disable=missing-function-docstring
"""Copy the state vector data from the device to the host.
A state vector Numpy array is explicitly allocated on the host to store and return the data.
**Example**
>>> dev = qml.device('lightning.gpu', wires=1)
>>> dev.apply([qml.PauliX(wires=[0])])
>>> print(dev.state)
[0.+0.j 1.+0.j]
"""
state = np.zeros(1 << self._num_local_wires, dtype=self.C_DTYPE)
state = self._asarray(state, dtype=self.C_DTYPE)
self.syncD2H(state)
return state
@property
def create_ops_list(self):
"""Returns create_ops_list function of the matching precision."""
if self._mpi:
return create_ops_listMPIC64 if self.use_csingle else create_ops_listMPIC128
return create_ops_listC64 if self.use_csingle else create_ops_listC128
@property
def measurements(self):
"""Returns Measurements constructor of the matching precision."""
if self._mpi:
return (
MeasurementsMPIC64(self._gpu_state)
if self.use_csingle
else MeasurementsMPIC128(self._gpu_state)
)
return (
MeasurementsC64(self._gpu_state)
if self.use_csingle
else MeasurementsC128(self._gpu_state)
)
def syncD2H(self, state_vector, use_async=False):
"""Copy the state vector data on device to a state vector on the host provided by the user
Args:
state_vector(array[complex]): the state vector array on host
use_async(bool): indicates whether to use asynchronous memory copy from host to device or not.
Note: This function only supports synchronized memory copy.
**Example**
>>> dev = qml.device('lightning.gpu', wires=1)
>>> dev.apply([qml.PauliX(wires=[0])])
>>> state_vector = np.zeros(2**dev.num_wires).astype(dev.C_DTYPE)
>>> dev.syncD2H(state_vector)
>>> print(state_vector)
[0.+0.j 1.+0.j]
"""
self._gpu_state.DeviceToHost(state_vector.ravel(order="C"), use_async)
def syncH2D(self, state_vector, use_async=False):
"""Copy the state vector data on host provided by the user to the state vector on the device
Args:
state_vector(array[complex]): the state vector array on host.
use_async(bool): indicates whether to use asynchronous memory copy from host to device or not.
Note: This function only supports synchronized memory copy.
**Example**
>>> dev = qml.device('lightning.gpu', wires=3)
>>> obs = qml.Identity(0) @ qml.PauliX(1) @ qml.PauliY(2)
>>> obs1 = qml.Identity(1)
>>> H = qml.Hamiltonian([1.0, 1.0], [obs1, obs])
>>> state_vector = np.array([0.0 + 0.0j, 0.0 + 0.1j, 0.1 + 0.1j, 0.1 + 0.2j,
...     0.2 + 0.2j, 0.3 + 0.3j, 0.3 + 0.4j, 0.4 + 0.5j], dtype=np.complex64)
>>> dev.syncH2D(state_vector)
>>> res = dev.expval(H)
>>> print(res)
1.0
"""
self._gpu_state.HostToDevice(state_vector.ravel(order="C"), use_async)
def _create_basis_state(self, index, use_async=False):
"""Return a computational basis state over all wires.
Args:
index (int): integer representing the computational basis state.
use_async(bool): indicates whether to use asynchronous memory copy from host to device or not.
Note: This function only supports synchronized memory copy.
"""
self._gpu_state.setBasisState(index, use_async)
def _apply_state_vector(self, state, device_wires, use_async=False):
"""Initialize the state vector on GPU with a specified state on host.
Note that any use of this method will introduce host-overheads.
Args:
state (array[complex]): normalized input state (on host) of length ``2**len(wires)``
or broadcasted state of shape ``(batch_size, 2**len(wires))``
device_wires (Wires): wires that get initialized in the state
use_async(bool): indicates whether to use asynchronous memory copy from host to device or not.
Note: This function only supports synchronized memory copy from host to device.
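Example (a sketch): ``dev._apply_state_vector(np.array([0, 1, 0, 0], dtype=dev.C_DTYPE), dev.wires)``
prepares the two-qubit state ``|01>`` on the device.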
"""
# translate to wire labels used by device
device_wires = self.map_wires(device_wires)
state = self._asarray(state, dtype=self.C_DTYPE) # this operation on host
output_shape = [2] * self._num_local_wires
if len(device_wires) == self.num_wires and Wires(sorted(device_wires)) == device_wires:
# Initialize the entire device state with the input state
if self.num_wires == self._num_local_wires:
self.syncH2D(self._reshape(state, output_shape))
return
local_state = np.zeros(1 << self._num_local_wires, dtype=self.C_DTYPE)
self._mpi_manager.Scatter(state, local_state, 0)
# Initialize the entire device state with the input state
self.syncH2D(self._reshape(local_state, output_shape))
return
# generate basis states on subset of qubits via the cartesian product
basis_states = np.array(list(product([0, 1], repeat=len(device_wires))))
# get basis states to alter on full set of qubits
unravelled_indices = np.zeros((2 ** len(device_wires), self.num_wires), dtype=int)
unravelled_indices[:, device_wires] = basis_states
# get indices for which the state is changed to input state vector elements
ravelled_indices = np.ravel_multi_index(unravelled_indices.T, [2] * self.num_wires)
# set the state vector on GPU with the unravelled_indices and their corresponding values
self._gpu_state.setStateVector(
ravelled_indices, state, use_async
) # this operation on device
def _apply_basis_state(self, state, wires):
"""Initialize the state vector in a specified computational basis state on GPU directly.
Args:
state (array[int]): computational basis state (on host) of shape ``(wires,)``
consisting of 0s and 1s.
wires (Wires): wires that the provided computational state should be initialized on
Note: This function does not support broadcasted inputs yet.
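Example (a sketch of the index arithmetic below): for ``state = [1, 0, 1]`` on all wires
of a 3-wire device, the basis-state index is ``1*4 + 0*2 + 1*1 = 5``, i.e. the state ``|101>``.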
"""
# translate to wire labels used by device
device_wires = self.map_wires(wires)
# length of basis state parameter
n_basis_state = len(state)
state = state.tolist() if hasattr(state, "tolist") else state
if not set(state).issubset({0, 1}):
raise ValueError("BasisState parameter must consist of 0 or 1 integers.")
if n_basis_state != len(device_wires):
raise ValueError("BasisState parameter and wires must be of equal length.")
# get computational basis state number
basis_states = 2 ** (self.num_wires - 1 - np.array(device_wires))
basis_states = qml.math.convert_like(basis_states, state)
num = int(qml.math.dot(state, basis_states))
self._create_basis_state(num)
def apply_lightning(self, operations):
"""Apply a list of operations to the state tensor.
Args:
operations (list[~pennylane.operation.Operation]): operations to apply
Returns:
array[complex]: the output state tensor
"""
# Skip over identity operations instead of performing
# matrix multiplication with the identity.
for ops in operations:
if isinstance(ops, qml.Identity):
continue
if isinstance(ops, Adjoint):
name = ops.base.name
invert_param = True
else:
name = ops.name
invert_param = False
method = getattr(self._gpu_state, name, None)
wires = self.wires.indices(ops.wires)
if isinstance(ops, qml.ops.op_math.Controlled) and isinstance(
ops.base, qml.GlobalPhase
):
controls = ops.control_wires
control_values = ops.control_values
param = ops.base.parameters[0]
matrix = global_phase_diagonal(param, self.wires, controls, control_values)
self._gpu_state.apply(name, wires, False, [], matrix)
elif method is None:
# Inverse can be set to False since qml.matrix(ops) is already in inverted form
try:
mat = qml.matrix(ops)
except AttributeError: # pragma: no cover
# To support older versions of PL
mat = ops.matrix
r_dtype = np.float32 if self.use_csingle else np.float64
param = [[r_dtype(ops.hash)]] if isinstance(ops, gate_cache_needs_hash) else []
if len(mat) == 0:
raise ValueError("Unsupported operation")
self._gpu_state.apply(
name,
wires,
False, # inv = False: qml.matrix(ops) already accounts for any inverse
param, # parameters can be ignored for explicit matrices
mat.ravel(order="C"), # flatten the matrix in row-major (C) order
)
else:
param = ops.parameters
method(wires, invert_param, param)
# pylint: disable=unused-argument
def apply(self, operations, rotations=None, **kwargs):
"""Applies a list of operations to the state tensor."""
# State preparation is currently done in Python
if operations: # make sure operations[0] exists
if isinstance(operations[0], StatePrep):
self._apply_state_vector(operations[0].parameters[0].copy(), operations[0].wires)
operations = operations[1:]
elif isinstance(operations[0], BasisState):
self._apply_basis_state(operations[0].parameters[0], operations[0].wires)
operations = operations[1:]
for operation in operations:
if isinstance(operation, (StatePrep, BasisState)):
raise DeviceError(
f"Operation {operation.name} cannot be used after other "
+ f"Operations have already been applied on a {self.short_name} device."
)
self.apply_lightning(operations)
@staticmethod
def _check_adjdiff_supported_operations(operations):
"""Check Lightning adjoint differentiation method support for a tape.
Raise ``QuantumFunctionError`` if ``tape`` contains not supported measurements,
observables, or operations by the Lightning adjoint differentiation method.
Args:
tape (.QuantumTape): quantum tape to differentiate.
"""
for op in operations:
if op.num_params > 1 and not isinstance(op, Rot):
raise QuantumFunctionError(
f"The {op.name} operation is not supported using "
'the "adjoint" differentiation method'
)
def _init_process_jacobian_tape(self, tape, starting_state, use_device_state):
"""Generate an initial state vector for ``_process_jacobian_tape``."""
if starting_state is not None:
if starting_state.size != 2 ** len(self.wires):
raise QuantumFunctionError(
"The number of qubits of starting_state must be the same as "
"that of the device."
)
self._apply_state_vector(starting_state, self.wires)
elif not use_device_state:
self.reset()
self.apply(tape.operations)
return self._gpu_state
# pylint: disable=too-many-branches
def adjoint_jacobian(self, tape, starting_state=None, use_device_state=False):
"""Implements the adjoint method outlined in
`Jones and Gacon <https://arxiv.org/abs/2009.02823>`__ to differentiate an input tape.
After a forward pass, the circuit is reversed by iteratively applying adjoint
gates to scan backwards through the circuit.
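**Example** (a sketch; the derivative of ``cos(x)`` at ``x = pi/4`` is ``-sin(pi/4)``):
>>> import numpy as np
>>> dev = qml.device("lightning.gpu", wires=2)
>>> with qml.tape.QuantumTape() as tape:
...     qml.RX(np.pi / 4, wires=0)
...     qml.expval(qml.PauliZ(0))
>>> tape.trainable_params = {0}
>>> dev.adjoint_jacobian(tape)
array([-0.70710678])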
"""
if self.shots is not None:
warn(
"Requested adjoint differentiation to be computed with finite shots."
" The derivative is always exact when using the adjoint differentiation method.",
UserWarning,
)
tape_return_type = self._check_adjdiff_supported_measurements(tape.measurements)
if not tape_return_type: # the tape does not have measurements
return np.array([], dtype=self.state.dtype)
if tape_return_type is State: # pragma: no cover
raise QuantumFunctionError(
"Adjoint differentiation method does not support measurement StateMP."
"Use vjp method instead for this purpose."
)
# Check adjoint diff support
self._check_adjdiff_supported_operations(tape.operations)
processed_data = self._process_jacobian_tape(
tape, starting_state, use_device_state, self._mpi, self._batch_obs
)
if not processed_data: # training_params is empty
return np.array([], dtype=self.state.dtype)
trainable_params = processed_data["tp_shift"]
# pylint: disable=pointless-string-statement
"""
This path enables controlled batching over the requested observables, be they explicit, or part of a Hamiltonian.
The traditional path will assume there exists enough free memory to preallocate all arrays and run through each observable iteratively.
However, for larger system, this becomes impossible, and we hit memory issues very quickly. the batching support here enables several functionalities:
- Pre-allocate memory for all observables on the primary GPU (`batch_obs=False`, default behaviour): This is the simplest path, and works best for few observables, and moderate qubit sizes. All memory is preallocated for each observable, and run through iteratively on a single GPU.
- Evenly distribute the observables over all available GPUs (`batch_obs=True`): This will evenly split the data into ceil(num_obs/num_gpus) chunks, and allocate enough space on each GPU up-front before running through them concurrently. This relies on C++ threads to handle the orchestration.
- Allocate at most `n` observables per GPU (`batch_obs=n`): Providing an integer value restricts each available GPU to at most `n` copies of the statevector, and hence `n` given observables for a given batch. This will iterate over the data in chnuks of size `n*num_gpus`.
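A sketch of the three modes as device options (assuming a node with multiple GPUs; ``wires=20`` is an arbitrary illustration):
dev = qml.device("lightning.gpu", wires=20, batch_obs=False) # preallocate all observables on one GPU
dev = qml.device("lightning.gpu", wires=20, batch_obs=True)  # split observables evenly over all GPUs
dev = qml.device("lightning.gpu", wires=20, batch_obs=4)     # at most 4 observables per GPU per batch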
"""
adjoint_jacobian = _adj_dtype(self.use_csingle, self._mpi)()
if self._batch_obs: # Batching of Measurements
if not self._mpi: # Single-node path, controlled batching over available GPUs
num_obs = len(processed_data["obs_serialized"])
batch_size = (
num_obs
if isinstance(self._batch_obs, bool)
else self._batch_obs * self._dp.getTotalDevices()
)
jac = []
for chunk in range(0, num_obs, batch_size):
obs_chunk = processed_data["obs_serialized"][chunk : chunk + batch_size]
jac_chunk = adjoint_jacobian.batched(
self._gpu_state,
obs_chunk,
processed_data["ops_serialized"],
trainable_params,
)
jac.extend(jac_chunk)
else: # MPI path, restrict memory per known GPUs
jac = adjoint_jacobian.batched(
self._gpu_state,
processed_data["obs_serialized"],
processed_data["ops_serialized"],
trainable_params,
)
else:
jac = adjoint_jacobian(
self._gpu_state,
processed_data["obs_serialized"],
processed_data["ops_serialized"],
trainable_params,
)
jac = np.array(jac) # only for parameters differentiable with the adjoint method
jac = jac.reshape(-1, len(trainable_params))
jac_r = np.zeros((len(tape.observables), processed_data["all_params"]))
if not self._batch_obs:
jac_r[:, processed_data["record_tp_rows"]] = jac
else:
# Reduce over decomposed expval(H), if required.
for idx in range(len(processed_data["obs_idx_offsets"][0:-1])):
if (
processed_data["obs_idx_offsets"][idx + 1]
- processed_data["obs_idx_offsets"][idx]
) > 1:
jac_r[idx, :] = np.sum(
jac[
processed_data["obs_idx_offsets"][idx] : processed_data[
"obs_idx_offsets"
][idx + 1],
:,
],
axis=0,
)
else:
jac_r[idx, :] = jac[
processed_data["obs_idx_offsets"][idx] : processed_data["obs_idx_offsets"][
idx + 1
],
:,
]
return self._adjoint_jacobian_processing(jac_r)
# pylint: disable=inconsistent-return-statements, line-too-long, missing-function-docstring
def vjp(self, measurements, grad_vec, starting_state=None, use_device_state=False):
"""Generate the processing function required to compute the vector-Jacobian products
of a tape.
This function can be used with multiple expectation values or a quantum state.
When a quantum state is given,
.. code-block:: python
vjp_f = dev.vjp([qml.state()], grad_vec)
vjp = vjp_f(tape)
computes :math:`w = (w_1,\\cdots,w_m)` where
.. math::
w_k = \\langle v| \\frac{\\partial}{\\partial \\theta_k} | \\psi_{\\pmb{\\theta}} \\rangle.
Here, :math:`m` is the total number of trainable parameters,
:math:`\\pmb{\\theta}` is the vector of trainable parameters and
:math:`\\psi_{\\pmb{\\theta}}` is the output quantum state.
Args:
measurements (list): List of measurement processes for vector-Jacobian product.
Now it must be expectation values or a quantum state.
grad_vec (tensor_like): Gradient-output vector. Must have shape matching the output
shape of the corresponding tape, i.e. number of measurements if the return
type is expectation or :math:`2^N` if the return type is statevector
starting_state (tensor_like): post-forward pass state to start execution with.
It should be complex-valued. Takes precedence over ``use_device_state``.
use_device_state (bool): use current device state to initialize.
A forward pass of the same circuit should be the last thing the device
has executed. If a ``starting_state`` is provided, that takes precedence.
Returns:
The processing function required to compute the vector-Jacobian products of a tape.
"""
if self.shots is not None:
warn(
"Requested adjoint differentiation to be computed with finite shots."
" The derivative is always exact when using the adjoint differentiation method.",
UserWarning,
)
tape_return_type = self._check_adjdiff_supported_measurements(measurements)
if math.allclose(grad_vec, 0) or tape_return_type is None:
return lambda tape: math.convert_like(np.zeros(len(tape.trainable_params)), grad_vec)
if tape_return_type is Expectation:
if len(grad_vec) != len(measurements):
raise ValueError(
"Number of observables in the tape must be the same as the length of grad_vec in the vjp method"
)
if np.iscomplexobj(grad_vec):
raise ValueError(
"The vjp method only works with a real-valued grad_vec when the tape is returning an expectation value"
)
ham = qml.Hamiltonian(grad_vec, [m.obs for m in measurements])
# pylint: disable=protected-access
def processing_fn(tape):
nonlocal ham
num_params = len(tape.trainable_params)
if num_params == 0:
return np.array([], dtype=self.state.dtype)
new_tape = tape.copy()
new_tape._measurements = [qml.expval(ham)]
return self.adjoint_jacobian(new_tape, starting_state, use_device_state)
return processing_fn
# pylint: disable=attribute-defined-outside-init
def sample(self, observable, shot_range=None, bin_size=None, counts=False):
"""Return samples of an observable."""
diagonalizing_gates = observable.diagonalizing_gates()
if diagonalizing_gates:
self.apply(diagonalizing_gates)
if not isinstance(observable, qml.PauliZ):
self._samples = self.generate_samples()
results = super().sample(
observable, shot_range=shot_range, bin_size=bin_size, counts=counts
)
if diagonalizing_gates:
self.apply([qml.adjoint(g, lazy=False) for g in reversed(diagonalizing_gates)])
return results
def generate_samples(self):
"""Generate samples
Returns:
array[int]: array of samples in binary representation with shape
``(dev.shots, dev.num_wires)``
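**Example** (a sketch; requires a device created with finite ``shots``):
>>> dev = qml.device("lightning.gpu", wires=2, shots=5)
>>> dev.apply([qml.Hadamard(wires=0)])
>>> dev.generate_samples().shape
(5, 2)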
"""
return self.measurements.generate_samples(len(self.wires), self.shots).astype(
int, copy=False
)
# pylint: disable=protected-access
def expval(self, observable, shot_range=None, bin_size=None):
"""Expectation value of the supplied observable.
Args:
observable: A PennyLane observable.
shot_range (tuple[int]): 2-tuple of integers specifying the range of samples
to use. If not specified, all samples are used.
bin_size (int): Divides the shot range into bins of size ``bin_size``, and
returns the measurement statistic separately over each bin. If not
provided, the entire shot range is treated as a single bin.
Returns:
Expectation value of the observable
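**Example** (a sketch; the value shown is the analytic result for the ``|+>`` state):
>>> dev = qml.device("lightning.gpu", wires=1)
>>> dev.apply([qml.Hadamard(wires=0)])
>>> dev.expval(qml.PauliX(0))
1.0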
"""
if self.shots is not None:
# estimate the expectation value
samples = self.sample(observable, shot_range=shot_range, bin_size=bin_size)
return np.squeeze(np.mean(samples, axis=0))
if isinstance(observable, qml.SparseHamiltonian):
if self._mpi:
# Use an Identity CSR_SparseHamiltonian on processes with rank != 0
# to reduce host (CPU) memory requirements
obs = qml.Identity(0)
Hmat = qml.Hamiltonian([1.0], [obs]).sparse_matrix()
H_sparse = qml.SparseHamiltonian(Hmat, wires=range(1))
CSR_SparseHamiltonian = H_sparse.sparse_matrix().tocsr()
# CSR_SparseHamiltonian for rank == 0
if self._mpi_manager.getRank() == 0:
CSR_SparseHamiltonian = observable.sparse_matrix().tocsr()
else:
CSR_SparseHamiltonian = observable.sparse_matrix().tocsr()
return self.measurements.expval(
CSR_SparseHamiltonian.indptr,
CSR_SparseHamiltonian.indices,
CSR_SparseHamiltonian.data,
)
# use specialized functors to compute expval(Hermitian)
if isinstance(observable, qml.Hermitian):
observable_wires = self.map_wires(observable.wires)
if self._mpi and len(observable_wires) > self._num_local_wires:
raise RuntimeError(
"The MPI backend does not support Hermitian observables acting on more target wires than the number of local wires."
)
matrix = observable.matrix()
return self.measurements.expval(matrix, observable_wires)
if (
isinstance(observable, qml.ops.Hamiltonian)
or (observable.arithmetic_depth > 0)
or isinstance(observable.name, List)
):
ob_serialized = QuantumScriptSerializer(
self.short_name, self.use_csingle, self._mpi
)._ob(observable, self.wire_map)
return self.measurements.expval(ob_serialized)
# translate to wire labels used by device
observable_wires = self.map_wires(observable.wires)
return self.measurements.expval(observable.name, observable_wires)
def probability_lightning(self, wires=None):
"""Return the probability of each computational basis state.
Args:
wires (Iterable[Number, str], Number, str, Wires): wires to return
marginal probabilities for. Wires not provided are traced out of the system.
Returns:
array[float]: list of the probabilities
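**Example** (a sketch; the values are the analytic probabilities after ``H`` on wire 0):
>>> dev = qml.device("lightning.gpu", wires=2)
>>> dev.apply([qml.Hadamard(wires=0)])
>>> dev.probability_lightning(wires=[0, 1])
array([0.5, 0. , 0.5, 0. ])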
"""
# translate to wire labels used by device
observable_wires = self.map_wires(wires)
# The device returns data in column-major order, so transpose the data to shuffle the bit indices for now.
local_prob = self.measurements.probs(observable_wires)
if len(local_prob) > 0:
num_local_wires = len(local_prob).bit_length() - 1
return local_prob.reshape([2] * num_local_wires).transpose().reshape(-1)
return local_prob
def var(self, observable, shot_range=None, bin_size=None):
"""Variance of the supplied observable.
Args:
observable: A PennyLane observable.
shot_range (tuple[int]): 2-tuple of integers specifying the range of samples
to use. If not specified, all samples are used.
bin_size (int): Divides the shot range into bins of size ``bin_size``, and
returns the measurement statistic separately over each bin. If not
provided, the entire shot range is treated as a single bin.
Returns:
Variance of the observable
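**Example** (a sketch; the analytic variance of ``PauliZ`` on the ``|+>`` state is 1):
>>> dev = qml.device("lightning.gpu", wires=1)
>>> dev.apply([qml.Hadamard(wires=0)])
>>> dev.var(qml.PauliZ(0))
1.0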
"""
if self.shots is not None:
# estimate the variance from samples
samples = self.sample(observable, shot_range=shot_range, bin_size=bin_size)
return np.squeeze(np.var(samples, axis=0))
if isinstance(observable, qml.SparseHamiltonian):
csr_hamiltonian = observable.sparse_matrix(wire_order=self.wires).tocsr(copy=False)
return self.measurements.var(
csr_hamiltonian.indptr,
csr_hamiltonian.indices,
csr_hamiltonian.data,
)
if (
isinstance(observable, (qml.Hermitian, qml.ops.Hamiltonian))
or (observable.arithmetic_depth > 0)
or isinstance(observable.name, List)
):
ob_serialized = QuantumScriptSerializer(
self.short_name, self.use_csingle, self._mpi
)._ob(observable, self.wire_map)
return self.measurements.var(ob_serialized)
# translate to wire labels used by device
observable_wires = self.map_wires(observable.wires)
return self.measurements.var(observable.name, observable_wires)