Source code for pennylane_lightning.lightning_qubit
# Copyright 2021 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
r"""
This module contains the :class:`~.LightningQubit` class, a PennyLane simulator device that
interfaces with C++ for fast linear algebra calculations.
"""
from typing import List
from warnings import warn
from os import getenv
from itertools import islice
import numpy as np
from pennylane import (
math,
gradients,
BasisState,
QubitStateVector,
QubitUnitary,
Projector,
Hermitian,
Rot,
QuantumFunctionError,
DeviceError,
)
from pennylane.devices import DefaultQubit
from pennylane.operation import Tensor, Operation
from pennylane.measurements import MeasurementProcess, Expectation, State
from pennylane.wires import Wires
# TODO: after the next release of PL, import ``matrix`` directly:
# from pennylane import matrix
import pennylane as qml
from ._version import __version__
try:
from .lightning_qubit_ops import (
adjoint_diff,
MeasuresC64,
StateVectorC64,
MeasuresC128,
StateVectorC128,
Kokkos_info,
allocate_aligned_array,
get_alignment,
best_alignment,
)
from ._serialize import _serialize_observables, _serialize_ops
CPP_BINARY_AVAILABLE = True
except ModuleNotFoundError:
CPP_BINARY_AVAILABLE = False
def _chunk_iterable(it, chunk_size):
"""Lazily chunk a given iterable into tuples of ``chunk_size`` elements, from https://stackoverflow.com/a/22045226"""
it = iter(it)
return iter(lambda: tuple(islice(it, chunk_size)), ())
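# Illustrative behaviour (a sketch, not from the original module):
#     list(_chunk_iterable(range(5), 2))  # -> [(0, 1), (2, 3), (4,)]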
def _remove_snapshot_from_operations(operations):
operations = operations.copy()
operations.discard("Snapshot")
return operations
def _remove_op_arithmetic_from_observables(observables):
observables = observables.copy()
observables.discard("Sum")
observables.discard("SProd")
observables.discard("Prod")
return observables
class LightningQubit(DefaultQubit):
"""PennyLane Lightning device.
An extension of PennyLane's built-in ``default.qubit`` device that interfaces with C++ to
perform fast linear algebra calculations.
Use of this device requires pre-built binaries or compilation from source. Check out the
:doc:`/installation` guide for more details.
Args:
wires (int): the number of wires to initialize the device with
c_dtype: Datatype of the statevector representation. Must be one of ``np.complex64`` or ``np.complex128``.
shots (int): How many times the circuit should be evaluated (or sampled) to estimate
the expectation values. Defaults to ``None`` if not specified. Setting
to ``None`` results in computing statistics like expectation values and
variances analytically.
batch_obs (bool): Determines whether observables are processed in parallel when computing
the Jacobian. This value is only relevant when Lightning Qubit is built with OpenMP.
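**Example** (a minimal usage sketch, assuming a working ``lightning.qubit`` installation):
.. code-block:: python
import pennylane as qml
# shots=None (the default) gives analytic expectation values
dev = qml.device("lightning.qubit", wires=2)
@qml.qnode(dev)
def circuit(x):
    qml.RX(x, wires=0)
    qml.CNOT(wires=[0, 1])
    return qml.expval(qml.PauliZ(0))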
"""
name = "Lightning Qubit PennyLane plugin"
short_name = "lightning.qubit"
pennylane_requires = ">=0.15"
version = __version__
author = "Xanadu Inc."
_CPP_BINARY_AVAILABLE = True
operations = _remove_snapshot_from_operations(DefaultQubit.operations)
observables = _remove_op_arithmetic_from_observables(DefaultQubit.observables)
def __init__(self, wires, *, c_dtype=np.complex128, shots=None, batch_obs=False):
if c_dtype is np.complex64:
r_dtype = np.float32
self.use_csingle = True
elif c_dtype is np.complex128:
r_dtype = np.float64
self.use_csingle = False
else:
raise TypeError(f"Unsupported complex type: {c_dtype}")
super().__init__(wires, r_dtype=r_dtype, c_dtype=c_dtype, shots=shots)
self._batch_obs = batch_obs
@staticmethod
def _asarray(arr, dtype=None):
arr = np.asarray(arr) # arr is not copied
if arr.dtype.kind not in ["f", "c"]:
return arr
if not dtype:
dtype = arr.dtype
# We allocate new aligned memory and copy the data there if the alignment or dtype mismatches.
# Note that get_alignment does not necessarily return CPUMemoryModel(Unaligned) even for
# numpy-allocated memory, as the memory location may happen to be aligned.
if int(get_alignment(arr)) < int(best_alignment()) or arr.dtype != dtype:
new_arr = allocate_aligned_array(arr.size, np.dtype(dtype)).reshape(arr.shape)
np.copyto(new_arr, arr)
arr = new_arr
return arr
@classmethod
def capabilities(cls):
capabilities = super().capabilities().copy()
capabilities.update(
model="qubit",
supports_reversible_diff=False,
supports_inverse_operations=True,
supports_analytic_computation=True,
supports_broadcasting=False,
returns_state=True,
)
capabilities.pop("passthru_devices", None)
return capabilities
def apply(self, operations, rotations=None, **kwargs):
# State preparation is currently done in Python
if operations: # make sure operations[0] exists
if isinstance(operations[0], QubitStateVector):
self._apply_state_vector(operations[0].parameters[0].copy(), operations[0].wires)
del operations[0]
elif isinstance(operations[0], BasisState):
self._apply_basis_state(operations[0].parameters[0], operations[0].wires)
del operations[0]
for operation in operations:
if isinstance(operation, (QubitStateVector, BasisState)):
raise DeviceError(
"Operation {} cannot be used after other Operations have already been "
"applied on a {} device.".format(operation.name, self.short_name)
)
if operations:
self._pre_rotated_state = self.apply_lightning(self._state, operations)
else:
self._pre_rotated_state = self._state
if rotations:
if any(isinstance(r, QubitUnitary) for r in rotations):
super().apply(operations=[], rotations=rotations)
else:
self._state = self.apply_lightning(np.copy(self._pre_rotated_state), rotations)
else:
self._state = self._pre_rotated_state
def apply_lightning(self, state, operations):
"""Apply a list of operations to the state tensor.
Args:
state (array[complex]): the input state tensor
operations (list[~pennylane.operation.Operation]): operations to apply
Returns:
array[complex]: the output state tensor
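**Example** (an illustrative sketch; assumes a two-wire ``lightning.qubit`` device ``dev``):
.. code-block:: python
state = np.zeros(4, dtype=np.complex128)
state[0] = 1.0  # |00>
new_state = dev.apply_lightning(state, [qml.PauliX(wires=0)])
# new_state[2] == 1.0, i.e. the state is now |10>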
"""
state_vector = np.ravel(state)
if self.use_csingle:
# use_csingle
sim = StateVectorC64(state_vector)
else:
# self.C_DTYPE is np.complex128 by default
sim = StateVectorC128(state_vector)
# Skip over identity operations instead of performing
# matrix multiplication with the identity.
skipped_ops = ["Identity"]
for o in operations:
if o.base_name in skipped_ops:
continue
name = o.name.split(".")[0] # The split is because inverse gates have .inv appended
method = getattr(sim, name, None)
wires = self.wires.indices(o.wires)
if method is None:
# Inverse can be set to False since qml.matrix(o) is already in inverted form
method = getattr(sim, "applyMatrix")
try:
method(qml.matrix(o), wires, False)
except AttributeError: # pragma: no cover
# To support older versions of PL
method(o.matrix, wires, False)
else:
inv = o.inverse
param = o.parameters
method(wires, inv, param)
return np.reshape(state_vector, state.shape)
@staticmethod
def _check_adjdiff_supported_measurements(measurements: List[MeasurementProcess]):
"""Check whether given list of measurement is supported by adjoint_diff.
Args:
measurements (List[MeasurementProcess]): a list of measurement processes to check.
Returns:
Expectation or State: a common return type of measurements.
"""
if len(measurements) == 0:
return None
if len(measurements) == 1 and measurements[0].return_type is State:
return State
# At this point, the return type of every measurement process must be Expectation
if not all(m.return_type is Expectation for m in measurements):
raise QuantumFunctionError(
"Adjoint differentiation method does not support expectation return type "
"mixed with other return types"
)
for m in measurements:
if not isinstance(m.obs, Tensor):
if isinstance(m.obs, Projector):
raise QuantumFunctionError(
"Adjoint differentiation method does not support the Projector observable"
)
else:
if any(isinstance(o, Projector) for o in m.obs.non_identity_obs):
raise QuantumFunctionError(
"Adjoint differentiation method does not support the Projector observable"
)
return Expectation
@staticmethod
def _check_adjdiff_supported_operations(operations):
"""Check Lightning adjoint differentiation method support for a tape.
Raise ``QuantumFunctionError`` if ``tape`` contains not supported measurements,
observables, or operations by the Lightning adjoint differentiation method.
Args:
tape (.QuantumTape): quantum tape to differentiate.
"""
for op in operations:
if op.num_params > 1 and not isinstance(op, Rot):
raise QuantumFunctionError(
f"The {op.name} operation is not supported using "
'the "adjoint" differentiation method'
)
def _process_jacobian_tape(self, tape, starting_state, use_device_state):
# To support np.complex64 based on the type of self._state
if self.use_csingle:
create_ops_list = adjoint_diff.create_ops_list_C64
else:
create_ops_list = adjoint_diff.create_ops_list_C128
# Initialization of state
if starting_state is not None:
if starting_state.size != 2 ** len(self.wires):
raise QuantumFunctionError(
"The number of qubits of starting_state must be the same as "
"that of the device."
)
ket = self._asarray(starting_state, dtype=self.C_DTYPE)
else:
if not use_device_state:
self.reset()
self.apply(tape.operations)
ket = self._pre_rotated_state
obs_serialized = _serialize_observables(tape, self.wire_map, use_csingle=self.use_csingle)
ops_serialized, use_sp = _serialize_ops(tape, self.wire_map)
ops_serialized = create_ops_list(*ops_serialized)
# We need to filter out indices in trainable_params which do not
# correspond to operators.
trainable_params = sorted(tape.trainable_params)
if len(trainable_params) == 0:
return None
tp_shift = []
record_tp_rows = []
all_params = 0
for op_idx, tp in enumerate(trainable_params):
# get the op_idx-th operator among the differentiable operators
op, _ = tape.get_operation(op_idx)
if isinstance(op, Operation) and not isinstance(op, (BasisState, QubitStateVector)):
# Non-operations and state preparations are not differentiated, so skip them
tp_shift.append(tp)
record_tp_rows.append(all_params)
all_params += 1
if use_sp:
# The tape begins with a state preparation, which is not differentiated, so all
# trainable-parameter indices are shifted down by one. (It is assumed that a tape
# contains at most one state preparation.)
tp_shift = [i - 1 for i in tp_shift]
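# Illustrative example: for a tape [QubitStateVector, RX, RY] with
# trainable_params == [1, 2], the shifted indices become tp_shift == [0, 1].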
ket = ket.reshape(-1)
state_vector = StateVectorC64(ket) if self.use_csingle else StateVectorC128(ket)
return {
"state_vector": state_vector,
"obs_serialized": obs_serialized,
"ops_serialized": ops_serialized,
"tp_shift": tp_shift,
"record_tp_rows": record_tp_rows,
"all_params": all_params,
}
def adjoint_jacobian(self, tape, starting_state=None, use_device_state=False):
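"""Compute the Jacobian of a tape using the adjoint differentiation method.
Args:
tape (.QuantumTape): quantum tape to differentiate.
starting_state (tensor_like): post-forward pass state to start execution with. It should be
complex-valued. Takes precedence over ``use_device_state``.
use_device_state (bool): use the current device state to initialize. A forward pass of the
same circuit should be the last thing the device has executed.
Returns:
array[float]: the Jacobian, with one row per expectation value and one column per
trainable parameter.
"""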
if self.shots is not None:
warn(
"Requested adjoint differentiation to be computed with finite shots."
" The derivative is always exact when using the adjoint differentiation method.",
UserWarning,
)
tape_return_type = self._check_adjdiff_supported_measurements(tape.measurements)
if not tape_return_type: # the tape does not have measurements
return np.array([], dtype=self._state.dtype)
if tape_return_type is State:
raise QuantumFunctionError(
"This method does not support statevector return type. "
"Use vjp method instead for this purpose."
)
self._check_adjdiff_supported_operations(tape.operations)
processed_data = self._process_jacobian_tape(tape, starting_state, use_device_state)
if not processed_data: # training_params is empty
return np.array([], dtype=self._state.dtype)
trainable_params = processed_data["tp_shift"]
# If batching over observables was requested, chunk them into OMP_NUM_THREADS-sized chunks.
# This allows use of Lightning with the adjoint method for large qubit numbers AND large
# numbers of observables, enabling a trade-off between compute time and memory use.
requested_threads = int(getenv("OMP_NUM_THREADS", "1"))
if self._batch_obs and requested_threads > 1:
obs_partitions = _chunk_iterable(processed_data["obs_serialized"], requested_threads)
jac = []
for obs_chunk in obs_partitions:
jac_local = adjoint_diff.adjoint_jacobian(
processed_data["state_vector"],
obs_chunk,
processed_data["ops_serialized"],
trainable_params,
)
jac.extend(jac_local)
else:
jac = adjoint_diff.adjoint_jacobian(
processed_data["state_vector"],
processed_data["obs_serialized"],
processed_data["ops_serialized"],
trainable_params,
)
jac = np.array(jac)
jac = jac.reshape(-1, len(trainable_params))
jac_r = np.zeros((jac.shape[0], processed_data["all_params"]))
jac_r[:, processed_data["record_tp_rows"]] = jac
return jac_r
def vjp(self, measurements, dy, starting_state=None, use_device_state=False):
"""Generate the processing function required to compute the vector-Jacobian products of a tape.
This function can be used with multiple expectation values or a quantum state. When a quantum state
is given,
.. code-block:: python
vjp_f = dev.vjp([qml.state()], dy)
vjp = vjp_f(tape)
computes :math:`w = (w_1, \\cdots, w_m)` where
.. math::
w_k = \\langle v | \\frac{\\partial}{\\partial \\theta_k} | \\psi_{\\pmb{\\theta}} \\rangle.
Here, :math:`m` is the total number of trainable parameters, :math:`\\pmb{\\theta}` is the
vector of trainable parameters, and :math:`| \\psi_{\\pmb{\\theta}} \\rangle` is the output
quantum state.
Args:
measurements (list): List of measurement processes for the vector-Jacobian product. Currently, these must be expectation values or a quantum state.
dy (tensor_like): Gradient-output vector. Must have shape matching the output shape of the corresponding tape, i.e. the number of measurements if the return type is expectation, or :math:`2^N` if the return type is statevector.
starting_state (tensor_like): post-forward pass state to start execution with. It should be
complex-valued. Takes precedence over ``use_device_state``.
use_device_state (bool): use current device state to initialize. A forward pass of the same
circuit should be the last thing the device has executed. If a ``starting_state`` is
provided, that takes precedence.
Returns:
The processing function required to compute the vector-Jacobian products of a tape.
"""
if self.shots is not None:
warn(
"Requested adjoint differentiation to be computed with finite shots."
" The derivative is always exact when using the adjoint differentiation method.",
UserWarning,
)
tape_return_type = self._check_adjdiff_supported_measurements(measurements)
if math.allclose(dy, 0) or tape_return_type is None:
return lambda tape: math.convert_like(np.zeros(len(tape.trainable_params)), dy)
if tape_return_type is Expectation:
if len(dy) != len(measurements):
raise ValueError(
"Number of observables in the tape must be the same as the length of dy in the vjp method"
)
if np.iscomplexobj(dy):
raise ValueError(
"The vjp method only works with a real-valued dy when the tape is returning an expectation value"
)
ham = qml.Hamiltonian(dy, [m.obs for m in measurements])
def processing_fn(tape):
nonlocal ham
num_params = len(tape.trainable_params)
if num_params == 0:
return np.array([], dtype=self._state.dtype)
new_tape = tape.copy()
new_tape._measurements = [qml.expval(ham)]
return self.adjoint_jacobian(new_tape, starting_state, use_device_state).reshape(-1)
return processing_fn
if tape_return_type is State:
if len(dy) != 2 ** len(self.wires):
raise ValueError(
"Size of the provided vector dy must be the same as the size of the statevector"
)
if np.isrealobj(dy):
warn(
"The vjp method only works with complex-valued dy when the tape is returning a statevector. Upcasting dy."
)
dy = dy.astype(self.C_DTYPE)
def processing_fn(tape):
nonlocal dy
processed_data = self._process_jacobian_tape(tape, starting_state, use_device_state)
return adjoint_diff.statevector_vjp(
processed_data["state_vector"],
processed_data["ops_serialized"],
dy,
processed_data["tp_shift"],
)
return processing_fn
def batch_vjp(
self, tapes, dys, reduction="append", starting_state=None, use_device_state=False
):
"""Generate the processing function required to compute the vector-Jacobian products
of a batch of tapes.
Args:
tapes (Sequence[.QuantumTape]): sequence of quantum tapes to differentiate
dys (Sequence[tensor_like]): Sequence of gradient-output vectors ``dy``. Must be the
same length as ``tapes``. Each ``dy`` tensor should have shape
matching the output shape of the corresponding tape.
reduction (str): Determines how the vector-Jacobian products are returned.
If ``append``, then the output of the function will be of the form
``List[tensor_like]``, with each element corresponding to the VJP of each
input tape. If ``extend``, then the output VJPs will be concatenated. A callable
``reduction(vjps, vjp)`` may also be given to accumulate the results directly.
starting_state (tensor_like): post-forward pass state to start execution with. It should be
complex-valued. Takes precedence over ``use_device_state``.
use_device_state (bool): use current device state to initialize. A forward pass of the same
circuit should be the last thing the device has executed. If a ``starting_state`` is
provided, that takes precedence.
Returns:
The processing function required to compute the vector-Jacobian products of a batch of tapes.
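**Example** (an illustrative sketch; ``tape1`` and ``tape2`` are assumed to be
previously constructed quantum tapes returning expectation values, with ``dy1``
and ``dy2`` matching their output shapes):
.. code-block:: python
vjp_fn = dev.batch_vjp([tape1, tape2], [dy1, dy2])
vjps = vjp_fn([tape1, tape2])  # List[tensor_like], one VJP per tape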
"""
fns = []
# Loop through the tapes and dys vector
for tape, dy in zip(tapes, dys):
fn = self.vjp(
tape.measurements,
dy,
starting_state=starting_state,
use_device_state=use_device_state,
)
fns.append(fn)
def processing_fns(tapes):
vjps = []
for t, f in zip(tapes, fns):
vjp = f(t)
if isinstance(reduction, str):
getattr(vjps, reduction)(vjp)
elif callable(reduction):
reduction(vjps, vjp)
return vjps
return processing_fns
def probability(self, wires=None, shot_range=None, bin_size=None):
"""Return the probability of each computational basis state.
Devices that require a finite number of shots always return the
estimated probability.
Args:
wires (Iterable[Number, str], Number, str, Wires): wires to return
marginal probabilities for. Wires not provided are traced out of the system.
shot_range (tuple[int]): 2-tuple of integers specifying the range of samples
to use. If not specified, all samples are used.
bin_size (int): Divides the shot range into bins of size ``bin_size``, and
returns the measurement statistic separately over each bin. If not
provided, the entire shot range is treated as a single bin.
Returns:
array[float]: list of the probabilities
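**Example** (illustrative; assumes a two-wire analytic device ``dev``):
.. code-block:: python
dev.apply([qml.Hadamard(wires=0)])
dev.probability(wires=[0])  # array([0.5, 0.5])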
"""
if self.shots is not None:
return self.estimate_probability(wires=wires, shot_range=shot_range, bin_size=bin_size)
wires = wires or self.wires
wires = Wires(wires)
# translate to wire labels used by device
device_wires = self.map_wires(wires)
# To support np.complex64 based on the type of self._state
dtype = self._state.dtype
ket = np.ravel(self._state)
state_vector = StateVectorC64(ket) if self.use_csingle else StateVectorC128(ket)
M = MeasuresC64(state_vector) if self.use_csingle else MeasuresC128(state_vector)
return M.probs(device_wires)
def generate_samples(self):
"""Generate samples
Returns:
array[int]: array of samples in binary representation with shape ``(dev.shots, dev.num_wires)``
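**Example** (illustrative; assumes a finite-shot device):
.. code-block:: python
dev = qml.device("lightning.qubit", wires=2, shots=100)
dev.apply([qml.Hadamard(wires=0)])
samples = dev.generate_samples()  # shape (100, 2), entries in {0, 1}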
"""
# Initialization of state
ket = np.ravel(self._state)
state_vector = StateVectorC64(ket) if self.use_csingle else StateVectorC128(ket)
M = MeasuresC64(state_vector) if self.use_csingle else MeasuresC128(state_vector)
return M.generate_samples(len(self.wires), self.shots).astype(int, copy=False)
def expval(self, observable, shot_range=None, bin_size=None):
"""Expectation value of the supplied observable.
Args:
observable: A PennyLane observable.
shot_range (tuple[int]): 2-tuple of integers specifying the range of samples
to use. If not specified, all samples are used.
bin_size (int): Divides the shot range into bins of size ``bin_size``, and
returns the measurement statistic separately over each bin. If not
provided, the entire shot range is treated as a single bin.
Returns:
Expectation value of the observable
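**Example** (illustrative; assumes a two-wire analytic device ``dev``):
.. code-block:: python
dev.apply([qml.Hadamard(wires=0)])
dev.expval(qml.PauliX(wires=0))  # 1.0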
"""
if isinstance(observable.name, List) or observable.name in [
"Identity",
"Projector",
"Hermitian",
"Hamiltonian",
]:
return super().expval(observable, shot_range=shot_range, bin_size=bin_size)
if self.shots is not None:
# estimate the expectation value from samples
# (sampling falls back to the base device; LightningQubit doesn't support native sampling yet)
samples = self.sample(observable, shot_range=shot_range, bin_size=bin_size)
return np.squeeze(np.mean(samples, axis=0))
# Initialization of state
ket = np.ravel(self._pre_rotated_state)
state_vector = StateVectorC64(ket) if self.use_csingle else StateVectorC128(ket)
M = MeasuresC64(state_vector) if self.use_csingle else MeasuresC128(state_vector)
if observable.name == "SparseHamiltonian":
if Kokkos_info()["USE_KOKKOS"]:
# converting COO to CSR sparse representation.
CSR_SparseHamiltonian = observable.data[0].tocsr(copy=False)
return M.expval(
CSR_SparseHamiltonian.indptr,
CSR_SparseHamiltonian.indices,
CSR_SparseHamiltonian.data,
)
return super().expval(observable, shot_range=shot_range, bin_size=bin_size)
# translate to wire labels used by device
observable_wires = self.map_wires(observable.wires)
return M.expval(observable.name, observable_wires)
def var(self, observable, shot_range=None, bin_size=None):
"""Variance of the supplied observable.
Args:
observable: A PennyLane observable.
shot_range (tuple[int]): 2-tuple of integers specifying the range of samples
to use. If not specified, all samples are used.
bin_size (int): Divides the shot range into bins of size ``bin_size``, and
returns the measurement statistic separately over each bin. If not
provided, the entire shot range is treated as a single bin.
Returns:
Variance of the observable
"""
if isinstance(observable.name, List) or observable.name in [
"Identity",
"Projector",
"Hermitian",
]:
return super().var(observable, shot_range=shot_range, bin_size=bin_size)
if self.shots is not None:
# estimate the variance from samples
# (sampling falls back to the base device; LightningQubit doesn't support native sampling yet)
samples = self.sample(observable, shot_range=shot_range, bin_size=bin_size)
return np.squeeze(np.var(samples, axis=0))
# Initialization of state
ket = np.ravel(self._pre_rotated_state)
state_vector = StateVectorC64(ket) if self.use_csingle else StateVectorC128(ket)
M = MeasuresC64(state_vector) if self.use_csingle else MeasuresC128(state_vector)
# translate to wire labels used by device
observable_wires = self.map_wires(observable.wires)
return M.var(observable.name, observable_wires)
if not CPP_BINARY_AVAILABLE:
class LightningQubit(DefaultQubit):  # pragma: no cover
name = "Lightning Qubit PennyLane plugin"
short_name = "lightning.qubit"
pennylane_requires = ">=0.15"
version = __version__
author = "Xanadu Inc."
_CPP_BINARY_AVAILABLE = False
operations = _remove_snapshot_from_operations(DefaultQubit.operations)
def __init__(self, wires, *, c_dtype=np.complex128, **kwargs):
warn(
"Pre-compiled binaries for lightning.qubit are not available. Falling back to "
"using the Python-based default.qubit implementation. To manually compile from "
"source, follow the instructions at "
"https://pennylane-lightning.readthedocs.io/en/latest/installation.html.",
UserWarning,
)
if c_dtype is np.complex64:
r_dtype = np.float32
elif c_dtype is np.complex128:
r_dtype = np.float64
else:
raise TypeError(f"Unsupported complex type: {c_dtype}")
super().__init__(wires, r_dtype=r_dtype, c_dtype=c_dtype, **kwargs)