Source code for pennylane.io.qualtran_io
# Copyright 2018-2025 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This submodule contains the adapter class for Qualtran-PennyLane interoperability.
"""
# TODO: Remove when PL supports pylint==3.3.6 (it is considered a useless-suppression) [sc-91362]
# pylint: disable=unused-argument
from collections import defaultdict
from functools import cached_property, singledispatch, wraps
from typing import Dict, List, Tuple
import numpy as np
import pennylane.measurements as qmeas
import pennylane.ops as qops
import pennylane.templates as qtemps
from pennylane import math
from pennylane.operation import (
DecompositionUndefinedError,
MatrixUndefinedError,
Operation,
Operator,
)
from pennylane.queuing import AnnotatedQueue, QueuingManager
from pennylane.registers import registers
from pennylane.tape import make_qscript
from pennylane.templates.state_preparations.superposition import _assign_states
from pennylane.wires import WiresLike
from pennylane.workflow import construct_tape
from pennylane.workflow.qnode import QNode
try:
import cirq
import qualtran as qt
from qualtran import Bloq, CtrlSpec
from qualtran._infra.gate_with_registers import split_qubits
from qualtran.bloqs import basic_gates as qt_gates
qualtran = True
except (ModuleNotFoundError, ImportError) as import_error:
qualtran = False
Bloq = object
@singledispatch
def _get_op_call_graph(op):
"""Return call graph for PennyLane Operator. If the call graph is not implemented,
return ``None``, which means we will build the call graph via decomposition"""
# TODO: Integrate with resource operators and the new decomposition pipelines
return None
@_get_op_call_graph.register
def _(op: qtemps.subroutines.qpe.QuantumPhaseEstimation):
    """Call graph for Quantum Phase Estimation.

    Counts one Hadamard per estimation wire, ``2**n - 1`` singly-controlled applications
    of the unitary (``n`` being the number of estimation wires) and a single adjoint QFT
    acting on the estimation wires.
    """
    estimation_wires = op.estimation_wires
    num_estimation_wires = len(estimation_wires)

    hadamard = qt_gates.Hadamard()
    ctrl_unitary = _map_to_bloq(op.hyperparameters["unitary"]).controlled(CtrlSpec(cvs=[1]))
    # The QFT is wrapped rather than mapped (map_ops=False) before taking its adjoint
    inverse_qft = _map_to_bloq(qtemps.QFT(wires=estimation_wires), map_ops=False).adjoint()

    gate_counts = defaultdict(int)
    gate_counts[hadamard] = num_estimation_wires
    gate_counts[ctrl_unitary] = 2**num_estimation_wires - 1
    gate_counts[inverse_qft] = 1
    return gate_counts
@_get_op_call_graph.register
def _(op: qtemps.subroutines.TrotterizedQfunc):
    """Call graph for qml.trotterize.

    The wrapped ``qfunc`` is executed once inside a private queue to discover which
    operators it applies. Each recorded operator is then mapped to a bloq and counted
    once for a first-order product, or ``2 * n * 5 ** (order // 2 - 1)`` times otherwise.

    Returns:
        defaultdict: mapping from bloq to the number of times it is called
    """
    # From ResourceTrotterizedQfunc
    n = op.hyperparameters["n"]
    order = op.hyperparameters["order"]
    qfunc = op.hyperparameters["qfunc"]

    # Everything that is not one of the template's own hyperparameters is forwarded to the qfunc
    base_hyper_params = ("n", "order", "qfunc", "reverse")
    qfunc_kwargs = {
        name: value
        for name, value in op.hyperparameters.items()
        if name not in base_hyper_params
    }

    # Record the qfunc's operators in a local queue without leaking them to any outer queue
    with QueuingManager.stop_recording():
        with AnnotatedQueue() as q:
            qfunc(*op.parameters, wires=op.wires, **qfunc_kwargs)

    if order == 1:
        # NOTE(review): the first-order count does not scale with ``n`` -- confirm this
        # matches the intended resource estimate.
        repetitions = 1
    else:
        k = order // 2
        repetitions = 2 * n * (5 ** (k - 1))

    call_graph = defaultdict(int)
    for q_op in q.queue:
        call_graph[_map_to_bloq(q_op)] += repetitions
    return call_graph
@_get_op_call_graph.register
def _(op: qtemps.state_preparations.Superposition):
    """Call graph for Superposition.

    Counts one ``StatePrep`` on the trailing wires for the reordered coefficients, plus
    CNOTs and multi-controlled X gates for the estimated number of basis-state
    permutations.
    """
    # From ResourceSuperposition
    coeffs = op.coeffs
    bases = op.hyperparameters["bases"]
    num_basis_states = len(bases)
    size_basis_state = len(bases[0])  # assuming they are all the same size

    # Re-key each coefficient by the state its basis state is permuted to, then order the
    # coefficients by the integer value of that state.
    coeff_by_basis = dict(zip(bases, coeffs))
    perms = _assign_states(bases)
    coeff_by_target = {
        perms[basis]: coeff for basis, coeff in coeff_by_basis.items() if basis in perms
    }

    def _bits_to_int(item):
        return int("".join(map(str, item[0])), 2)

    sorted_coefficients = [
        coeff for _, coeff in sorted(coeff_by_target.items(), key=_bits_to_int)
    ]

    num_prep_wires = int(math.ceil(math.log2(len(coeffs))))
    msp = qops.StatePrep(
        math.stack(sorted_coefficients),
        wires=op.wires[-num_prep_wires:],
        pad_with=0,
    )

    gate_types = defaultdict(int)
    gate_types[_map_to_bloq(msp)] = 1

    cnot = qt_gates.CNOT()
    num_zero_ctrls = size_basis_state // 2
    control_values = [1] * num_zero_ctrls + [0] * (size_basis_state - num_zero_ctrls)
    multi_x = _map_to_bloq(
        qops.MultiControlledX(wires=range(size_basis_state + 1), control_values=control_values)
    )

    basis_size = 2**size_basis_state
    prob_matching_basis_states = num_basis_states / basis_size
    num_permutes = round(num_basis_states * (1 - prob_matching_basis_states))
    if not num_permutes:
        return gate_types

    gate_types[cnot] = num_permutes * (size_basis_state // 2)  # average number of bits to flip
    gate_types[multi_x] = 2 * num_permutes  # for compute and uncompute
    return gate_types
@_get_op_call_graph.register
def _(op: qtemps.state_preparations.QROMStatePreparation):
    """Call graph for QROMStatePreparation.

    Every QROM that is counted is counted together with its adjoint. One pair is added
    up front, one pair per additional state qubit, and one final pair (plus controlled
    global phases) when the state vector has negative or complex entries.
    """
    # From ResourceQROMStatePreparation
    gate_types = defaultdict(int)

    state_vector = op.state_vector
    needs_phase_correction = any(c.imag != 0 or c.real < 0 for c in state_vector)
    num_state_qubits = int(math.log2(len(state_vector)))

    hyperparams = op.hyperparameters
    precision_wires = hyperparams["precision_wires"]
    input_wires = hyperparams["input_wires"]
    work_wires = hyperparams["work_wires"]
    num_precision_wires = len(precision_wires)

    def _count_qrom_pair(bitstrings, control_wires):
        """Build a QROM and count both it and its adjoint."""
        qrom_op = qtemps.QROM(
            bitstrings=bitstrings,
            target_wires=precision_wires,
            control_wires=control_wires,
            work_wires=work_wires,
            clean=False,
        )
        gate_types[_map_to_bloq(qrom_op)] += 1
        gate_types[_map_to_bloq(qops.adjoint(qrom_op))] += 1

    # First QROM: a single bitstring and no control wires
    _count_qrom_pair(["0" * (num_precision_wires - 1) + "1"], control_wires=[])

    zero_string = "0" * num_precision_wires
    one_string = ("0" * (num_precision_wires - 1) + "1") if num_precision_wires > 0 else ""

    def _half_and_half(num_each):
        """``num_each`` all-zero bitstrings followed by ``num_each`` copies of ``one_string``."""
        return [zero_string] * num_each + [one_string] * num_each

    # One QROM pair per additional state qubit, controlled on a growing prefix of input wires
    for i in range(1, num_state_qubits):
        _count_qrom_pair(_half_and_half(2 ** (i - 1)), control_wires=input_wires[:i])

    gate_types[_map_to_bloq(qops.CRY(0, wires=[0, 1]))] = num_precision_wires * num_state_qubits

    # Final QROM pair and controlled phases, only for states with negative or complex entries
    if needs_phase_correction:
        _count_qrom_pair(
            _half_and_half(2 ** (num_state_qubits - 1)), control_wires=input_wires
        )
        ctrl_phase = _map_to_bloq(
            qops.ctrl(
                qops.GlobalPhase((2 * np.pi), wires=input_wires[0]),
                control=0,
            )
        )
        gate_types[ctrl_phase] = num_precision_wires

    return gate_types
@_get_op_call_graph.register
def _(op: qops.BasisState):
    """Call graph for Basis State: one X gate per entry summed from the first parameter."""
    num_flips = sum(op.parameters[0])
    return defaultdict(int, {qt_gates.XGate(): num_flips})
@_get_op_call_graph.register
def _(op: qtemps.subroutines.QROM):
    """Call graph for QROM.

    Without control wires only X gates are counted (one per ``"1"`` bit). Otherwise the
    count is split into a SELECT part (CNOTs, multi-controlled X gates and their
    conjugating X gates) and a SWAP part (controlled swaps); ``clean=True`` adds
    Hadamards and scales both parts.
    """
    # From ResourceQROM
    hyperparams = op.hyperparameters
    bitstrings = hyperparams["bitstrings"]
    num_bitstrings = len(bitstrings)
    num_bit_flips = sum(bits.count("1") for bits in bitstrings)
    num_work_wires = len(hyperparams["work_wires"])
    size_bitstring = len(hyperparams["target_wires"])
    num_control_wires = len(hyperparams["control_wires"])
    clean = hyperparams["clean"]

    gate_types = defaultdict(int)
    x_gate = qt_gates.XGate()

    if num_control_wires == 0:
        gate_types[x_gate] = num_bit_flips
        return gate_types

    num_parallel_computations = min(
        (num_work_wires + size_bitstring) // size_bitstring,
        math.floor(math.sqrt(num_bitstrings)),  # use a square scheme for rows and columns
    )
    num_swap_wires = math.floor(math.log2(num_parallel_computations))
    num_select_wires = int(
        math.ceil(math.log2(math.ceil(num_bitstrings / (2**num_swap_wires))))
    )

    swap_work_wires = (int(2**num_swap_wires) - 1) * size_bitstring
    free_work_wires = num_work_wires - swap_work_wires

    if clean:
        gate_types[qt_gates.Hadamard()] = 2 * size_bitstring
        swap_clean_prefactor, select_clean_prefactor = 4, 2
    else:
        swap_clean_prefactor, select_clean_prefactor = 1, 1

    # SELECT cost:
    gate_types[qt_gates.CNOT()] = num_bit_flips  # each unitary in the select is just a CNOT

    first_work_wire = num_select_wires + 1
    multi_x = _map_to_bloq(
        qops.MultiControlledX(
            wires=range(first_work_wire),
            control_values=[True] * num_select_wires,
            work_wires=range(first_work_wire, first_work_wire + free_work_wires),
        )
    )
    num_total_ctrl_possibilities = 2**num_select_wires
    gate_types[multi_x] = select_clean_prefactor * (
        2 * num_total_ctrl_possibilities  # two applications targeting the aux qubit
    )

    num_zero_controls = (2 * num_total_ctrl_possibilities * num_select_wires) // 2
    gate_types[x_gate] = select_clean_prefactor * (
        num_zero_controls * 2  # conjugate 0 controls on the multi-qubit x gates from above
    )

    # SWAP cost:
    gate_types[qt_gates.TwoBitCSwap()] = (
        swap_clean_prefactor * ((2**num_swap_wires) - 1) * size_bitstring
    )
    return gate_types
@_get_op_call_graph.register
def _(op: qtemps.subroutines.QFT):
    """Call graph for Quantum Fourier Transform.

    For ``n`` wires: ``n`` Hadamards, ``n * (n - 1) // 2`` controlled phase shifts and
    ``n // 2`` swaps.
    """
    # From PL Decomposition
    num_wires = len(op.wires)
    num_ctrl_phases = num_wires * (num_wires - 1) // 2

    gate_types = defaultdict(int)
    gate_types[qt_gates.Hadamard()] = num_wires
    # The angle and wires are placeholders; only the gate type is being counted
    gate_types[_map_to_bloq(qops.ControlledPhaseShift(1, [0, 1]))] = num_ctrl_phases
    gate_types[qt_gates.TwoBitSwap()] = num_wires // 2
    return gate_types
@_get_op_call_graph.register
def _(op: qtemps.subroutines.QSVT):
    """Call graph for Quantum Singular Value Transform.

    Each projector is counted once. The block encoding ``UA`` is counted
    ``len(projectors) // 2`` times and its adjoint ``(len(projectors) - 1) // 2`` times.
    """
    # From ResourceQSVT
    block_encoding = op.hyperparameters["UA"]
    projectors = op.hyperparameters["projectors"]
    num_projectors = len(projectors)
    *leading_projectors, last_projector = projectors

    gate_types = defaultdict(int)
    for projector in leading_projectors:
        gate_types[_map_to_bloq(projector)] += 1

    gate_types[_map_to_bloq(block_encoding)] += num_projectors // 2
    gate_types[_map_to_bloq(block_encoding).adjoint()] += (num_projectors - 1) // 2
    gate_types[_map_to_bloq(last_projector)] += 1
    return gate_types
@_get_op_call_graph.register
def _(op: qtemps.subroutines.Select):
    """Call graph for Select.

    Counts X gates for conjugating zero-valued controls, plus one controlled version of
    every selected operator. A singly-controlled X is counted as a CNOT.
    """
    # From ResourceSelect
    target_bloqs = [_map_to_bloq(target) for target in op.hyperparameters["ops"]]

    num_ctrl_wires = int(np.ceil(np.log2(len(target_bloqs))))
    num_total_ctrl_possibilities = 2**num_ctrl_wires  # 2^n
    num_zero_controls = num_total_ctrl_possibilities // 2

    gate_types = defaultdict(int)
    gate_types[qt_gates.XGate()] = num_zero_controls * 2  # conjugate 0 controls

    single_control = num_ctrl_wires == 1
    for target_bloq in target_bloqs:
        ctrl_op = target_bloq.controlled(CtrlSpec(cvs=[1] * num_ctrl_wires))
        if single_control and target_bloq == qt_gates.XGate():
            ctrl_op = qt_gates.CNOT()
        gate_types[ctrl_op] += 1

    return gate_types
@_get_op_call_graph.register
def _(op: qops.StatePrep):
    """Call graph for StatePrep.

    For ``n`` wires: ``2 ** (n + 2) - 5`` RZ rotations and ``2 ** (n + 2) - 4 * n - 4``
    CNOTs. Gates whose count is zero are left out of the call graph.
    """
    # MottonenStatePrep
    num_wires = len(op.wires)
    counts = {
        qt_gates.Rz(0): 2 ** (num_wires + 2) - 5,
        qt_gates.CNOT(): 2 ** (num_wires + 2) - 4 * num_wires - 4,
    }
    return defaultdict(int, {gate: count for gate, count in counts.items() if count})
@_get_op_call_graph.register
def _(op: qtemps.subroutines.ModExp):
    """Call graph for ModExp.

    The resources of a single multiplier (QFT, adjoint QFT, a controlled sequence of
    phase adders and its adjoint, and CNOTs) are each given one extra control. QFTs and
    adjoint QFTs of consecutive multipliers cancel, so their controlled versions are
    counted once; every other controlled resource is counted once per multiplier, i.e.
    ``2 ** len(x_wires) - 1`` times its per-multiplier count.

    Returns:
        defaultdict: mapping from bloq to the number of times it is called
    """
    # From ResourceModExp
    mod = op.hyperparameters["mod"]
    num_work_wires = len(op.hyperparameters["work_wires"])
    num_x_wires = len(op.hyperparameters["x_wires"])

    if mod == 2**num_x_wires:
        num_aux_wires = num_x_wires
        num_aux_swap = num_x_wires
    else:
        num_aux_wires = num_work_wires - 1
        num_aux_swap = num_aux_wires - 1

    qft = _map_to_bloq(qtemps.QFT(wires=range(num_aux_wires)), map_ops=False)
    sequence = _map_to_bloq(
        qtemps.ControlledSequence(
            qtemps.PhaseAdder(k=3, x_wires=range(1, num_x_wires + 1)), control=[0]
        )
    )

    # Per-multiplier resource counts
    mult_resources = {
        qft: 2,
        qft.adjoint(): 2,
        sequence: 1,
        sequence.adjoint(): 1,
        qt_gates.CNOT(): min(num_x_wires, num_aux_swap),
    }

    def _wraps_qft(bloq):
        """Whether ``bloq`` (or the bloq it is the adjoint of) wraps a PennyLane QFT."""
        wrapped_op = getattr(bloq, "op", None)
        if wrapped_op is None:
            wrapped_op = getattr(getattr(bloq, "subbloq", None), "op", None)
        return getattr(wrapped_op, "name", None) == "QFT"

    num_multipliers = (2**num_x_wires) - 1
    ctrl_spec = CtrlSpec(cvs=[1])
    gate_types = defaultdict(int)
    for comp_rep, count in mult_resources.items():
        if comp_rep == qt_gates.CNOT():
            new_rep = qt_gates.Toffoli()
        else:
            new_rep = comp_rep.controlled(ctrl_spec)

        if _wraps_qft(comp_rep):
            # cancel out QFTs from consecutive Multipliers
            gate_types[new_rep] = 1
        else:
            # Previously wrapped non-QFT bloqs (the controlled sequences) fell through
            # both ``hasattr`` branches and were never counted.
            gate_types[new_rep] = count * num_multipliers

    return gate_types
@singledispatch
def _map_to_bloq(op, map_ops=True, custom_mapping=None, **kwargs):
    """Map PennyLane operators to Qualtran Bloqs. Operators with direct equivalents are directly
    mapped to their Qualtran equivalent even if ``map_ops`` is set to ``False``. Other operators are
    given a smart default mapping. When given a ``custom_mapping``, the custom mapping is used.

    This is the fallback used when no specialised mapping is registered for ``type(op)``.

    Args:
        op: the object to convert; anything that is not an ``Operator`` is wrapped as a
            ``ToBloq`` directly
        map_ops (bool): forwarded to ``ToBloq``. Default is True.
        custom_mapping (dict): optional mapping from PennyLane operators to Qualtran Bloqs.
            Default is None.
        **kwargs: forwarded to ``ToBloq``

    Returns:
        The Bloq registered for ``op`` in ``custom_mapping`` if there is one, otherwise a
        ``ToBloq`` wrapping ``op``.
    """
    if not isinstance(op, Operator):
        return ToBloq(op, map_ops=map_ops, custom_mapping=custom_mapping, **kwargs)

    if custom_mapping is None:
        return ToBloq(op, map_ops=map_ops, **kwargs)

    if op in custom_mapping:
        return custom_mapping[op]

    # A custom mapping that does not mention this operator must not raise ``KeyError``;
    # wrap the operator instead and keep the mapping available for anything nested in it.
    return ToBloq(op, map_ops=map_ops, custom_mapping=custom_mapping, **kwargs)
def _handle_custom_map(op, map_ops, custom_mapping, **kwargs):
"""
Handles the custom mapping and wrapping logic
Args:
op (Operation): a PennyLane operator to be converted to a Qualtran Bloq.
map_ops (bool): Whether to map operations to a Qualtran Bloq. Operations are wrapped
as a ``ToBloq`` when False. Default is True.
custom_mapping (dict): Dictionary to specify a mapping between a PennyLane operator and a
Qualtran Bloq. Default is None.
Returns:
Optional[`Bloq`]: A ``ToBloq`` or the ``Bloq`` defined in the custom mapping. ``None`` if
map_ops is True but no custom mapping is found.
"""
if not map_ops:
return ToBloq(op, **kwargs)
if custom_mapping is not None and op in custom_mapping:
return custom_mapping[op]
return None
# pylint: disable=import-outside-toplevel
@_map_to_bloq.register
def _(
    op: qtemps.subroutines.qpe.QuantumPhaseEstimation,
    map_ops=True,
    custom_mapping=None,
    **kwargs,
):
    """Mapping for QuantumPhaseEstimation, which maps to ``TextbookQPE`` with a
    ``RectangularWindowState`` by default"""
    from qualtran.bloqs.phase_estimation import RectangularWindowState
    from qualtran.bloqs.phase_estimation.text_book_qpe import TextbookQPE

    override = _handle_custom_map(op, map_ops, custom_mapping, **kwargs)
    if override is not None:
        return override

    unitary_bloq = _map_to_bloq(op.hyperparameters["unitary"])
    num_estimation_wires = len(op.hyperparameters["estimation_wires"])
    return TextbookQPE(
        unitary=unitary_bloq,
        ctrl_state_prep=RectangularWindowState(num_estimation_wires),
    )
# pylint: disable=import-outside-toplevel
@_map_to_bloq.register
def _(op: qtemps.subroutines.QFT, custom_mapping=None, map_ops=True, **kwargs):
    """Mapping for QFT. Unless wrapped or overridden by the custom mapping, the result is
    a ``QFTTextBook`` sized to the number of wires of ``op``."""
    from qualtran.bloqs.qft import QFTTextBook

    override = _handle_custom_map(op, map_ops, custom_mapping, **kwargs)
    if override is None:
        return QFTTextBook(len(op.wires))
    return override
# pylint: disable=import-outside-toplevel
@_map_to_bloq.register
def _(op: qtemps.subroutines.QROM, map_ops=True, custom_mapping=None, **kwargs):
    """Mapping for QROM. Unless wrapped or overridden by the custom mapping, the bitstrings
    are loaded as integers into a ``QROAMClean`` (when ``op.clean``) or a
    ``SelectSwapQROM`` (otherwise)."""
    from qualtran.bloqs.data_loading.qroam_clean import QROAMClean
    from qualtran.bloqs.data_loading.select_swap_qrom import SelectSwapQROM

    override = _handle_custom_map(op, map_ops, custom_mapping, **kwargs)
    if override is not None:
        return override

    # Each bitstring is interpreted as a base-2 integer
    data = np.array([int(bits, 2) for bits in op.bitstrings])
    qrom_cls = QROAMClean if op.clean else SelectSwapQROM
    return qrom_cls.build_from_data(data)
# pylint: disable=import-outside-toplevel
@_map_to_bloq.register
def _(op: qtemps.subroutines.ModExp, map_ops=True, custom_mapping=None, **kwargs):
    """Mapping for ``ModExp``. Unless wrapped or overridden by the custom mapping, the
    result is Qualtran's RSA ``ModExp``, with the exponent size taken from ``x_wires`` and
    the register size taken from ``output_wires``."""
    from qualtran.bloqs.cryptography.rsa import ModExp

    override = _handle_custom_map(op, map_ops, custom_mapping, **kwargs)
    if override is not None:
        return override

    hyperparams = op.hyperparameters
    return ModExp(
        base=hyperparams["base"],
        mod=hyperparams["mod"],
        exp_bitsize=len(hyperparams["x_wires"]),
        x_bitsize=len(hyperparams["output_wires"]),
    )
def _disable_custom_mapping(func):
"""Decorator to disable custom mapping and raise error for atomic gates."""
@wraps(func)
def wrapper(op, **kwargs):
if kwargs.get("custom_mapping") and op in kwargs.get("custom_mapping", {}):
raise ValueError(
"Custom mappings are not possible for basic operations. We suggest replacing basic operations "
"such as X, Y, or other gates with direct Qualtran equivalents, before or after mapping to Qualtran."
)
return func(op, **kwargs)
return wrapper
@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.GlobalPhase, **kwargs):
    """Map ``GlobalPhase`` to Qualtran's ``GlobalPhase``; the exponent is the operator's
    first data entry divided by pi."""
    return qt_gates.GlobalPhase(exponent=op.data[0] / np.pi)


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.Hadamard, **kwargs):
    """Map ``Hadamard`` to Qualtran's ``Hadamard``."""
    return qt_gates.Hadamard()


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.Identity, **kwargs):
    """Map ``Identity`` to Qualtran's ``Identity``."""
    return qt_gates.Identity()


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.RX, **kwargs):
    """Map ``RX`` to Qualtran's ``Rx``; the angle is cast to a Python float."""
    return qt_gates.Rx(angle=float(op.data[0]))


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.RY, **kwargs):
    """Map ``RY`` to Qualtran's ``Ry``; the angle is cast to a Python float."""
    return qt_gates.Ry(angle=float(op.data[0]))


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.RZ, **kwargs):
    """Map ``RZ`` to Qualtran's ``Rz``; the angle is cast to a Python float."""
    return qt_gates.Rz(angle=float(op.data[0]))


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.S, **kwargs):
    """Map ``S`` to Qualtran's ``SGate``."""
    return qt_gates.SGate()


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.SWAP, **kwargs):
    """Map ``SWAP`` to Qualtran's ``TwoBitSwap``."""
    return qt_gates.TwoBitSwap()


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.CSWAP, **kwargs):
    """Map ``CSWAP`` to Qualtran's ``TwoBitCSwap``."""
    return qt_gates.TwoBitCSwap()


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.T, **kwargs):
    """Map ``T`` to Qualtran's ``TGate``."""
    return qt_gates.TGate()


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.X, **kwargs):
    """Map ``X`` to Qualtran's ``XGate``."""
    return qt_gates.XGate()


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.Y, **kwargs):
    """Map ``Y`` to Qualtran's ``YGate``."""
    return qt_gates.YGate()


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.CY, **kwargs):
    """Map ``CY`` to Qualtran's ``CYGate``."""
    return qt_gates.CYGate()


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.Z, **kwargs):
    """Map ``Z`` to Qualtran's ``ZGate``."""
    return qt_gates.ZGate()


