Source code for pennylane.labs.resource_estimation.resource_tracking
# Copyright 2025 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
r"""Core resource tracking logic."""
from collections import defaultdict
from collections.abc import Callable, Iterable
from functools import singledispatch, wraps
from pennylane.labs.resource_estimation.ops.op_math.symbolic import (
ResourceAdjoint,
ResourceControlled,
ResourcePow,
)
from pennylane.labs.resource_estimation.qubit_manager import AllocWires, FreeWires, QubitManager
from pennylane.labs.resource_estimation.resource_config import ResourceConfig
from pennylane.labs.resource_estimation.resource_mapping import map_to_resource_op
from pennylane.labs.resource_estimation.resource_operator import (
CompressedResourceOp,
GateCount,
ResourceOperator,
)
from pennylane.labs.resource_estimation.resources_base import Resources
from pennylane.operation import Operation
from pennylane.queuing import AnnotatedQueue, QueuingManager
from pennylane.wires import Wires
# pylint: disable=protected-access,too-many-arguments
# user-friendly gateset for visual checks and initial compilation
StandardGateSet = frozenset(
{
"X",
"Y",
"Z",
"Hadamard",
"SWAP",
"CNOT",
"S",
"T",
"Adjoint(S)",
"Adjoint(T)",
"Toffoli",
"RX",
"RY",
"RZ",
"PhaseShift",
}
)
# realistic gateset for useful compilation of circuits
DefaultGateSet = frozenset(
{
"X",
"Y",
"Z",
"Hadamard",
"CNOT",
"S",
"T",
"Toffoli",
}
)
[docs]
def estimate(
obj: ResourceOperator | Callable | Resources | list,
gate_set: set | None = None,
work_wires: int | dict = 0,
tight_budget: bool = False,
config: ResourceConfig | None = None,
) -> Resources | Callable:
r"""Estimate the quantum resources required from a circuit or operation in terms of the gates
provided in the gateset.
Args:
obj (Union[ResourceOperator, Callable, Resources, List]): The quantum circuit or operation
to obtain resources from.
gate_set (Set, optional): A set of names (strings) of the fundamental operations to track
counts for throughout the quantum workflow.
work_wires (int | dict | optional): The number of available zeroed and/or any_state ancilla
qubits. If an integer is provided, it specifies the number of zeroed ancillas. If a
dictionary is provided, it should have the keys ``"zeroed"`` and ``"any_state"``.
Defaults to ``0``.
tight_budget (bool | None): Determines whether extra zeroed state wires can be allocated when they
exceed the available amount. The default is ``False``.
config (ResourceConfig, optional): A ResourceConfig object of additional parameters which sets default values
when they are not specified on the operator.
Returns:
Resources: the quantum resources required to execute the circuit
Raises:
TypeError: could not obtain resources for obj of type :code:`type(obj)`
**Example**
We can track the resources of a quantum workflow by passing the quantum function defining our
workflow directly into this function.
.. code-block:: python
import pennylane.labs.resource_estimation as plre
def my_circuit():
for w in range(2):
plre.ResourceHadamard(wires=w)
plre.ResourceCNOT(wires=[0,1])
plre.ResourceRX(wires=0)
plre.ResourceRY(wires=1)
plre.ResourceQFT(num_wires=3, wires=[0, 1, 2])
return
Note that we are passing a python function NOT a :class:`~.QNode`. The resources for this
workflow are then obtained by:
>>> config = plre.ResourceConfig()
>>> config.set_single_qubit_rot_precision(1e-4)
>>> res = plre.estimate(
... my_circuit,
... gate_set = plre.DefaultGateSet,
... config=config,
... )()
...
>>> print(res)
--- Resources: ---
Total qubits: 3
Total gates : 279
Qubit breakdown:
clean qubits: 0, dirty qubits: 0, algorithmic qubits: 3
Gate breakdown:
{'Hadamard': 5, 'CNOT': 10, 'T': 264}
"""
return _estimate_resources_dispatch(obj, gate_set, work_wires, tight_budget, config)
@singledispatch
def _estimate_resources_dispatch(
obj: ResourceOperator | Callable | Resources | list,
gate_set: set | None = None,
work_wires: int | dict = 0,
tight_budget: bool = False,
config: ResourceConfig | None = None,
) -> Resources | Callable:
"""Internal singledispatch function for resource estimation."""
raise TypeError(
f"Could not obtain resources for obj of type {type(obj)}. obj must be one of Resources, Callable or ResourceOperator"
)
@_estimate_resources_dispatch.register
def _resources_from_qfunc(
obj: Callable,
gate_set: set | None = None,
work_wires=0,
tight_budget=False,
config: ResourceConfig | None = None,
) -> Callable:
"""Get resources from a quantum function which queues operations"""
@wraps(obj)
def wrapper(*args, **kwargs):
with AnnotatedQueue() as q:
obj(*args, **kwargs)
qm = QubitManager(work_wires, tight_budget)
# Get algorithm wires:
num_algo_qubits = 0
circuit_wires = []
for op in q.queue:
if isinstance(op, (ResourceOperator, Operation)):
if op.wires:
circuit_wires.append(op.wires)
else:
num_algo_qubits = max(num_algo_qubits, op.num_wires)
num_algo_qubits += len(Wires.all_wires(circuit_wires))
qm.algo_qubits = num_algo_qubits # set the algorithmic qubits in the qubit manager
# Obtain resources in the gate_set
compressed_res_ops_lst = _ops_to_compressed_reps(q.queue)
gate_counts = defaultdict(int)
for cmp_rep_op in compressed_res_ops_lst:
_update_counts_from_compressed_res_op(
cmp_rep_op, gate_counts, qbit_mngr=qm, gate_set=gate_set, config=config
)
return Resources(qubit_manager=qm, gate_types=gate_counts)
return wrapper
@_estimate_resources_dispatch.register
def _resources_from_resource(
obj: Resources,
gate_set: set | None = None,
work_wires=0,
tight_budget=None,
config: ResourceConfig | None = None,
) -> Resources:
"""Further process resources from a resources object."""
existing_qm = obj.qubit_manager
if work_wires is not None:
if isinstance(work_wires, dict):
clean_wires = work_wires["clean"]
dirty_wires = work_wires["dirty"]
else:
clean_wires = work_wires
dirty_wires = 0
existing_qm._clean_qubit_counts = max(clean_wires, existing_qm._clean_qubit_counts)
existing_qm._dirty_qubit_counts = max(dirty_wires, existing_qm._dirty_qubit_counts)
if tight_budget is not None:
existing_qm.tight_budget = tight_budget
gate_counts = defaultdict(int)
for cmpr_rep_op, count in obj.gate_types.items():
_update_counts_from_compressed_res_op(
cmpr_rep_op,
gate_counts,
qbit_mngr=existing_qm,
gate_set=gate_set,
scalar=count,
config=config,
)
# Update:
return Resources(qubit_manager=existing_qm, gate_types=gate_counts)
@_estimate_resources_dispatch.register
def _resources_from_resource_ops(
obj: ResourceOperator,
gate_set: set | None = None,
work_wires=0,
tight_budget=None,
config: ResourceConfig | None = None,
) -> Resources:
"""Extract resources from a resource operator."""
return _resources_from_resource(
1 * obj,
gate_set,
work_wires,
tight_budget,
config,
)
@_estimate_resources_dispatch.register
def _resources_from_pl_ops(
obj: Operation,
gate_set: set | None = None,
work_wires=0,
tight_budget=None,
config: ResourceConfig | None = None,
) -> Resources:
"""Extract resources from a pl operator."""
obj = map_to_resource_op(obj)
return _resources_from_resource(
1 * obj,
gate_set,
work_wires,
tight_budget,
config,
)
def _update_counts_from_compressed_res_op(
cp_rep: CompressedResourceOp,
gate_counts_dict,
qbit_mngr,
gate_set: set | None = None,
scalar: int = 1,
config: ResourceConfig | None = None,
) -> None:
"""Modifies the `gate_counts_dict` argument by adding the (scaled) resources of the operation provided.
Args:
cp_rep (CompressedResourceOp): operation in compressed representation to extract resources from
gate_counts_dict (Dict): base dictionary to modify with the resource counts
gate_set (Set): the set of operations to track resources with respect to
scalar (int, optional): optional scalar to multiply the counts. Defaults to 1.
config (Dict, optional): additional parameters to specify the resources from an operator. Defaults to resource_config.
"""
if gate_set is None:
gate_set = DefaultGateSet
if config is None:
config = ResourceConfig()
## If op in gate_set add to resources
if cp_rep.name in gate_set:
gate_counts_dict[cp_rep] += scalar
return
## Else decompose cp_rep using its resource decomp [cp_rep --> list[GateCounts]] and extract resources
decomp_func, kwargs = _get_decomposition(cp_rep, config)
params = {key: value for key, value in cp_rep.params.items() if value is not None}
filtered_kwargs = {key: value for key, value in kwargs.items() if key not in params}
resource_decomp = decomp_func(**params, **filtered_kwargs)
qubit_alloc_sum = _sum_allocated_wires(resource_decomp)
for action in resource_decomp:
if isinstance(action, GateCount):
_update_counts_from_compressed_res_op(
action.gate,
gate_counts_dict,
qbit_mngr=qbit_mngr,
scalar=scalar * action.count,
gate_set=gate_set,
config=config,
)
continue
if isinstance(action, AllocWires):
if qubit_alloc_sum != 0 and scalar > 1:
qbit_mngr.grab_clean_qubits(action.num_wires * scalar)
else:
qbit_mngr.grab_clean_qubits(action.num_wires)
if isinstance(action, FreeWires):
if qubit_alloc_sum != 0 and scalar > 1:
qbit_mngr.free_qubits(action.num_wires * scalar)
else:
qbit_mngr.free_qubits(action.num_wires)
return
def _sum_allocated_wires(decomp):
"""Sum together the allocated and released wires in a decomposition."""
s = 0
for action in decomp:
if isinstance(action, AllocWires):
s += action.num_wires
if isinstance(action, FreeWires):
s -= action.num_wires
return s
@QueuingManager.stop_recording()
def _ops_to_compressed_reps(
ops: Iterable[Operation | ResourceOperator],
) -> list[CompressedResourceOp]:
"""Convert the sequence of operations to a list of compressed resource ops.
Args:
ops (Iterable[Union[Operation, ResourceOperator]]): set of operations to convert
Returns:
List[CompressedResourceOp]: set of converted compressed resource ops
"""
cmp_rep_ops = []
for op in ops: # We are skipping measurement processes here.
if isinstance(op, ResourceOperator):
cmp_rep_ops.append(op.resource_rep_from_op())
if isinstance(op, Operation): # map: op --> res_op, then: res_op --> cmprsd_res_op
cmp_rep_ops.append(map_to_resource_op(op).resource_rep_from_op())
return cmp_rep_ops
def _get_decomposition(
cp_rep: CompressedResourceOp, config: ResourceConfig
) -> tuple[Callable, dict]:
"""
Selects the appropriate decomposition function and kwargs from a config object.
This helper function centralizes the logic for choosing a decomposition,
handling standard, custom, and symbolic operator rules using a mapping.
Args:
cp_rep (CompressedResourceOp): The operator to find the decomposition for.
config (ResourceConfig): The configuration object containing decomposition rules.
Returns:
A tuple containing the decomposition function and its associated kwargs.
"""
op_type = cp_rep.op_type
_SYMBOLIC_DECOMP_MAP = {
ResourceAdjoint: "_adj_custom_decomps",
ResourceControlled: "_ctrl_custom_decomps",
ResourcePow: "_pow_custom_decomps",
}
if op_type in _SYMBOLIC_DECOMP_MAP:
decomp_attr_name = _SYMBOLIC_DECOMP_MAP[op_type]
custom_decomp_dict = getattr(config, decomp_attr_name)
base_op_type = cp_rep.params["base_cmpr_op"].op_type
kwargs = config.resource_op_precisions.get(base_op_type, {})
decomp_func = custom_decomp_dict.get(base_op_type, op_type.resource_decomp)
else:
kwargs = config.resource_op_precisions.get(op_type, {})
decomp_func = config._custom_decomps.get(op_type, op_type.resource_decomp)
return decomp_func, kwargs
_modules/pennylane/labs/resource_estimation/resource_tracking
Download Python script
Download Notebook
View on GitHub