Source code for pennylane.devices.null_qubit

# Copyright 2022 Xanadu Quantum Technologies Inc.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The null.qubit device is a no-op device, useful for resource estimation, and for
benchmarking PennyLane's auxiliary functionality outside direct circuit evaluations.
"""


import inspect
import json
import logging
import time
from collections import defaultdict
from dataclasses import replace
from functools import lru_cache, singledispatch
from numbers import Number
from typing import Optional, Union

import numpy as np

from pennylane import math
from pennylane.devices.modifiers import simulator_tracking, single_tape_support
from pennylane.measurements import (
    ClassicalShadowMP,
    CountsMP,
    DensityMatrixMP,
    MeasurementProcess,
    MeasurementValue,
    ProbabilityMP,
    StateMP,
)
from pennylane.ops.op_math import Adjoint, Controlled, ControlledOp
from pennylane.tape import QuantumScriptOrBatch
from pennylane.transforms.core import TransformProgram
from pennylane.typing import Result, ResultBatch

from . import DefaultQubit, Device
from .execution_config import DefaultExecutionConfig, ExecutionConfig
from .preprocess import decompose

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

RESOURCES_FNAME_PREFIX = "__pennylane_resources_data_"


[docs] @singledispatch def zero_measurement( mp: MeasurementProcess, num_device_wires, shots: Optional[int], batch_size: int, interface: str ): """Create all-zero results for various measurement processes.""" return _zero_measurement(mp, num_device_wires, shots, batch_size, interface)
def _zero_measurement( mp: MeasurementProcess, num_device_wires: int, shots: Optional[int], batch_size, interface ): shape = mp.shape(shots, num_device_wires) if batch_size is not None: shape = (batch_size,) + shape if "jax" not in interface: return _cached_zero_return(shape, interface, mp.numeric_type) return math.zeros(shape, like=interface, dtype=mp.numeric_type) @lru_cache(maxsize=128) def _cached_zero_return(shape, interface, dtype): return math.zeros(shape, like=interface, dtype=dtype) @zero_measurement.register def _(mp: ClassicalShadowMP, num_device_wires, shots: Optional[int], batch_size, interface): if batch_size is not None: # shapes = [(batch_size,) + shape for shape in shapes] raise ValueError( "Parameter broadcasting is not supported with null.qubit and qml.classical_shadow" ) shape = mp.shape(shots, num_device_wires) return math.zeros(shape, like=interface, dtype=np.int8) @zero_measurement.register def _(mp: CountsMP, num_device_wires, shots, batch_size, interface): outcomes = [] if mp.obs is None and not isinstance(mp.mv, MeasurementValue): state = "0" * num_device_wires results = {state: math.asarray(shots, like=interface)} if mp.all_outcomes: outcomes = [f"{x:0{num_device_wires}b}" for x in range(1, 2**num_device_wires)] else: outcomes = sorted(mp.eigvals()) # always assign shots to the smallest results = {outcomes[0]: math.asarray(shots, like=interface)} outcomes = outcomes[1:] if mp.all_outcomes else [] if outcomes: zero = math.asarray(0, like=interface) for val in outcomes: results[val] = zero if batch_size is not None: results = tuple(results for _ in range(batch_size)) return results zero_measurement.register(DensityMatrixMP)(_zero_measurement) @zero_measurement.register(StateMP) @zero_measurement.register(ProbabilityMP) def _( mp: Union[StateMP, ProbabilityMP], num_device_wires: int, shots: Optional[int], batch_size, interface, ): num_wires = len(mp.wires) or num_device_wires state = [1.0] + [0.0] * (2**num_wires - 1) if batch_size is not None: state = [state] * batch_size return math.asarray(state, like=interface) def _interface(config: ExecutionConfig): return config.interface.get_like() if config.gradient_method == "backprop" else "numpy" def _simulate_resource_use(circuit, outfile): num_wires = len(circuit.wires) gate_types = defaultdict(int) for op in circuit.operations: controls = 0 adj = False while hasattr(op, "base"): if type(op) in (Controlled, ControlledOp): # Don't check this with `isinstance` to avoid unrolling ops like CNOT controls += len(op.control_wires) elif isinstance(op, Adjoint): adj = not adj else: break # Certain gates have "base" but shouldn't be broken down (like CNOT) # NOTE: Pow, Exp, Add, etc. intentionally not handled to be compatible with Catalyst op = op.base # Barrier is not a gate and takes no resources, so we skip it if op.name == "Barrier": # (this confuses codecov, despite being tested) continue # pragma: no cover name = op.name if adj: name = f"Adj({name})" if controls: name = f"{controls if controls > 1 else ''}C({name})" gate_types[name] += 1 # NOTE: For now, this information is being printed to match the behaviour of catalyst resource tracking. # In the future it may be better to return this information in a more structured way. json.dump( { "num_wires": num_wires, "num_gates": sum(gate_types.values()), "gate_types": gate_types, }, outfile, )
[docs] @simulator_tracking @single_tape_support class NullQubit(Device): """Null qubit device for PennyLane. This device performs no operations involved in numerical calculations. Instead the time spent in execution is dominated by support (or setting up) operations, like tape creation etc. Args: wires (int, Iterable[Number, str]): Number of wires present on the device, or iterable that contains unique labels for the wires as numbers (i.e., ``[-1, 0, 2]``) or strings (``['aux_wire', 'q1', 'q2']``). Default ``None`` if not specified. shots (int, Sequence[int], Sequence[Union[int, Sequence[int]]]): The default number of shots to use in executions involving this device. track_resources (bool): If True, track the number of resources used by the device and save them to a JSON file. The filename will match ``__pennylane_resources_data_*.json`` where the wildcard (asterisk) is replaced by the timestamp of when execution began in nanoseconds since Unix EPOCH. This argument is experimental and subject to change. **Example:** .. code-block:: python qs = qml.tape.QuantumScript( [qml.Hadamard(0), qml.CNOT([0, 1])], [qml.expval(qml.PauliZ(0)), qml.probs()], ) qscripts = [qs, qs, qs] >>> dev = NullQubit() >>> program, execution_config = dev.preprocess() >>> new_batch, post_processing_fn = program(qscripts) >>> results = dev.execute(new_batch, execution_config=execution_config) >>> post_processing_fn(results) ((array(0.), array([1., 0., 0., 0.])), (array(0.), array([1., 0., 0., 0.])), (array(0.), array([1., 0., 0., 0.]))) This device currently supports (trivial) derivatives: >>> from pennylane.devices import ExecutionConfig >>> dev.supports_derivatives(ExecutionConfig(gradient_method="device")) True This device can be used to track resource usage: .. code-block:: python n_layers = 50 n_wires = 100 shape = qml.StronglyEntanglingLayers.shape(n_layers=n_layers, n_wires=n_wires) @qml.qnode(dev) def circuit(params): qml.StronglyEntanglingLayers(params, wires=range(n_wires)) return [qml.expval(qml.Z(i)) for i in range(n_wires)] params = np.random.random(shape) with qml.Tracker(dev) as tracker: circuit(params) >>> tracker.history["resources"][0] num_wires: 100 num_gates: 10000 depth: 502 shots: Shots(total=None) gate_types: {'Rot': 5000, 'CNOT': 5000} gate_sizes: {1: 5000, 2: 5000} .. details:: :title: Tracking ``NullQubit`` tracks: * ``executions``: the number of unique circuits that would be required on quantum hardware * ``shots``: the number of shots * ``resources``: the :class:`~.resource.Resources` for the executed circuit. * ``simulations``: the number of simulations performed. One simulation can cover multiple QPU executions, such as for non-commuting measurements and batched parameters. * ``batches``: The number of times :meth:`~.execute` is called. * ``results``: The results of each call of :meth:`~.execute` * ``derivative_batches``: How many times :meth:`~.compute_derivatives` is called. * ``execute_and_derivative_batches``: How many times :meth:`~.execute_and_compute_derivatives` is called * ``vjp_batches``: How many times :meth:`~.compute_vjp` is called * ``execute_and_vjp_batches``: How many times :meth:`~.execute_and_compute_vjp` is called * ``jvp_batches``: How many times :meth:`~.compute_jvp` is called * ``execute_and_jvp_batches``: How many times :meth:`~.execute_and_compute_jvp` is called * ``derivatives``: How many circuits are submitted to :meth:`~.compute_derivatives` or :meth:`~.execute_and_compute_derivatives`. * ``vjps``: How many circuits are submitted to :meth:`~.compute_vjp` or :meth:`~.execute_and_compute_vjp` * ``jvps``: How many circuits are submitted to :meth:`~.compute_jvp` or :meth:`~.execute_and_compute_jvp` """ @property def name(self): """The name of the device.""" return "null.qubit" def __init__(self, wires=None, shots=None, track_resources=False) -> None: super().__init__(wires=wires, shots=shots) self._debugger = None self._track_resources = track_resources # this is required by Catalyst to toggle the tracker at runtime self.device_kwargs = {"track_resources": track_resources} def _simulate(self, circuit, interface): num_device_wires = len(self.wires) if self.wires else len(circuit.wires) results = [] if self._track_resources: timestamp = int(time.time() * 1e9) # nanoseconds since epoch resources_fname = f"{RESOURCES_FNAME_PREFIX}{timestamp}.json" with open(resources_fname, "x", encoding="utf-8") as f: _simulate_resource_use(circuit, f) for s in circuit.shots or [None]: r = tuple( zero_measurement(mp, num_device_wires, s, circuit.batch_size, interface) for mp in circuit.measurements ) results.append(r[0] if len(circuit.measurements) == 1 else r) if circuit.shots.has_partitioned_shots: return tuple(results) return results[0] def _derivatives(self, circuit, interface): shots = circuit.shots num_device_wires = len(self.wires) if self.wires else len(circuit.wires) n = len(circuit.trainable_params) derivatives = tuple( ( math.zeros_like( zero_measurement(mp, num_device_wires, shots, circuit.batch_size, interface) ), ) * n for mp in circuit.measurements ) if n == 1: derivatives = tuple(d[0] for d in derivatives) return derivatives[0] if len(derivatives) == 1 else derivatives @staticmethod def _vjp(circuit, interface): batch_size = circuit.batch_size n = len(circuit.trainable_params) res_shape = (n,) if batch_size is None else (n, batch_size) return math.zeros(res_shape, like=interface) @staticmethod def _jvp(circuit, interface): jvps = (math.asarray(0.0, like=interface),) * len(circuit.measurements) return jvps[0] if len(jvps) == 1 else jvps @staticmethod def _setup_execution_config(execution_config: ExecutionConfig) -> ExecutionConfig: """No-op function to allow for borrowing DefaultQubit.preprocess without AttributeErrors""" return execution_config @property def _max_workers(self): """No-op property to allow for borrowing DefaultQubit.preprocess without AttributeErrors""" return None # pylint: disable=cell-var-from-loop
[docs] def preprocess( self, execution_config=DefaultExecutionConfig ) -> tuple[TransformProgram, ExecutionConfig]: program = DefaultQubit.preprocess_transforms(self, execution_config) for t in program: if t.transform == decompose.transform: original_stopping_condition = t.kwargs["stopping_condition"] def new_stopping_condition(op): return (not op.has_decomposition) or original_stopping_condition(op) t.kwargs["stopping_condition"] = new_stopping_condition original_shots_stopping_condition = t.kwargs.get("stopping_condition_shots", None) if original_shots_stopping_condition: def new_shots_stopping_condition(op): return (not op.has_decomposition) or original_shots_stopping_condition(op) t.kwargs["stopping_condition_shots"] = new_shots_stopping_condition updated_values = {} if execution_config.gradient_method in ["best", "adjoint"]: updated_values["gradient_method"] = "device" if execution_config.use_device_gradient is None: updated_values["use_device_gradient"] = execution_config.gradient_method in { "best", "device", "adjoint", "backprop", } if execution_config.use_device_jacobian_product is None: updated_values["use_device_jacobian_product"] = ( execution_config.gradient_method == "device" ) if execution_config.grad_on_execution is None: updated_values["grad_on_execution"] = execution_config.gradient_method == "device" return program, replace(execution_config, **updated_values)
[docs] def execute( self, circuits: QuantumScriptOrBatch, execution_config: ExecutionConfig = DefaultExecutionConfig, ) -> Union[Result, ResultBatch]: if logger.isEnabledFor(logging.DEBUG): # pragma: no cover logger.debug( """Entry with args=(circuits=%s) called by=%s""", circuits, "::L".join( str(i) for i in inspect.getouterframes(inspect.currentframe(), 2)[1][1:3] ), ) return tuple(self._simulate(c, _interface(execution_config)) for c in circuits)
[docs] def supports_derivatives(self, execution_config=None, circuit=None): return execution_config is None or execution_config.gradient_method in ( "device", "backprop", "adjoint", )
[docs] def supports_vjp(self, execution_config=None, circuit=None): return execution_config is None or execution_config.gradient_method in ( "device", "backprop", "adjoint", )
[docs] def supports_jvp(self, execution_config=None, circuit=None): return execution_config is None or execution_config.gradient_method in ( "device", "backprop", "adjoint", )
[docs] def compute_derivatives( self, circuits: QuantumScriptOrBatch, execution_config: ExecutionConfig = DefaultExecutionConfig, ): return tuple(self._derivatives(c, _interface(execution_config)) for c in circuits)
[docs] def execute_and_compute_derivatives( self, circuits: QuantumScriptOrBatch, execution_config: ExecutionConfig = DefaultExecutionConfig, ): results = tuple(self._simulate(c, _interface(execution_config)) for c in circuits) jacs = tuple(self._derivatives(c, _interface(execution_config)) for c in circuits) return results, jacs
[docs] def compute_jvp( self, circuits: QuantumScriptOrBatch, tangents: tuple[Number], execution_config: ExecutionConfig = DefaultExecutionConfig, ): return tuple(self._jvp(c, _interface(execution_config)) for c in circuits)
[docs] def execute_and_compute_jvp( self, circuits: QuantumScriptOrBatch, tangents: tuple[Number], execution_config: ExecutionConfig = DefaultExecutionConfig, ): results = tuple(self._simulate(c, _interface(execution_config)) for c in circuits) jvps = tuple(self._jvp(c, _interface(execution_config)) for c in circuits) return results, jvps
[docs] def compute_vjp( self, circuits: QuantumScriptOrBatch, cotangents: tuple[Number], execution_config: ExecutionConfig = DefaultExecutionConfig, ): return tuple(self._vjp(c, _interface(execution_config)) for c in circuits)
[docs] def execute_and_compute_vjp( self, circuits: QuantumScriptOrBatch, cotangents: tuple[Number], execution_config: ExecutionConfig = DefaultExecutionConfig, ): results = tuple(self._simulate(c, _interface(execution_config)) for c in circuits) vjps = tuple(self._vjp(c, _interface(execution_config)) for c in circuits) return results, vjps
# TODO: Remove when PL supports pylint==3.3.6 (it is considered a useless-suppression) [sc-91362] # pylint: disable=unused-argument
[docs] def eval_jaxpr( self, jaxpr: "jax.extend.core.Jaxpr", consts: list, *args, execution_config=None ) -> list: from pennylane.capture.primitives import ( # pylint: disable=import-outside-toplevel AbstractMeasurement, ) def zeros_like(var): if isinstance(var.aval, AbstractMeasurement): shots = self.shots.total_shots s, dtype = var.aval.abstract_eval(num_device_wires=len(self.wires), shots=shots) return math.zeros(s, dtype=dtype, like="jax") return math.zeros(var.aval.shape, dtype=var.aval.dtype, like="jax") return [zeros_like(var) for var in jaxpr.outvars]