@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qops.CZ, **kwargs):
    """Map ``CZ`` to Qualtran's ``CZ``."""
    return qt_gates.CZ()
@_map_to_bloq.register
def _(op: qops.Adjoint, map_ops=True, custom_mapping=None, **kwargs):
    """Mapping for ``Adjoint``: map the base operator with the same options, then take the
    adjoint of the resulting Bloq."""
    base_bloq = _map_to_bloq(op.base, custom_mapping=custom_mapping, map_ops=map_ops, **kwargs)
    return base_bloq.adjoint()
@_map_to_bloq.register
def _(op: qops.Controlled, map_ops=True, custom_mapping=None, **kwargs):
    """Mapping for ``Controlled``: ``CNOT`` maps directly to Qualtran's ``CNOT``; any other
    controlled operator maps its base with the same options and then adds the controls,
    using the operator's control values cast to ints."""
    if isinstance(op, qops.CNOT):
        return qt_gates.CNOT()

    control_values = [int(value) for value in op.control_values]
    ctrl_spec = CtrlSpec(cvs=control_values)
    base_bloq = _map_to_bloq(op.base, map_ops=map_ops, custom_mapping=custom_mapping, **kwargs)
    return base_bloq.controlled(ctrl_spec)
@_map_to_bloq.register
@_disable_custom_mapping
def _(op: qmeas.MeasurementProcess, **kwargs):
    """Measurement processes are not mapped to any Bloq; ``None`` is returned."""
    return None
def _get_to_pl_op():
    """Build and return the dispatcher that converts a Qualtran Bloq into a PennyLane operator.

    The dispatcher is created inside this function because its registrations reference
    attributes of ``qt``. A new dispatcher is built on every call.

    Returns:
        Callable: a single-dispatch function ``(bloq, wires)``. Basic gates are converted to
        their PennyLane counterparts, bookkeeping bloqs yield ``None``, and any other bloq
        is wrapped in a ``FromBloq``.
    """

    @singledispatch
    def _to_pl_op(bloq, wires):
        # Fallback: no direct PennyLane equivalent is registered, so wrap the bloq
        return FromBloq(bloq=bloq, wires=wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.CNOT, wires):
        return qops.CNOT(wires=wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.GlobalPhase, wires):
        # The phase is the bloq's exponent multiplied by pi
        return qops.GlobalPhase(bloq.exponent * np.pi, wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.Hadamard, wires):
        return qops.Hadamard(wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.Identity, wires):
        return qops.Identity(wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.Rx, wires):
        return qops.RX(bloq.angle, wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.Ry, wires):
        return qops.RY(bloq.angle, wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.Rz, wires):
        return qops.RZ(bloq.angle, wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.SGate, wires):
        # The bloq carries an ``is_adjoint`` flag that selects between S and its adjoint
        return qops.adjoint(qops.S(wires)) if bloq.is_adjoint else qops.S(wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.TwoBitSwap, wires):
        return qops.SWAP(wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.TwoBitCSwap, wires):
        return qops.CSWAP(wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.TGate, wires):
        # The bloq carries an ``is_adjoint`` flag that selects between T and its adjoint
        return qops.adjoint(qops.T(wires)) if bloq.is_adjoint else qops.T(wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.Toffoli, wires):
        return qops.Toffoli(wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.XGate, wires):
        return qops.X(wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.YGate, wires):
        return qops.Y(wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.CYGate, wires):
        return qops.CY(wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.ZGate, wires):
        return qops.Z(wires)

    @_to_pl_op.register
    def _(bloq: qt.bloqs.basic_gates.CZ, wires):
        return qops.CZ(wires)

    # Bookkeeping bloqs produce no PennyLane operator
    @_to_pl_op.register(qt.bloqs.bookkeeping.Allocate)
    @_to_pl_op.register(qt.bloqs.bookkeeping.Cast)
    @_to_pl_op.register(qt.bloqs.bookkeeping.Free)
    @_to_pl_op.register(qt.bloqs.bookkeeping.Join)
    @_to_pl_op.register(qt.bloqs.bookkeeping.Partition)
    @_to_pl_op.register(qt.bloqs.bookkeeping.Split)
    def _(bloq, wires):
        return None

    return _to_pl_op
[docs]
def bloq_registers(bloq: "qt.Bloq"):
    """Reads a `Qualtran Bloq <https://qualtran.readthedocs.io/en/latest/bloqs/index.html#bloqs-library>`_
    signature and returns a dictionary mapping the Bloq's register names to :class:`~.Wires`.

    .. note::
        This function requires the latest version of Qualtran. We recommend installing the latest
        release via ``pip``:

        .. code-block:: console

            pip install qualtran

    The keys of the returned dictionary are the register names in the Qualtran Bloq. The
    values are :class:`~.Wires` objects with a length equal to the bitsize of its respective
    register. The wires are indexed in ascending order, starting from 0.

    This function makes it easy to access the wires that a Bloq acts on and use them to precisely
    control how gates connect.

    Args:
        bloq (Bloq): an initialized Qualtran ``Bloq`` to be wrapped as a PennyLane operator

    Returns:
        dict: A dictionary mapping the names of the Bloq's registers to :class:`~.Wires`
        objects with the same lengths as the bitsizes of their respective registers.

    Raises:
        TypeError: bloq must be an instance of ``Bloq``.

    **Example**

    This example shows how to find the estimation wires of a textbook Quantum Phase Estimation Bloq.

    >>> from qualtran.bloqs.phase_estimation import RectangularWindowState, TextbookQPE
    >>> from qualtran.bloqs.basic_gates import ZPowGate
    >>> textbook_qpe_small = TextbookQPE(ZPowGate(exponent=2 * 0.234), RectangularWindowState(3))
    >>> qml.bloq_registers(textbook_qpe_small)
    {'q': Wires([0]), 'qpe_reg': Wires([1, 2, 3])}
    """
    if not isinstance(bloq, qt.Bloq):
        raise TypeError(f"bloq must be an instance of {qt.Bloq}.")

    # A plain dict is sufficient: keys are only ever assigned, never defaulted. Left
    # registers are visited first; a right register with the same name overwrites the entry.
    wire_register_dict = {}
    signature = bloq.signature
    for side_registers in (signature.lefts(), signature.rights()):
        for reg in side_registers:
            wire_register_dict[reg.name] = reg.bitsize

    return registers(wire_register_dict)
def _get_named_registers(regs):
    """Build a ``qml.registers`` object for the given bloq registers, keyed by register
    name and sized by each register's total number of bits."""
    bits_per_register = {}
    for reg in regs:
        bits_per_register[reg.name] = reg.total_bits()
    return registers(bits_per_register)
def _preprocess_bloq(bloq):
    """Prepare a bloq for decomposition into PennyLane operators.

    Returns:
        tuple: the composite bloq, a dictionary mapping each left-dangling ``Soquet`` to
        its wire (or list of wires), and the first wire index that is still unused.
    """
    # Bloqs need to be decomposed in order to access the connections
    cbloq = bloq if isinstance(bloq, qt.CompositeBloq) else bloq.decompose_bloq()

    # Materialise the left registers once so they can be traversed twice
    left_regs = tuple(cbloq.signature.lefts())
    named_registers = _get_named_registers(left_regs)

    soq_to_wires = {}
    for reg in left_regs:
        for idx in reg.all_idxs():
            reg_wires = list(named_registers[reg.name])
            # A one-dimensional index selects a single wire; otherwise the soquet is
            # mapped to all wires of its register.
            soq_to_wires[qt.Soquet(qt.LeftDangle, idx=idx, reg=reg)] = (
                reg_wires[idx[0]] if len(idx) == 1 else reg_wires
            )

    # This is to track the number of wires defined at the LeftDangle stage
    # so if we need to add more wires, we know what index to start at
    soq_to_wires_len = 0
    if soq_to_wires:
        last_entry = list(soq_to_wires.values())[-1]
        if not isinstance(last_entry, int):
            last_entry = last_entry[-1]
        soq_to_wires_len = last_entry + 1

    return cbloq, soq_to_wires, soq_to_wires_len
[docs]
class FromBloq(Operation):
    r"""An adapter for using a `Qualtran Bloq <https://qualtran.readthedocs.io/en/latest/bloqs/index.html#bloqs-library>`__
    as a PennyLane :class:`~.Operation`.

    .. note::
        This class requires the latest version of Qualtran. We recommend installing the latest
        release via ``pip``:

        .. code-block:: console

            pip install qualtran

    Args:
        bloq (qualtran.Bloq): an initialized Qualtran ``Bloq`` to be wrapped as a PennyLane operator
        wires (WiresLike): The wires the operator acts on. The number of wires can be determined by using the
            signature of the ``Bloq`` using ``bloq.signature.n_qubits()``.

    Raises:
        TypeError: bloq must be an instance of ``Bloq``.

    **Example**

    This example shows how to use ``qml.FromBloq``:

    >>> from qualtran.bloqs.basic_gates import CNOT
    >>> qualtran_cnot = qml.FromBloq(CNOT(), wires=[0, 1])
    >>> qualtran_cnot.matrix()
    array([[1.+0.j, 0.+0.j, 0.+0.j, 0.+0.j],
           [0.+0.j, 1.+0.j, 0.+0.j, 0.+0.j],
           [0.+0.j, 0.+0.j, 0.+0.j, 1.+0.j],
           [0.+0.j, 0.+0.j, 1.+0.j, 0.+0.j]])

    This example shows how to use ``qml.FromBloq`` inside a device:

    >>> from qualtran.bloqs.basic_gates import CNOT
    >>> dev = qml.device("default.qubit") # Execute on device
    >>> @qml.qnode(dev)
    ... def circuit():
    ...     qml.FromBloq(CNOT(), wires=[0, 1])
    ...     return qml.state()
    >>> circuit()
    array([1.+0.j, 0.+0.j, 0.+0.j, 0.+0.j])

    .. details::
        :title: Advanced Example

        This example shows how to use ``qml.FromBloq`` to implement a textbook Quantum Phase Estimation Bloq inside a device:

        .. code-block::

            from qualtran.bloqs.phase_estimation import RectangularWindowState, TextbookQPE
            from qualtran.bloqs.chemistry.trotter.ising import IsingXUnitary, IsingZZUnitary
            from qualtran.bloqs.chemistry.trotter.trotterized_unitary import TrotterizedUnitary

            # Parameters for the TrotterizedUnitary
            nsites = 5
            j_zz, gamma_x = 2, 0.1
            zz_bloq = IsingZZUnitary(nsites=nsites, angle=0.02 * j_zz)
            x_bloq = IsingXUnitary(nsites=nsites, angle=0.01 * gamma_x)
            trott_unitary = TrotterizedUnitary(
                bloqs=(x_bloq, zz_bloq), timestep=0.01,
                indices=(0, 1, 0), coeffs=(0.5 * gamma_x, j_zz, 0.5 * gamma_x)
            )

            # Instantiate the TextbookQPE and pass in the unitary
            textbook_qpe = TextbookQPE(trott_unitary, RectangularWindowState(3))

            # Execute on device
            dev = qml.device("default.qubit")
            @qml.qnode(dev)
            def circuit():
                qml.FromBloq(textbook_qpe, wires=range(textbook_qpe.signature.n_qubits()))
                return qml.probs(wires=[5, 6, 7])

            circuit()

    .. details::
        :title: Usage Details

        The decomposition of a ``Bloq`` wrapped in ``qml.FromBloq`` may use more wires than expected.
        For example, when we wrap Qualtran's ``CZPowGate``, we get

        >>> from qualtran.bloqs.basic_gates import CZPowGate
        >>> qml.FromBloq(CZPowGate(0.468, eps=1e-11), wires=[0, 1]).decomposition()
        [FromBloq(And, wires=Wires([0, 1, 'alloc_free_2'])),
        FromBloq(Z**0.468, wires=Wires(['alloc_free_2'])),
        FromBloq(And†, wires=Wires([0, 1, 'alloc_free_2']))]

        This behaviour results from the decomposition of ``CZPowGate`` as defined in Qualtran,
        which allocates and frees a wire in the same ``bloq``. In this situation,
        PennyLane automatically allocates this wire under the hood, and that additional wire is
        named ``alloc_free_{idx}``. The indexing starts at the length of the wires defined in the
        signature, which in the case of ``CZPowGate`` is :math:`2`. Due to the current
        limitations of PennyLane, these wires cannot be accessed manually or mapped.
    """

    def __init__(self, bloq, wires: WiresLike):
        if not isinstance(bloq, qt.Bloq):
            raise TypeError(f"bloq must be an instance of {qt.Bloq}.")

        self._hyperparameters = {"bloq": bloq}
        super().__init__(wires=wires, id=None)

    def __repr__(self):
        return f'FromBloq({self.hyperparameters["bloq"]}, wires={self.wires})'

    @staticmethod
    def compute_decomposition(wires, bloq):  # pylint: disable=arguments-differ, too-many-branches
        """Decompose the wrapped bloq into PennyLane operators.

        The bloq is decomposed into a composite bloq whose connections are walked in order.
        Every soquet is assigned a wire index; indices below ``len(wires)`` are translated
        to the provided wires and the remaining ones become ``alloc_free_{idx}`` wires.

        Args:
            wires (WiresLike): the wires the operator acts on
            bloq (qualtran.Bloq): the wrapped bloq

        Returns:
            list: the PennyLane operators making up the decomposition

        Raises:
            ValueError: if the number of wires differs from ``bloq.signature.n_qubits()``
            DecompositionUndefinedError: if no operator could be produced, including when
                Qualtran reports that the bloq cannot be decomposed
        """
        ops = []
        num_wires = len(wires)

        if num_wires != bloq.signature.n_qubits():
            raise ValueError(
                f"The length of wires must match the signature of {qt.Bloq}. Please provide a list of wires of length {bloq.signature.n_qubits()}"
            )

        # Build the bloq-to-operator dispatcher once instead of once per bloq instance
        to_pl_op = _get_to_pl_op()

        try:
            cbloq, soq_to_wires, soq_to_wires_len = _preprocess_bloq(bloq)

            for binst, pred_cxns, succ_cxns in cbloq.iter_bloqnections():
                if isinstance(binst.bloq, qt.bloqs.bookkeeping.Partition):
                    in_quregs = {}
                    # First pass: rewind the running wire index by the size of the
                    # partition's right-side registers...
                    for succ in succ_cxns:
                        soq = succ.left
                        if soq.reg.side == qt.Side.RIGHT and soq.reg.name not in in_quregs:
                            soq_to_wires_len -= np.prod(soq.reg.shape) * soq.reg.bitsize

                    # ...second pass: hand out fresh ascending indices, once per register name.
                    for succ in succ_cxns:
                        soq = succ.left
                        if soq.reg.side == qt.Side.RIGHT and soq.reg.name not in in_quregs:
                            total_elements = np.prod(soq.reg.shape) * soq.reg.bitsize
                            ascending_vals = np.arange(
                                soq_to_wires_len,
                                soq_to_wires_len + total_elements,
                                dtype=object,
                            )
                            soq_to_wires_len += total_elements
                            in_quregs[soq.reg.name] = ascending_vals.reshape(
                                (*soq.reg.shape, soq.reg.bitsize)
                            )
                        soq_to_wires[soq] = in_quregs[soq.reg.name][soq.idx]
                    continue

                in_quregs = {
                    reg.name: np.empty((*reg.shape, reg.bitsize), dtype=object)
                    for reg in binst.bloq.signature.lefts()
                }
                # The out_quregs inform us of the total # of wires in the circuit to account for
                # wires that are split or allocated in the cbloq
                out_quregs = {
                    reg.name: np.empty((*reg.shape, reg.bitsize), dtype=object)
                    for reg in binst.bloq.signature.rights()
                }

                for pred in pred_cxns:
                    soq = pred.right
                    soq_to_wires[soq] = soq_to_wires[pred.left]
                    in_quregs[soq.reg.name][soq.idx] = np.squeeze(soq_to_wires[soq])

                for succ in succ_cxns:
                    soq = succ.left
                    if soq.reg.side == qt.Side.RIGHT:
                        # When in_quregs != out_quregs, it means that there are wires unaccounted
                        # for. We account for these wires and update soq_to_wires and in_quregs
                        # accordingly.
                        if len(in_quregs) != len(out_quregs):
                            total_elements = np.prod(soq.reg.shape) * soq.reg.bitsize
                            ascending_vals = np.arange(
                                soq_to_wires_len,
                                total_elements + soq_to_wires_len,
                                dtype=object,
                            )
                            soq_to_wires_len += total_elements
                            in_quregs[soq.reg.name] = ascending_vals.reshape(
                                (*soq.reg.shape, soq.reg.bitsize)
                            )
                        soq_to_wires[soq] = in_quregs[soq.reg.name][soq.idx]

                total_wires = [int(w) for ws in in_quregs.values() for w in ws.ravel()]
                # Indices within the provided wires are translated; the rest are wires
                # allocated inside the bloq and get a generated label.
                mapped_wires = [wires[idx] for idx in total_wires if idx < num_wires]
                ghost_wires = [f"alloc_free_{val}" for val in total_wires if val >= num_wires]
                op = to_pl_op(binst.bloq, mapped_wires + ghost_wires)
                if op:
                    ops.append(op)
        except (qt.DecomposeNotImplementedError, qt.DecomposeTypeError):
            # Deliberate: a bloq Qualtran cannot decompose is reported below through
            # DecompositionUndefinedError (``ops`` stays as it was when the error occurred).
            pass

        if not ops:
            raise DecompositionUndefinedError
        return ops

    # pylint: disable=invalid-overridden-method, arguments-renamed
    @property
    def has_matrix(self) -> bool:
        r"""Return if the ``Bloq`` has a valid matrix representation."""
        bloq = self.hyperparameters["bloq"]
        matrix = bloq.tensor_contract()
        return matrix.shape == (2 ** len(self.wires), 2 ** len(self.wires))

    # TODO: Remove when PL supports pylint==3.3.6 (it is considered a useless-suppression) [sc-91362]
    # pylint: disable=no-method-argument
    def compute_matrix(*params, **hyperparams):  # pylint: disable=no-self-argument
        """Return the matrix of the wrapped bloq, obtained from ``bloq.tensor_contract()``.

        The first positional argument is expected to expose ``wires``; it is used to check
        that the contracted tensor is a square matrix over exactly those wires.

        Raises:
            MatrixUndefinedError: if the contracted tensor does not have shape
                ``(2 ** n, 2 ** n)`` for ``n`` wires
        """
        bloq = hyperparams["bloq"]
        matrix = bloq.tensor_contract()

        if matrix.shape != (2 ** len(params[0].wires), 2 ** len(params[0].wires)):
            raise MatrixUndefinedError

        return matrix
class _QReg:
"""Used as a container for qubits that form a `Register` of a given bitsize. This is a modified
version of `_QReg <https://github.com/quantumlib/Qualtran/blob/main/qualtran/cirq_interop/_cirq_to_bloq.py>`_
found in Qualtran as well.
Each instance of `_QReg` would correspond to a `Soquet` in Bloqs and represents an opaque collection
of qubits that together form a quantum register.
"""
def __init__(self, qubits: Tuple["cirq.Qid", ...], dtype: "qt.QDType"):
if isinstance(qubits, cirq.Qid):
self.qubits = (qubits,)
else:
self.qubits = tuple(qubits)
self.dtype = dtype
self._initialized = True
def __setattr__(self, name, value):
"""Makes the instance immutable after initialization."""
if getattr(self, "_initialized", False):
raise AttributeError(
f"Cannot set attribute '{name}'. Instances of _QReg are immutable."
)
super().__setattr__(name, value)
def __repr__(self) -> str:
"""Provides a developer-friendly string representation."""
return f"_QReg(qubits={self.qubits!r}, dtype={self.dtype!r})"
# Override the __eq__ and __hash__ functions to handle single qubit registers
# that are functionally the same but have different dtypes.
def __eq__(self, other) -> bool:
if not isinstance(other, _QReg):
return False
return self.qubits == other.qubits
def __hash__(self):
return hash(self.qubits)
def _ensure_in_reg_exists(
bb: "qt.BloqBuilder",
in_reg: "_QReg",
qreg_to_qvar: Dict["_QReg", "qt.Soquet"],
) -> None:
"""Modified function from the Qualtran-Cirq interop module to ensure `qreg_to_qvar[in_reg]`
exists. If `in_reg` is not found in `qreg_to_qvar`, that means that the input qubit register
is a multi-qubit register. All in_regs should be single qubit registers, so this would be a
bug, and an AssertionError is raised. To capture control flow, multi-qubit registers will be
allowed, and we will remove the AssertionError and use Split and Join operations as needed.
Args:
bb (qt.BloqBuilder): an instance of a Qualtran BloqBuilder
in_reg (_QReg): a container for qubits that form a Register of a given bitsize
qreg_to_qvar (Dict[_QReg, qt.Soquet]): a dictionary of _QRegs that corresponds to Soquets
Raises:
AssertionError: `in_reg` was not found in `qreg_to_qvar`, meaning there exists multi-qubit
registers that we do not support at the moment
"""
all_mapped_qubits = {q for qreg in qreg_to_qvar for q in qreg.qubits}
qubits_to_allocate = [q for q in in_reg.qubits if q not in all_mapped_qubits]
if qubits_to_allocate:
n_alloc = len(qubits_to_allocate)
qreg_to_qvar[
_QReg(qubits_to_allocate, dtype=qt.QBit() if n_alloc == 1 else qt.QAny(n_alloc))
] = bb.allocate(n_alloc)
# if in_reg not in qreg_to_qvar: splits & joins needed, which shouldn't be the case
assert in_reg in qreg_to_qvar, f"Input register {in_reg} not found, suggesting a bug"
def _gather_input_soqs(bb: "qt.BloqBuilder", op_quregs, qreg_to_qvar):
    """Modified function from Qualtran-Cirq interop module that collects input Soquets.

    Args:
        bb (qt.BloqBuilder): an instance of a Qualtran BloqBuilder
        op_quregs (Dict[str, np.ndarray]): maps a register name to an array of _QRegs
        qreg_to_qvar (Dict[_QReg, qt.Soquet]): maps each _QReg to its current Soquet; may be
            extended in place by ``_ensure_in_reg_exists``

    Returns:
        dict: maps each register name to an array of Soquets with the same shape as the
        corresponding array in ``op_quregs``
    """

    def _soquet_for(qureg):
        # Make sure the register is present in the mapping before looking it up.
        _ensure_in_reg_exists(bb, qureg, qreg_to_qvar)
        return qreg_to_qvar[qureg]

    return {
        name: np.array([_soquet_for(qureg) for qureg in regs.flatten()]).reshape(regs.shape)
        for name, regs in op_quregs.items()
    }
[docs]
class ToBloq(Bloq):  # pylint:disable=useless-object-inheritance (Inherit qt.Bloq optionally)
    r"""
    An adapter to convert a PennyLane :class:`~.QNode`, ``Qfunc``, or :class:`~.Operation` to a
    `Qualtran Bloq <https://qualtran.readthedocs.io/en/latest/bloqs/index.html#bloqs-library>`__.

    .. note::
        This class requires the latest version of Qualtran. We recommend installing the latest
        release via ``pip``:

        .. code-block:: console

            pip install qualtran

    Args:
        op (QNode| Qfunc | Operation): a PennyLane ``QNode``, ``Qfunc``, or operator to be wrapped
            as a Qualtran Bloq.
        map_ops (bool): Whether to map operations to a Qualtran Bloq. Operations are wrapped
            as a ``ToBloq`` when ``False``. Default is ``False``.
        custom_mapping (dict): Dictionary to specify a mapping between a PennyLane operator and a
            Qualtran Bloq. A default mapping is used if not defined.
        **kwargs: keyword arguments passed on when a ``QNode`` or ``Qfunc`` is turned into a tape
            and when the resulting operations are mapped to Bloqs.

    Raises:
        ImportError: Qualtran is not installed.
        TypeError: ``op`` is neither an :class:`~.Operator`, a :class:`~.QNode`, nor a callable.

    .. seealso:: :func:`~.to_bloq` for the recommended way to convert from PennyLane objects to
        their Qualtran equivalents

    **Example**

    This example shows how to use ``qml.ToBloq``:

    >>> from qualtran.resource_counting.generalizers import generalize_rotation_angle
    >>> op = qml.QuantumPhaseEstimation(
    ...     qml.RX(0.2, wires=[0]), estimation_wires=[1, 2]
    ... )
    >>> op_as_bloq = qml.ToBloq(op)
    >>> graph, sigma = op_as_bloq.call_graph(generalize_rotation_angle)
    >>> sigma
    {Hadamard(): 4,
    Controlled(subbloq=Rx(angle=0.2, eps=1e-11), ctrl_spec=CtrlSpec(qdtypes=(QBit(),), cvs=(array(1),))): 3,
    TwoBitSwap(): 1,
    CNOT(): 2,
    ZPowGate(exponent=\phi, eps=5e-12): 2,
    ZPowGate(exponent=\phi, eps=1e-11): 1}
    """

    def __init__(self, op, map_ops=False, custom_mapping=None, **kwargs):
        if not qualtran:
            raise ImportError(
                "Optional dependency 'qualtran' is required "
                "for ToBloq functionality but is not installed. Try `pip install qualtran`."
            )

        if not isinstance(op, (Operator, QNode)) and not callable(op):
            raise TypeError(
                f"Input must be either an instance of {Operator}, {QNode} or a quantum function."
            )

        self.op = op
        self.map_ops = map_ops
        self.custom_mapping = custom_mapping
        self._kwargs = kwargs
        super().__init__()

    @cached_property
    def signature(self) -> "qt.Signature":
        """Compute and return Qualtran signature for given op or QNode.

        The signature is a single register named ``"qubits"`` of ``QBit`` dtype whose shape is
        the number of wires of the wrapped object.
        """
        if isinstance(self.op, QNode):
            # NOTE(review): this writes a ``name`` attribute onto the wrapped QNode; presumably
            # so that ``__str__`` can render it as ``PLQNode`` -- confirm before changing.
            self.op.name = "QNode"
            num_wires = len(construct_tape(self.op)(**self._kwargs).wires)
        elif isinstance(self.op, Operator):
            num_wires = len(self.op.wires)
        else:
            num_wires = len(make_qscript(self.op)(**self._kwargs).wires)

        return qt.Signature([qt.Register("qubits", qt.QBit(), shape=num_wires)])

    def decompose_bloq(self):  # pylint:disable=too-many-branches
        """Decompose the bloq using the op's decomposition or the tape of the QNode.

        Returns:
            The composite Bloq produced by ``BloqBuilder.finalize`` after every operation has
            been added.

        Raises:
            qt.DecomposeNotImplementedError: a ``DecompositionUndefinedError`` was raised while
                building the decomposition.
        """
        try:
            if isinstance(self.op, QNode):
                tape = construct_tape(self.op)(**self._kwargs)
                ops = tape.circuit
                all_wires = list(tape.wires)
            elif isinstance(self.op, Operator):
                ops = self.op.decomposition()
                all_wires = list(self.op.wires)
            else:
                tape = make_qscript(self.op)(**self._kwargs)
                ops = tape.operations
                all_wires = list(tape.wires)

            signature = self.signature

            # One single-wire _QReg per wire, for the only register of the signature ("qubits").
            wire_column = np.array(all_wires).reshape(len(all_wires), 1)
            in_quregs = {
                "qubits": np.apply_along_axis(
                    _QReg, -1, wire_column, signature.get_left("qubits").dtype
                )
            }
            out_quregs = {
                "qubits": np.apply_along_axis(
                    _QReg, -1, wire_column, signature.get_right("qubits").dtype
                )
            }

            bb, initial_soqs = qt.BloqBuilder.from_signature(signature, add_registers_allowed=False)

            # `signature.lefts()` can be thought of as input qubits. For our purposes LEFT and
            # RIGHT signatures will in most cases match since there are no allocated & freed
            # qubits. Here, qreg_to_qvar is a map between a register and a Soquet. This serves
            # as the foundation to wire up the rest of the bloqs.
            qreg_to_qvar = {}
            for reg in signature.lefts():
                assert reg.name in in_quregs
                soqs = initial_soqs[reg.name]
                assert in_quregs[reg.name].shape == soqs.shape
                qreg_to_qvar |= zip(in_quregs[reg.name].flatten(), soqs.flatten())

            # Add each operation to the composite Bloq.
            for op in ops:
                bloq = _map_to_bloq(
                    op, map_ops=self.map_ops, custom_mapping=self.custom_mapping, **self._kwargs
                )
                if bloq is None:
                    continue

                # A Bloq with an empty signature has nothing to wire up.
                if bloq.signature == qt.Signature([]):
                    bb.add(bloq)
                    continue

                reg_dtypes = [r.dtype for r in bloq.signature]
                # Find input / output registers.
                all_op_quregs = {
                    k: np.apply_along_axis(_QReg, -1, v, reg_dtypes[i])  # type: ignore
                    for i, (k, v) in enumerate(split_qubits(bloq.signature, op.wires).items())
                }

                in_op_quregs = {reg.name: all_op_quregs[reg.name] for reg in bloq.signature.lefts()}
                # Find input Soquets, by potentially allocating new Bloq registers corresponding to
                # input `in_quregs` and updating the `qreg_to_qvar` mapping.
                qvars_in = _gather_input_soqs(bb, in_op_quregs, qreg_to_qvar)

                # Add Bloq to the `CompositeBloq` compute graph and get corresponding output Soquets.
                qvars_out = bb.add_d(bloq, **qvars_in)

                # Update `qreg_to_qvar` mapping using output soquets `qvars_out`.
                for reg in bloq.signature:
                    # all_op_quregs should exist for both LEFT & RIGHT registers.
                    assert reg.name in all_op_quregs
                    quregs = all_op_quregs[reg.name]
                    if reg.side != qt.Side.LEFT:
                        assert quregs.shape == np.array(qvars_out[reg.name]).shape
                        qreg_to_qvar |= zip(
                            quregs.flatten(), np.array(qvars_out[reg.name]).flatten()
                        )

            # Combine Soquets to match the right signature.
            final_soqs_dict = _gather_input_soqs(
                bb,
                {reg.name: out_quregs[reg.name] for reg in signature.rights()},
                qreg_to_qvar,
            )
            final_soqs_set = {soq for soqs in final_soqs_dict.values() for soq in soqs.flatten()}

            # Free all dangling Soquets which are not part of the final soquets set.
            for qvar in qreg_to_qvar.values():
                if qvar not in final_soqs_set:
                    bb.free(qvar)

            return bb.finalize(**final_soqs_dict)
        except DecompositionUndefinedError as undefined_decomposition:
            raise qt.DecomposeNotImplementedError from undefined_decomposition

    def build_call_graph(self, ssa):
        """Build Qualtran call graph with defined call graph if available, otherwise build
        said call graph with the decomposition"""
        call_graph = _get_op_call_graph(self.op)
        if call_graph:
            return call_graph

        return self.decompose_bloq().build_call_graph(ssa)

    def __repr__(self):
        if isinstance(self.op, QNode):
            return "ToBloq(QNode)"
        # Checked against Operator (not Operation) to agree with __init__, signature and
        # decompose_bloq, so that wrapped non-Operation operators are not labelled as Qfunc.
        if isinstance(self.op, Operator):
            return f"ToBloq({self.op.name})"
        return "ToBloq(Qfunc)"

    def __eq__(self, other):
        if type(other) is type(self):
            return self.op == other.op
        return False

    def __hash__(self):
        return hash(self.op)

    def __str__(self):
        if hasattr(self.op, "name"):
            return f"PL{self.op.name}"
        return "PLQfunc"
[docs]
def to_bloq(circuit, map_ops: bool = True, custom_mapping: dict = None, **kwargs):
    """
    Converts a PennyLane :class:`~.QNode`, ``Qfunc``, or :class:`~.Operation` to the corresponding `Qualtran Bloq <https://qualtran.readthedocs.io/en/latest/bloqs/index.html#bloqs-library>`__.

    .. note::
        This function requires the latest version of Qualtran. We recommend installing the latest
        release via ``pip``:

        .. code-block:: console

            pip install qualtran

    Args:
        circuit (QNode| Qfunc | Operation): a PennyLane ``QNode``, ``Qfunc``, or operator to be wrapped
            as a Qualtran Bloq.
        map_ops (bool): Whether to map operations to a Qualtran Bloq. Operations are wrapped
            as a ``ToBloq`` when ``False``. Default is ``True``.
        custom_mapping (dict): Dictionary to specify a mapping between a PennyLane operator and a
            Qualtran Bloq. A default mapping is used if not defined. It is only forwarded when
            ``map_ops`` is truthy and the dictionary is non-empty.
        **kwargs: additional keyword arguments forwarded to the conversion.

    Returns:
        Bloq: The Qualtran Bloq that corresponds to the given circuit or :class:`~.Operation` and
        options.

    Raises:
        ImportError: Qualtran is not installed.

    .. seealso:: :class:`~.ToBloq` for the Bloq objects created when no Qualtran equivalent is found

    **Example**

    This example shows how to use ``qml.to_bloq``:

    >>> from qualtran.resource_counting.generalizers import generalize_rotation_angle
    >>> op = qml.QuantumPhaseEstimation(
    ...     qml.RX(0.2, wires=[0]), estimation_wires=[1, 2]
    ... )
    >>> op_as_bloq = qml.to_bloq(op)
    >>> graph, sigma = op_as_bloq.call_graph(generalize_rotation_angle)
    >>> sigma
    {Allocate(dtype=QFxp(bitsize=2, num_frac=2, signed=False), dirty=False): 1,
    Hadamard(): 4,
    Controlled(subbloq=Rx(angle=0.2, eps=1e-11), ctrl_spec=CtrlSpec(qdtypes=(QBit(),), cvs=(array(1),))): 3,
    And(cv1=1, cv2=1, uncompute=True): 1,
    And(cv1=1, cv2=1, uncompute=False): 1,
    ZPowGate(exponent=\\phi, eps=1e-10): 1,
    TwoBitSwap(): 1}

    .. details::
        :title: Usage Details

        Some PennyLane operators don't have a direct equivalent in Qualtran. For example, in Qualtran, there
        are many varieties of Quantum Phase Estimation. When ``qml.to_bloq`` is called on
        :class:`~pennylane.QuantumPhaseEstimation`, a smart default is chosen.

        >>> qml.to_bloq(qml.QuantumPhaseEstimation(
        ...     unitary=qml.RX(0.1, wires=0), estimation_wires=range(1, 5)
        ... ))
        TextbookQPE(unitary=Rx(angle=0.1, eps=1e-11), ctrl_state_prep=RectangularWindowState(bitsize=4), qft_inv=Adjoint(subbloq=QFTTextBook(bitsize=4, with_reverse=True)))

        Note that the chosen Qualtran Bloq may not be an exact equivalent. If an exact
        equivalent is needed, we recommend setting ``map_ops`` to ``False``.
        This will wrap the input PennyLane operator as a Qualtran Bloq, enabling Qualtran functions
        such as ``decompose_bloq`` or ``call_graph``, but maintaining the PennyLane decomposition definition of the operator.

        >>> qml.to_bloq(qml.QuantumPhaseEstimation(
        ...     unitary=qml.RX(0.1, wires=0), estimation_wires=range(1, 5)
        ... ), map_ops=False)
        ToBloq(QuantumPhaseEstimation)

        Alternatively, users can provide a custom mapping that maps a PennyLane operator to a
        specific Qualtran Bloq. It is recommended to map operators at the high level, rather than
        attempt to map operators that appear in the operator's decomposition.

        >>> from qualtran.bloqs.phase_estimation import TextbookQPE
        >>> from qualtran.bloqs.phase_estimation.lp_resource_state import LPResourceState
        >>> op = qml.QuantumPhaseEstimation(
        ...     unitary=qml.RX(0.1, wires=0), estimation_wires=range(1, 5)
        ... )
        >>> custom_mapping = {
        ...     op : TextbookQPE(
        ...         unitary=qml.to_bloq(qml.RX(0.1, wires=0)),
        ...         ctrl_state_prep=LPResourceState(4),
        ...     )
        ... }
        >>> qml.to_bloq(op, custom_mapping=custom_mapping)
        TextbookQPE(unitary=Rx(angle=0.1, eps=1e-11), ctrl_state_prep=LPResourceState(bitsize=4), qft_inv=Adjoint(subbloq=QFTTextBook(bitsize=4, with_reverse=True)))
    """
    if not qualtran:
        # The first literal ends with a space so the two pieces do not run together.
        raise ImportError(
            "The `to_bloq` function requires Qualtran to be installed. You can install "
            "qualtran via: pip install qualtran."
        )

    # The custom mapping is only passed along when mapping is enabled and a non-empty
    # dictionary was supplied; otherwise the callee's own default is used.
    if map_ops and custom_mapping:
        return _map_to_bloq(circuit, map_ops=True, custom_mapping=custom_mapping, **kwargs)

    return _map_to_bloq(circuit, map_ops=map_ops, **kwargs)
_modules/pennylane/io/qualtran_io
Download Python script
Download Notebook
View on GitHub