Source code for pennylane.transforms.qcut

# Copyright 2022 Xanadu Quantum Technologies Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Functions for performing quantum circuit cutting.
"""

import copy
import inspect
import string
import uuid
import warnings
from collections.abc import Sequence as SequenceType
from dataclasses import InitVar, dataclass
from functools import partial
from itertools import compress, product
from pathlib import Path
from typing import Any, Callable, ClassVar, Dict, List, Optional, Sequence, Tuple, Union

from networkx import MultiDiGraph, has_path, weakly_connected_components

import pennylane as qml
from pennylane import apply, expval
from pennylane import numpy as np
from pennylane.pauli import string_to_pauli_word
from pennylane.measurements import Expectation, MeasurementProcess, Sample
from pennylane.operation import Operation, Operator, Tensor
from pennylane.ops.qubit.non_parametric_ops import WireCut
from pennylane.tape import QuantumTape
from pennylane.wires import Wires

from .batch_transform import batch_transform


class MeasureNode(Operation):
    """Placeholder node for measurement operations"""

    num_wires = 1
    grad_method = None

    def __init__(self, *params, wires=None, do_queue=True, id=None):
        id = id or str(uuid.uuid4())

        super().__init__(*params, wires=wires, do_queue=do_queue, id=id)


class PrepareNode(Operation):
    """Placeholder node for state preparations"""

    num_wires = 1
    grad_method = None

    def __init__(self, *params, wires=None, do_queue=True, id=None):
        id = id or str(uuid.uuid4())

        super().__init__(*params, wires=wires, do_queue=do_queue, id=id)


def replace_wire_cut_node(node: WireCut, graph: MultiDiGraph):
    """
    Replace a :class:`~.WireCut` node in the graph with a :class:`~.MeasureNode`
    and :class:`~.PrepareNode`.

    .. note::

        This function is designed for use as part of the circuit cutting workflow.
        Check out the :func:`qml.cut_circuit() <pennylane.cut_circuit>` transform for more details.

    Args:
        node (WireCut): the  :class:`~.WireCut` node to be replaced with a :class:`~.MeasureNode`
            and :class:`~.PrepareNode`
        graph (nx.MultiDiGraph): the graph containing the node to be replaced

    **Example**

    Consider the following circuit with a manually-placed wire cut:

    .. code-block:: python

        wire_cut = qml.WireCut(wires=0)

        with qml.tape.QuantumTape() as tape:
            qml.RX(0.4, wires=0)
            qml.apply(wire_cut)
            qml.RY(0.5, wires=0)
            qml.expval(qml.PauliZ(0))

    We can find the circuit graph and remove the wire cut node using:

    >>> graph = qml.transforms.qcut.tape_to_graph(tape)
    >>> qml.transforms.qcut.replace_wire_cut_node(wire_cut, graph)
    """
    predecessors = graph.pred[node]
    successors = graph.succ[node]

    predecessor_on_wire = {}
    for op, data in predecessors.items():
        for d in data.values():
            wire = d["wire"]
            predecessor_on_wire[wire] = op

    successor_on_wire = {}
    for op, data in successors.items():
        for d in data.values():
            wire = d["wire"]
            successor_on_wire[wire] = op

    order = graph.nodes[node]["order"]
    graph.remove_node(node)

    for wire in node.wires:
        predecessor = predecessor_on_wire.get(wire, None)
        successor = successor_on_wire.get(wire, None)

        meas = MeasureNode(wires=wire)
        prep = PrepareNode(wires=wire)

        # We are introducing a degeneracy in the order of the measure and prepare nodes
        # here but the order can be inferred as MeasureNode always precedes
        # the corresponding PrepareNode
        graph.add_node(meas, order=order)
        graph.add_node(prep, order=order)

        graph.add_edge(meas, prep, wire=wire)

        if predecessor is not None:
            graph.add_edge(predecessor, meas, wire=wire)
        if successor is not None:
            graph.add_edge(prep, successor, wire=wire)


[docs]def replace_wire_cut_nodes(graph: MultiDiGraph): """ Replace each :class:`~.WireCut` node in the graph with a :class:`~.MeasureNode` and :class:`~.PrepareNode`. .. note:: This function is designed for use as part of the circuit cutting workflow. Check out the :func:`qml.cut_circuit() <pennylane.cut_circuit>` transform for more details. Args: graph (nx.MultiDiGraph): The graph containing the :class:`~.WireCut` nodes to be replaced **Example** Consider the following circuit with manually-placed wire cuts: .. code-block:: python wire_cut_0 = qml.WireCut(wires=0) wire_cut_1 = qml.WireCut(wires=1) multi_wire_cut = qml.WireCut(wires=[0, 1]) with qml.tape.QuantumTape() as tape: qml.RX(0.4, wires=0) qml.apply(wire_cut_0) qml.RY(0.5, wires=0) qml.apply(wire_cut_1) qml.CNOT(wires=[0, 1]) qml.apply(multi_wire_cut) qml.RZ(0.6, wires=1) qml.expval(qml.PauliZ(0)) We can find the circuit graph and remove all the wire cut nodes using: >>> graph = qml.transforms.qcut.tape_to_graph(tape) >>> qml.transforms.qcut.replace_wire_cut_nodes(graph) """ for op in list(graph.nodes): if isinstance(op, WireCut): replace_wire_cut_node(op, graph)
def _add_operator_node(graph: MultiDiGraph, op: Operator, order: int, wire_latest_node: dict): """ Helper function to add operators as nodes during tape to graph conversion. """ graph.add_node(op, order=order) for wire in op.wires: if wire_latest_node[wire] is not None: parent_op = wire_latest_node[wire] graph.add_edge(parent_op, op, wire=wire) wire_latest_node[wire] = op
[docs]def tape_to_graph(tape: QuantumTape) -> MultiDiGraph: """ Converts a quantum tape to a directed multigraph. .. note:: This operation is designed for use as part of the circuit cutting workflow. Check out the :func:`qml.cut_circuit() <pennylane.cut_circuit>` transform for more details. Args: tape (QuantumTape): tape to be converted into a directed multigraph Returns: nx.MultiDiGraph: a directed multigraph that captures the circuit structure of the input tape **Example** Consider the following tape: .. code-block:: python with qml.tape.QuantumTape() as tape: qml.RX(0.4, wires=0) qml.RY(0.9, wires=0) qml.CNOT(wires=[0, 1]) qml.expval(qml.PauliZ(1)) Its corresponding circuit graph can be found using >>> qml.transforms.qcut.tape_to_graph(tape) <networkx.classes.multidigraph.MultiDiGraph at 0x7fe41cbd7210> """ graph = MultiDiGraph() wire_latest_node = {w: None for w in tape.wires} for order, op in enumerate(tape.operations): _add_operator_node(graph, op, order, wire_latest_node) order += 1 # pylint: disable=undefined-loop-variable for m in tape.measurements: obs = getattr(m, "obs", None) if obs is not None and isinstance(obs, Tensor): if m.return_type is Sample: raise ValueError( "Sampling from tensor products of observables " "is not supported in circuit cutting" ) for o in obs.obs: m_ = MeasurementProcess(m.return_type, obs=o) _add_operator_node(graph, m_, order, wire_latest_node) elif m.return_type is Sample and obs is None: for w in m.wires: s_ = qml.sample(qml.Projector([1], wires=w)) _add_operator_node(graph, s_, order, wire_latest_node) else: _add_operator_node(graph, m, order, wire_latest_node) order += 1 return graph
# pylint: disable=too-many-branches
[docs]def fragment_graph(graph: MultiDiGraph) -> Tuple[Tuple[MultiDiGraph], MultiDiGraph]: """ Fragments a graph into a collection of subgraphs as well as returning the communication (`quotient <https://en.wikipedia.org/wiki/Quotient_graph>`__) graph. The input ``graph`` is fragmented by disconnecting each :class:`~.MeasureNode` and :class:`~.PrepareNode` pair and finding the resultant disconnected subgraph fragments. Each node of the communication graph represents a subgraph fragment and the edges denote the flow of qubits between fragments due to the removed :class:`~.MeasureNode` and :class:`~.PrepareNode` pairs. .. note:: This operation is designed for use as part of the circuit cutting workflow. Check out the :func:`qml.cut_circuit() <pennylane.cut_circuit>` transform for more details. Args: graph (nx.MultiDiGraph): directed multigraph containing measure and prepare nodes at cut locations Returns: Tuple[Tuple[nx.MultiDiGraph], nx.MultiDiGraph]: the subgraphs of the cut graph and the communication graph. **Example** Consider the following circuit with manually-placed wire cuts: .. code-block:: python wire_cut_0 = qml.WireCut(wires=0) wire_cut_1 = qml.WireCut(wires=1) multi_wire_cut = qml.WireCut(wires=[0, 1]) with qml.tape.QuantumTape() as tape: qml.RX(0.4, wires=0) qml.apply(wire_cut_0) qml.RY(0.5, wires=0) qml.apply(wire_cut_1) qml.CNOT(wires=[0, 1]) qml.apply(multi_wire_cut) qml.RZ(0.6, wires=1) qml.expval(qml.PauliZ(0)) We can find the corresponding graph, remove all the wire cut nodes, and find the subgraphs and communication graph by using: >>> graph = qml.transforms.qcut.tape_to_graph(tape) >>> qml.transforms.qcut.replace_wire_cut_nodes(graph) >>> qml.transforms.qcut.fragment_graph(graph) ((<networkx.classes.multidigraph.MultiDiGraph object at 0x7fb3b2311940>, <networkx.classes.multidigraph.MultiDiGraph object at 0x7fb3b2311c10>, <networkx.classes.multidigraph.MultiDiGraph object at 0x7fb3b23e2820>, <networkx.classes.multidigraph.MultiDiGraph object at 0x7fb3b23e27f0>), <networkx.classes.multidigraph.MultiDiGraph object at 0x7fb3b23e26a0>) """ graph_copy = graph.copy() cut_edges = [] measure_nodes = [n for n in graph.nodes if isinstance(n, MeasurementProcess)] for node1, node2, wire_key in graph.edges: if isinstance(node1, MeasureNode): assert isinstance(node2, PrepareNode) cut_edges.append((node1, node2, wire_key)) graph_copy.remove_edge(node1, node2, key=wire_key) subgraph_nodes = weakly_connected_components(graph_copy) subgraphs = tuple(MultiDiGraph(graph_copy.subgraph(n)) for n in subgraph_nodes) communication_graph = MultiDiGraph() communication_graph.add_nodes_from(range(len(subgraphs))) for node1, node2, _ in cut_edges: for i, subgraph in enumerate(subgraphs): if subgraph.has_node(node1): start_fragment = i if subgraph.has_node(node2): end_fragment = i if start_fragment != end_fragment: communication_graph.add_edge(start_fragment, end_fragment, pair=(node1, node2)) else: # The MeasureNode and PrepareNode pair live in the same fragment and did not result # in a disconnection. We can therefore remove these nodes. Note that we do not need # to worry about adding back an edge between the predecessor to node1 and the successor # to node2 because our next step is to convert the fragment circuit graphs to tapes, # a process that does not depend on edge connections in the subgraph. subgraphs[start_fragment].remove_node(node1) subgraphs[end_fragment].remove_node(node2) terminal_indices = [i for i, s in enumerate(subgraphs) for n in measure_nodes if s.has_node(n)] subgraphs_connected_to_measurements = [] subgraphs_indices_to_remove = [] prepare_nodes_removed = [] for i, s in enumerate(subgraphs): if any(has_path(communication_graph, i, t) for t in terminal_indices): subgraphs_connected_to_measurements.append(s) else: subgraphs_indices_to_remove.append(i) prepare_nodes_removed.extend([n for n in s.nodes if isinstance(n, PrepareNode)]) measure_nodes_to_remove = [ m for p in prepare_nodes_removed for m, p_, _ in cut_edges if p is p_ ] communication_graph.remove_nodes_from(subgraphs_indices_to_remove) for m in measure_nodes_to_remove: for s in subgraphs_connected_to_measurements: if s.has_node(m): s.remove_node(m) return subgraphs_connected_to_measurements, communication_graph
def _find_new_wire(wires: Wires) -> int: """Finds a new wire label that is not in ``wires``.""" ctr = 0 while ctr in wires: ctr += 1 return ctr # pylint: disable=protected-access
[docs]def graph_to_tape(graph: MultiDiGraph) -> QuantumTape: """ Converts a directed multigraph to the corresponding :class:`~.QuantumTape`. To account for the possibility of needing to perform mid-circuit measurements, if any operations follow a :class:`MeasureNode` operation on a given wire then these operations are mapped to a new wire. .. note:: This function is designed for use as part of the circuit cutting workflow. Check out the :func:`qml.cut_circuit() <pennylane.cut_circuit>` transform for more details. Args: graph (nx.MultiDiGraph): directed multigraph to be converted to a tape Returns: QuantumTape: the quantum tape corresponding to the input graph **Example** Consider the following circuit: .. code-block:: python with qml.tape.QuantumTape() as tape: qml.RX(0.4, wires=0) qml.RY(0.5, wires=1) qml.CNOT(wires=[0, 1]) qml.transforms.qcut.MeasureNode(wires=1) qml.transforms.qcut.PrepareNode(wires=1) qml.CNOT(wires=[1, 0]) qml.expval(qml.PauliZ(0)) This circuit contains operations that follow a :class:`~.MeasureNode`. These operations will subsequently act on wire ``2`` instead of wire ``1``: >>> graph = qml.transforms.qcut.tape_to_graph(tape) >>> tape = qml.transforms.qcut.graph_to_tape(graph) >>> print(tape.draw()) 0: ──RX──────────╭●──────────────╭X─┤ <Z> 1: ──RY──────────╰X──MeasureNode─│──┤ 2: ──PrepareNode─────────────────╰●─┤ """ wires = Wires.all_wires([n.wires for n in graph.nodes]) ordered_ops = sorted( [(order, op) for op, order in graph.nodes(data="order")], key=lambda x: x[0] ) wire_map = {w: w for w in wires} reverse_wire_map = {v: k for k, v in wire_map.items()} copy_ops = [copy.copy(op) for _, op in ordered_ops if not isinstance(op, MeasurementProcess)] copy_meas = [copy.copy(op) for _, op in ordered_ops if isinstance(op, MeasurementProcess)] observables = [] with QuantumTape() as tape: for op in copy_ops: op = qml.map_wires(op, wire_map=wire_map, queue=True) if isinstance(op, MeasureNode): assert len(op.wires) == 1 measured_wire = op.wires[0] new_wire = _find_new_wire(wires) wires += new_wire original_wire = reverse_wire_map[measured_wire] wire_map[original_wire] = new_wire reverse_wire_map[new_wire] = original_wire if copy_meas: return_types = set(meas.return_type for meas in copy_meas) if len(return_types) > 1: raise ValueError( "Only a single return type can be used for measurement " "nodes in graph_to_tape" ) return_type = return_types.pop() if return_type not in {Sample, Expectation}: raise ValueError( "Invalid return type. Only expectation value and sampling measurements " "are supported in graph_to_tape" ) for meas in copy_meas: meas = qml.map_wires(meas, wire_map=wire_map) obs = meas.obs observables.append(obs) if return_type is Sample: apply(meas) if return_type is Expectation: if len(observables) > 1: qml.expval(Tensor(*observables)) else: qml.expval(obs) return tape
def _get_measurements( group: Sequence[Operator], measurements: Sequence[MeasurementProcess] ) -> List[MeasurementProcess]: """Pairs each observable in ``group`` with the circuit ``measurements``. Only a single measurement of an expectation value is currently supported in ``measurements``. Args: group (Sequence[Operator]): a collection of observables measurements (Sequence[MeasurementProcess]): measurements from the circuit Returns: List[MeasurementProcess]: the expectation values of ``g @ obs``, where ``g`` is iterated over ``group`` and ``obs`` is the observable composing the single measurement in ``measurements`` """ if len(group) == 0: # This ensures the measurements of the original tape are carried over to the # following tape configurations in the absence of any MeasureNodes in the fragment return measurements n_measurements = len(measurements) if n_measurements > 1: raise ValueError( "The circuit cutting workflow only supports circuits with a single output " "measurement" ) if n_measurements == 0: return [expval(g) for g in group] measurement = measurements[0] if measurement.return_type is not Expectation: raise ValueError( "The circuit cutting workflow only supports circuits with expectation " "value measurements" ) obs = measurement.obs return [expval(copy.copy(obs) @ g) for g in group] def _prep_zero_state(wire): qml.Identity(wire) def _prep_one_state(wire): qml.PauliX(wire) def _prep_plus_state(wire): qml.Hadamard(wire) def _prep_minus_state(wire): qml.PauliX(wire) qml.Hadamard(wire) def _prep_iplus_state(wire): qml.Hadamard(wire) qml.S(wires=wire) def _prep_iminus_state(wire): qml.PauliX(wire) qml.Hadamard(wire) qml.S(wires=wire) PREPARE_SETTINGS = [_prep_zero_state, _prep_one_state, _prep_plus_state, _prep_iplus_state]
[docs]def expand_fragment_tape( tape: QuantumTape, ) -> Tuple[List[QuantumTape], List[PrepareNode], List[MeasureNode]]: """ Expands a fragment tape into a sequence of tapes for each configuration of the contained :class:`MeasureNode` and :class:`PrepareNode` operations. .. note:: This function is designed for use as part of the circuit cutting workflow. Check out the :func:`qml.cut_circuit() <pennylane.cut_circuit>` transform for more details. Args: tape (QuantumTape): the fragment tape containing :class:`MeasureNode` and :class:`PrepareNode` operations to be expanded Returns: Tuple[List[QuantumTape], List[PrepareNode], List[MeasureNode]]: the tapes corresponding to each configuration and the order of preparation nodes and measurement nodes used in the expansion **Example** Consider the following circuit, which contains a :class:`~.MeasureNode` and :class:`~.PrepareNode` operation: .. code-block:: python with qml.tape.QuantumTape() as tape: qml.transforms.qcut.PrepareNode(wires=0) qml.RX(0.5, wires=0) qml.transforms.qcut.MeasureNode(wires=0) We can expand over the measurement and preparation nodes using: >>> tapes, prep, meas = qml.transforms.qcut.expand_fragment_tape(tape) >>> for t in tapes: ... print(qml.drawer.tape_text(t, decimals=1)) 0: ──I──RX(0.5)─┤ <I> <Z> 0: ──I──RX(0.5)─┤ <X> 0: ──I──RX(0.5)─┤ <Y> 0: ──X──RX(0.5)─┤ <I> <Z> 0: ──X──RX(0.5)─┤ <X> 0: ──X──RX(0.5)─┤ <Y> 0: ──H──RX(0.5)─┤ <I> <Z> 0: ──H──RX(0.5)─┤ <X> 0: ──H──RX(0.5)─┤ <Y> 0: ──H──S──RX(0.5)─┤ <I> <Z> 0: ──H──S──RX(0.5)─┤ <X> 0: ──H──S──RX(0.5)─┤ <Y> """ prepare_nodes = [o for o in tape.operations if isinstance(o, PrepareNode)] measure_nodes = [o for o in tape.operations if isinstance(o, MeasureNode)] wire_map = {mn.wires[0]: i for i, mn in enumerate(measure_nodes)} n_meas = len(measure_nodes) if n_meas >= 1: measure_combinations = qml.pauli.partition_pauli_group(len(measure_nodes)) else: measure_combinations = [[""]] tapes = [] for prepare_settings in product(range(len(PREPARE_SETTINGS)), repeat=len(prepare_nodes)): for measure_group in measure_combinations: if n_meas >= 1: group = [ string_to_pauli_word(paulis, wire_map=wire_map) for paulis in measure_group ] else: group = [] prepare_mapping = { n: PREPARE_SETTINGS[s] for n, s in zip(prepare_nodes, prepare_settings) } with QuantumTape() as tape_: for op in tape.operations: if isinstance(op, PrepareNode): w = op.wires[0] prepare_mapping[op](w) elif not isinstance(op, MeasureNode): apply(op) with qml.QueuingManager.stop_recording(): measurements = _get_measurements(group, tape.measurements) for meas in measurements: apply(meas) tapes.append(tape_) return tapes, prepare_nodes, measure_nodes
MC_STATES = [ _prep_zero_state, _prep_one_state, _prep_plus_state, _prep_minus_state, _prep_iplus_state, _prep_iminus_state, _prep_zero_state, _prep_one_state, ] def _identity(wire): qml.sample(qml.Identity(wires=wire)) def _pauliX(wire): qml.sample(qml.PauliX(wires=wire)) def _pauliY(wire): qml.sample(qml.PauliY(wires=wire)) def _pauliZ(wire): qml.sample(qml.PauliZ(wires=wire)) MC_MEASUREMENTS = [ _identity, _identity, _pauliX, _pauliX, _pauliY, _pauliY, _pauliZ, _pauliZ, ]
[docs]def expand_fragment_tapes_mc( tapes: Sequence[QuantumTape], communication_graph: MultiDiGraph, shots: int ) -> Tuple[List[QuantumTape], np.ndarray]: """ Expands fragment tapes into a sequence of random configurations of the contained pairs of :class:`MeasureNode` and :class:`PrepareNode` operations. For each pair, a measurement is sampled from the Pauli basis and a state preparation is sampled from the corresponding pair of eigenstates. A settings array is also given which tracks the configuration pairs. Since each of the 4 measurements has 2 possible eigenvectors, all configurations can be uniquely identified by 8 values. The number of rows is determined by the number of cuts and the number of columns is determined by the number of shots. .. note:: This function is designed for use as part of the sampling-based circuit cutting workflow. Check out the :func:`~.cut_circuit_mc` transform for more details. Args: tapes (Sequence[QuantumTape]): the fragment tapes containing :class:`MeasureNode` and :class:`PrepareNode` operations to be expanded communication_graph (nx.MultiDiGraph): the communication (quotient) graph of the fragmented full graph shots (int): number of shots Returns: Tuple[List[QuantumTape], np.ndarray]: the tapes corresponding to each configuration and the settings that track each configuration pair **Example** Consider the following circuit that contains a sample measurement: .. code-block:: python with qml.tape.QuantumTape() as tape: qml.Hadamard(wires=0) qml.CNOT(wires=[0, 1]) qml.WireCut(wires=1) qml.CNOT(wires=[1, 2]) qml.sample(wires=[0, 1, 2]) We can generate the fragment tapes using the following workflow: >>> g = qml.transforms.qcut.tape_to_graph(tape) >>> qml.transforms.qcut.replace_wire_cut_nodes(g) >>> subgraphs, communication_graph = qml.transforms.qcut.fragment_graph(g) >>> tapes = [qml.transforms.qcut.graph_to_tape(sg) for sg in subgraphs] We can then expand over the measurement and preparation nodes to generate random configurations using: .. code-block:: python >>> configs, settings = qml.transforms.qcut.expand_fragment_tapes_mc(tapes, communication_graph, 3) >>> print(settings) [[1 6 2]] >>> for i, (c1, c2) in enumerate(zip(configs[0], configs[1])): ... print(f"config {i}:") ... print(c1.draw()) ... print("") ... print(c2.draw()) ... print("") ... config 0: 0: ──H─╭●─┤ Sample[|1⟩⟨1|] 1: ────╰X─┤ Sample[Z] 1: ──I─╭●─┤ Sample[|1⟩⟨1|] 2: ────╰X─┤ Sample[|1⟩⟨1|] config 1: 0: ──H─╭●─┤ Sample[|1⟩⟨1|] 1: ────╰X─┤ Sample[Y] 1: ──H──S─╭●─┤ Sample[|1⟩⟨1|] 2: ───────╰X─┤ Sample[|1⟩⟨1|] config 2: 0: ──H─╭●─┤ Sample[|1⟩⟨1|] 1: ────╰X─┤ Sample[Y] 1: ──X──H──S─╭●─┤ Sample[|1⟩⟨1|] 2: ──────────╰X─┤ Sample[|1⟩⟨1|] """ pairs = [e[-1] for e in communication_graph.edges.data("pair")] settings = np.random.choice(range(8), size=(len(pairs), shots), replace=True) meas_settings = {pair[0].id: setting for pair, setting in zip(pairs, settings)} prep_settings = {pair[1].id: setting for pair, setting in zip(pairs, settings)} all_configs = [] for tape in tapes: frag_config = [] for shot in range(shots): with qml.tape.QuantumTape() as new_tape: for op in tape.operations: w = op.wires[0] if isinstance(op, PrepareNode): MC_STATES[prep_settings[op.id][shot]](w) elif not isinstance(op, MeasureNode): qml.apply(op) for meas in tape.measurements: qml.apply(meas) for op in tape.operations: meas_w = op.wires[0] if isinstance(op, MeasureNode): MC_MEASUREMENTS[meas_settings[op.id][shot]](meas_w) frag_config.append(new_tape) all_configs.append(frag_config) return all_configs, settings
def _reshape_results(results: Sequence, shots: int) -> List[List]: """ Helper function to reshape ``results`` into a two-dimensional nested list whose number of rows is determined by the number of shots and whose number of columns is determined by the number of cuts. """ results = [qml.math.flatten(r) for r in results] results = [results[i : i + shots] for i in range(0, len(results), shots)] results = list(map(list, zip(*results))) # calculate list-based transpose return results
[docs]def qcut_processing_fn_sample( results: Sequence, communication_graph: MultiDiGraph, shots: int ) -> List: """ Function to postprocess samples for the :func:`cut_circuit_mc() <pennylane.cut_circuit_mc>` transform. This removes superfluous mid-circuit measurement samples from fragment circuit outputs. .. note:: This function is designed for use as part of the sampling-based circuit cutting workflow. Check out the :func:`qml.cut_circuit_mc() <pennylane.cut_circuit_mc>` transform for more details. Args: results (Sequence): a collection of sample-based execution results generated from the random expansion of circuit fragments over measurement and preparation node configurations communication_graph (nx.MultiDiGraph): the communication graph determining connectivity between circuit fragments shots (int): the number of shots Returns: List[tensor_like]: the sampled output for all terminal measurements over the number of shots given """ res0 = results[0] results = _reshape_results(results, shots) out_degrees = [d for _, d in communication_graph.out_degree] samples = [] for result in results: sample = [] for fragment_result, out_degree in zip(result, out_degrees): sample.append(fragment_result[: -out_degree or None]) samples.append(np.hstack(sample)) return [qml.math.convert_like(np.array(samples), res0)]
[docs]def qcut_processing_fn_mc( results: Sequence, communication_graph: MultiDiGraph, settings: np.ndarray, shots: int, classical_processing_fn: callable, ): """ Function to postprocess samples for the :func:`cut_circuit_mc() <pennylane.cut_circuit_mc>` transform. This takes a user-specified classical function to act on bitstrings and generates an expectation value. .. note:: This function is designed for use as part of the sampling-based circuit cutting workflow. Check out the :func:`qml.cut_circuit_mc() <pennylane.cut_circuit_mc>` transform for more details. Args: results (Sequence): a collection of sample-based execution results generated from the random expansion of circuit fragments over measurement and preparation node configurations communication_graph (nx.MultiDiGraph): the communication graph determining connectivity between circuit fragments settings (np.ndarray): Each element is one of 8 unique values that tracks the specific measurement and preparation operations over all configurations. The number of rows is determined by the number of cuts and the number of columns is determined by the number of shots. shots (int): the number of shots classical_processing_fn (callable): A classical postprocessing function to be applied to the reconstructed bitstrings. The expected input is a bitstring; a flat array of length ``wires`` and the output should be a single number within the interval :math:`[-1, 1]`. Returns: float or tensor_like: the expectation value calculated in accordance to Eq. (35) of `Peng et al. <https://arxiv.org/abs/1904.00102>`__ """ res0 = results[0] results = _reshape_results(results, shots) out_degrees = [d for _, d in communication_graph.out_degree] evals = (0.5, 0.5, 0.5, -0.5, 0.5, -0.5, 0.5, -0.5) expvals = [] for result, setting in zip(results, settings.T): sample_terminal = [] sample_mid = [] for fragment_result, out_degree in zip(result, out_degrees): sample_terminal.append(fragment_result[: -out_degree or None]) sample_mid.append(fragment_result[-out_degree or len(fragment_result) :]) sample_terminal = np.hstack(sample_terminal) sample_mid = np.hstack(sample_mid) assert set(sample_terminal).issubset({np.array(0), np.array(1)}) assert set(sample_mid).issubset({np.array(-1), np.array(1)}) # following Eq.(35) of Peng et.al: https://arxiv.org/abs/1904.00102 f = classical_processing_fn(sample_terminal) if not -1 <= f <= 1: raise ValueError( "The classical processing function supplied must " "give output in the interval [-1, 1]" ) sigma_s = np.prod(sample_mid) t_s = f * sigma_s c_s = np.prod([evals[s] for s in setting]) K = len(sample_mid) expvals.append(8**K * c_s * t_s) return qml.math.convert_like(np.mean(expvals), res0)
[docs]@batch_transform def cut_circuit_mc( tape: QuantumTape, classical_processing_fn: Optional[callable] = None, auto_cutter: Union[bool, Callable] = False, max_depth: int = 1, shots: Optional[int] = None, device_wires: Optional[Wires] = None, **kwargs, ) -> Tuple[Tuple[QuantumTape], Callable]: """ Cut up a circuit containing sample measurements into smaller fragments using a Monte Carlo method. Following the approach of `Peng et al. <https://journals.aps.org/prl/abstract/10.1103/PhysRevLett.125.150504>`__, strategic placement of :class:`~.WireCut` operations can allow a quantum circuit to be split into disconnected circuit fragments. A circuit containing sample measurements can be cut and processed using Monte Carlo (MC) methods. This transform employs MC methods to allow for sampled measurement outcomes to be recombined to full bitstrings and, if a classical processing function is supplied, an expectation value will be evaluated. Args: tape (QuantumTape): the tape of the full circuit to be cut classical_processing_fn (callable): A classical postprocessing function to be applied to the reconstructed bitstrings. The expected input is a bitstring; a flat array of length ``wires``, and the output should be a single number within the interval :math:`[-1, 1]`. If not supplied, the transform will output samples. auto_cutter (Union[bool, Callable]): Toggle for enabling automatic cutting with the default :func:`~.kahypar_cut` partition method. Can also pass a graph partitioning function that takes an input graph and returns a list of edges to be cut based on a given set of constraints and objective. The default :func:`~.kahypar_cut` function requires KaHyPar to be installed using ``pip install kahypar`` for Linux and Mac users or visiting the instructions `here <https://kahypar.org>`__ to compile from source for Windows users. max_depth (int): The maximum depth used to expand the circuit while searching for wire cuts. Only applicable when transforming a QNode. shots (int): Number of shots. When transforming a QNode, this argument is set by the device's ``shots`` value or at QNode call time (if provided). Required when transforming a tape. device_wires (Wires): Wires of the device that the cut circuits are to be run on. When transforming a QNode, this argument is optional and will be set to the QNode's device wires. Required when transforming a tape. kwargs: Additional keyword arguments to be passed to a callable ``auto_cutter`` argument. For the default KaHyPar cutter, please refer to the docstring of functions :func:`~.find_and_place_cuts` and :func:`~.kahypar_cut` for the available arguments. Returns: Callable: Function which accepts the same arguments as the QNode. When called, this function will sample from the partitioned circuit fragments and combine the results using a Monte Carlo method. **Example** The following :math:`3`-qubit circuit contains a :class:`~.WireCut` operation and a :func:`~.sample` measurement. When decorated with ``@qml.cut_circuit_mc``, we can cut the circuit into two :math:`2`-qubit fragments: .. code-block:: python dev = qml.device("default.qubit", wires=2, shots=1000) @qml.cut_circuit_mc @qml.qnode(dev) def circuit(x): qml.RX(0.89, wires=0) qml.RY(0.5, wires=1) qml.RX(1.3, wires=2) qml.CNOT(wires=[0, 1]) qml.WireCut(wires=1) qml.CNOT(wires=[1, 2]) qml.RX(x, wires=0) qml.RY(0.7, wires=1) qml.RX(2.3, wires=2) return qml.sample(wires=[0, 2]) we can then execute the circuit as usual by calling the QNode: >>> x = 0.3 >>> circuit(x) tensor([[1, 1], [0, 1], [0, 1], ..., [0, 1], [0, 1], [0, 1]], requires_grad=True) Furthermore, the number of shots can be temporarily altered when calling the qnode: >>> results = circuit(x, shots=123) >>> results.shape (123, 2) Alternatively, if the optimal wire-cut placement is unknown for an arbitrary circuit, the ``auto_cutter`` option can be enabled to make attempts in finding such a optimal cut. The following examples shows this capability on the same circuit as above but with the :class:`~.WireCut` removed: .. code-block:: python @qml.cut_circuit_mc(auto_cutter=True) @qml.qnode(dev) def circuit(x): qml.RX(0.89, wires=0) qml.RY(0.5, wires=1) qml.RX(1.3, wires=2) qml.CNOT(wires=[0, 1]) qml.CNOT(wires=[1, 2]) qml.RX(x, wires=0) qml.RY(0.7, wires=1) qml.RX(2.3, wires=2) return qml.sample(wires=[0, 2]) >>> results = circuit(x, shots=123) >>> results.shape (123, 2) .. details:: :title: Usage Details Manually placing :class:`~.WireCut` operations and decorating the QNode with the ``cut_circuit_mc()`` batch transform is the suggested entrypoint into sampling-based circuit cutting using the Monte Carlo method. However, advanced users also have the option to work directly with a :class:`~.QuantumTape` and manipulate the tape to perform circuit cutting using the below functionality: .. autosummary:: :toctree: ~transforms.qcut.tape_to_graph ~transforms.qcut.find_and_place_cuts ~transforms.qcut.replace_wire_cut_nodes ~transforms.qcut.fragment_graph ~transforms.qcut.graph_to_tape ~transforms.qcut.remap_tape_wires ~transforms.qcut.expand_fragment_tapes_mc ~transforms.qcut.qcut_processing_fn_sample ~transforms.qcut.qcut_processing_fn_mc The following shows how these elementary steps are combined as part of the ``cut_circuit_mc()`` transform. Consider the circuit below: .. code-block:: python np.random.seed(42) with qml.tape.QuantumTape() as tape: qml.Hadamard(wires=0) qml.CNOT(wires=[0, 1]) qml.PauliX(wires=1) qml.WireCut(wires=1) qml.CNOT(wires=[1, 2]) qml.sample(wires=[0, 1, 2]) >>> print(tape.draw()) 0: ──H─╭●───────────┤ ╭Sample 1: ────╰X──X──//─╭●─┤ ├Sample 2: ──────────────╰X─┤ ╰Sample To cut the circuit, we first convert it to its graph representation: >>> graph = qml.transforms.qcut.tape_to_graph(tape) If, however, the optimal location of the :class:`~.WireCut` is unknown, we can use :func:`~.find_and_place_cuts` to make attempts in automatically finding such a cut given the device constraints. Using the same circuit as above but with the :class:`~.WireCut` removed, a slightly different cut with identical cost can be discovered and placed into the circuit with automatic cutting: .. code-block:: python with qml.tape.QuantumTape() as uncut_tape: qml.Hadamard(wires=0) qml.CNOT(wires=[0, 1]) qml.PauliX(wires=1) qml.CNOT(wires=[1, 2]) qml.sample(wires=[0, 1, 2]) >>> cut_graph = qml.transforms.qcut.find_and_place_cuts( ... graph=qml.transforms.qcut.tape_to_graph(uncut_tape), ... cut_strategy=qml.transforms.qcut.CutStrategy(max_free_wires=2), ... ) >>> print(qml.transforms.qcut.graph_to_tape(cut_graph).draw()) 0: ──H─╭●───────────┤ Sample[|1⟩⟨1|] 1: ────╰X──//──X─╭●─┤ Sample[|1⟩⟨1|] 2: ──────────────╰X─┤ Sample[|1⟩⟨1|] Our next step, using the original manual cut placement, is to remove the :class:`~.WireCut` nodes in the graph and replace with :class:`~.MeasureNode` and :class:`~.PrepareNode` pairs. >>> qml.transforms.qcut.replace_wire_cut_nodes(graph) The :class:`~.MeasureNode` and :class:`~.PrepareNode` pairs are placeholder operations that allow us to cut the circuit graph and then randomly select measurement and preparation configurations at cut locations. First, the :func:`~.fragment_graph` function pulls apart the graph into disconnected components as well as returning the `communication_graph <https://en.wikipedia.org/wiki/Quotient_graph>`__ detailing the connectivity between the components. >>> fragments, communication_graph = qml.transforms.qcut.fragment_graph(graph) We now convert the ``fragments`` back to :class:`~.QuantumTape` objects >>> fragment_tapes = [qml.transforms.qcut.graph_to_tape(f) for f in fragments] The circuit fragments can now be visualized: >>> print(fragment_tapes[0].draw()) 0: ──H─╭●─────────────────┤ Sample[|1⟩⟨1|] 1: ────╰X──X──MeasureNode─┤ >>> print(fragment_tapes[1].draw()) 1: ──PrepareNode─╭●─┤ Sample[|1⟩⟨1|] 2: ──────────────╰X─┤ Sample[|1⟩⟨1|] Additionally, we must remap the tape wires to match those available on our device. >>> dev = qml.device("default.qubit", wires=2, shots=1) >>> fragment_tapes = [ ... qml.transforms.qcut.remap_tape_wires(t, dev.wires) for t in fragment_tapes ... ] Note that the number of shots on the device is set to :math:`1` here since we will only require one execution per fragment configuration. In the following steps we introduce a shots value that will determine the number of fragment configurations. When using the ``cut_circuit_mc()`` decorator with a QNode, this shots value is automatically inferred from the provided device. Next, each circuit fragment is randomly expanded over :class:`~.MeasureNode` and :class:`~.PrepareNode` configurations. For each pair, a measurement is sampled from the Pauli basis and a state preparation is sampled from the corresponding pair of eigenstates. A settings array is also given which tracks the configuration pairs. Since each of the 4 measurements has 2 possible eigenvectors, all configurations can be uniquely identified by 8 values. The number of rows is determined by the number of cuts and the number of columns is determined by the number of shots. >>> shots = 3 >>> configurations, settings = qml.transforms.qcut.expand_fragment_tapes_mc( ... fragment_tapes, communication_graph, shots=shots ... ) >>> tapes = tuple(tape for c in configurations for tape in c) >>> settings tensor([[6, 3, 4]], requires_grad=True) Each configuration is drawn below: >>> for t in tapes: ... print(qml.drawer.tape_text(t)) ... print("") .. code-block:: 0: ──H─╭●────┤ Sample[|1⟩⟨1|] 1: ────╰X──X─┤ Sample[Z] 0: ──H─╭●────┤ Sample[|1⟩⟨1|] 1: ────╰X──X─┤ Sample[X] 0: ──H─╭●────┤ Sample[|1⟩⟨1|] 1: ────╰X──X─┤ Sample[Y] 0: ──I─╭●─┤ Sample[|1⟩⟨1|] 1: ────╰X─┤ Sample[|1⟩⟨1|] 0: ──X──S─╭●─┤ Sample[|1⟩⟨1|] 1: ───────╰X─┤ Sample[|1⟩⟨1|] 0: ──H─╭●─┤ Sample[|1⟩⟨1|] 1: ────╰X─┤ Sample[|1⟩⟨1|] The last step is to execute the tapes and postprocess the results using :func:`~.qcut_processing_fn_sample`, which processes the results to approximate the original full circuit output bitstrings. >>> results = qml.execute(tapes, dev, gradient_fn=None) >>> qml.transforms.qcut.qcut_processing_fn_sample( ... results, ... communication_graph, ... shots=shots, ... ) [array([[0., 0., 0.], [1., 0., 0.], [1., 0., 0.]])] Alternatively, it is possible to calculate an expectation value if a classical processing function is provided that will accept the reconstructed circuit bitstrings and return a value in the interval :math:`[-1, 1]`: .. code-block:: def fn(x): if x[0] == 0: return 1 if x[0] == 1: return -1 >>> qml.transforms.qcut.qcut_processing_fn_mc( ... results, ... communication_graph, ... settings, ... shots, ... fn ... ) array(-4.) Using the Monte Carlo approach of [Peng et. al](https://arxiv.org/abs/1904.00102), the `cut_circuit_mc` transform also supports returning sample-based expectation values of observables that are diagonal in the computational basis, as shown below for a `ZZ` measurement on wires `0` and `2`: .. code-block:: dev = qml.device("default.qubit", wires=2, shots=10000) def observable(bitstring): return (-1) ** np.sum(bitstring) @qml.cut_circuit_mc(classical_processing_fn=observable) @qml.qnode(dev) def circuit(x): qml.RX(0.89, wires=0) qml.RY(0.5, wires=1) qml.RX(1.3, wires=2) qml.CNOT(wires=[0, 1]) qml.WireCut(wires=1) qml.CNOT(wires=[1, 2]) qml.RX(x, wires=0) qml.RY(0.7, wires=1) qml.RX(2.3, wires=2) return qml.sample(wires=[0, 2]) We can now approximate the expectation value of the observable using >>> circuit(x) tensor(-0.776, requires_grad=True) """ # pylint: disable=unused-argument, too-many-arguments if len(tape.measurements) != 1: raise ValueError( "The Monte Carlo circuit cutting workflow only supports circuits " "with a single output measurement" ) if not all(m.return_type is Sample for m in tape.measurements): raise ValueError( "The Monte Carlo circuit cutting workflow only supports circuits " "with sampling-based measurements" ) for meas in tape.measurements: if meas.obs is not None: raise ValueError( "The Monte Carlo circuit cutting workflow only " "supports measurements in the computational basis. Please only specify " "wires to be sampled within qml.sample(), do not pass observables." ) g = tape_to_graph(tape) if auto_cutter is True or callable(auto_cutter): cut_strategy = kwargs.pop("cut_strategy", None) or CutStrategy( max_free_wires=len(device_wires) ) g = find_and_place_cuts( graph=g, cut_method=auto_cutter if callable(auto_cutter) else kahypar_cut, cut_strategy=cut_strategy, **kwargs, ) replace_wire_cut_nodes(g) fragments, communication_graph = fragment_graph(g) fragment_tapes = [graph_to_tape(f) for f in fragments] fragment_tapes = [qml.map_wires(t, dict(zip(t.wires, device_wires))) for t in fragment_tapes] configurations, settings = expand_fragment_tapes_mc( fragment_tapes, communication_graph, shots=shots ) tapes = tuple(tape for c in configurations for tape in c) if classical_processing_fn: return tapes, partial( qcut_processing_fn_mc, communication_graph=communication_graph, settings=settings, shots=shots, classical_processing_fn=classical_processing_fn, ) return tapes, partial( qcut_processing_fn_sample, communication_graph=communication_graph, shots=shots )
@cut_circuit_mc.custom_qnode_wrapper def qnode_execution_wrapper_mc(self, qnode, targs, tkwargs): """Here, we overwrite the QNode execution wrapper in order to replace execution variables""" transform_max_diff = tkwargs.pop("max_diff", None) tkwargs.setdefault("device_wires", qnode.device.wires) if "shots" in inspect.signature(qnode.func).parameters: raise ValueError( "Detected 'shots' as an argument of the quantum function to transform. " "The 'shots' argument name is reserved for overriding the number of shots " "taken by the device." ) def _wrapper(*args, **kwargs): if tkwargs.get("shots", False): raise ValueError( "Cannot provide a 'shots' value directly to the cut_circuit_mc " "decorator when transforming a QNode. Please provide the number of shots in " "the device or when calling the QNode." ) shots = kwargs.pop("shots", False) shots = shots or qnode.device.shots if shots is None: raise ValueError( "A shots value must be provided in the device " "or when calling the QNode to be cut" ) qnode.construct(args, kwargs) tapes, processing_fn = self.construct(qnode.qtape, *targs, **tkwargs, shots=shots) interface = qnode.interface execute_kwargs = getattr(qnode, "execute_kwargs", {}).copy() max_diff = execute_kwargs.pop("max_diff", 2) max_diff = transform_max_diff or max_diff gradient_fn = getattr(qnode, "gradient_fn", qnode.diff_method) gradient_kwargs = getattr(qnode, "gradient_kwargs", {}) if interface is None or not self.differentiable: gradient_fn = None execute_kwargs["cache"] = False res = qml.execute( tapes, device=qnode.device, gradient_fn=gradient_fn, interface=interface, max_diff=max_diff, override_shots=1, gradient_kwargs=gradient_kwargs, **execute_kwargs, ) out = processing_fn(res) if isinstance(out, list) and len(out) == 1: return out[0] return out return _wrapper def _get_symbol(i): """Finds the i-th ASCII symbol. Works for lowercase and uppercase letters, allowing i up to 51.""" if i >= len(string.ascii_letters): raise ValueError( "Set the use_opt_einsum argument to True when applying more than " f"{len(string.ascii_letters)} wire cuts to a circuit" ) return string.ascii_letters[i] # pylint: disable=too-many-branches def contract_tensors( tensors: Sequence, communication_graph: MultiDiGraph, prepare_nodes: Sequence[Sequence[PrepareNode]], measure_nodes: Sequence[Sequence[MeasureNode]], use_opt_einsum: bool = False, ): r"""Contract tensors according to the edges specified in the communication graph. .. note:: This function is designed for use as part of the circuit cutting workflow. Check out the :func:`qml.cut_circuit() <pennylane.cut_circuit>` transform for more details. Consider the three tensors :math:`T^{(1)}`, :math:`T^{(2)}`, and :math:`T^{(3)}`, along with their contraction equation .. math:: \sum_{ijklmn} T^{(1)}_{ij,km} T^{(2)}_{kl,in} T^{(3)}_{mn,jl} Each tensor is the result of the tomography of a circuit fragment and has some indices corresponding to state preparations (marked by the indices before the comma) and some indices corresponding to measurements (marked by the indices after the comma). An equivalent representation of the contraction equation is to use a directed multigraph known as the communication/quotient graph. In the communication graph, each tensor is assigned a node and edges are added between nodes to mark a contraction along an index. The communication graph resulting from the above contraction equation is a complete directed graph. In the communication graph provided by :func:`fragment_graph`, edges are composed of :class:`PrepareNode` and :class:`MeasureNode` pairs. To correctly map back to the contraction equation, we must keep track of the order of preparation and measurement indices in each tensor. This order is specified in the ``prepare_nodes`` and ``measure_nodes`` arguments. Args: tensors (Sequence): the tensors to be contracted communication_graph (nx.MultiDiGraph): the communication graph determining connectivity between the tensors prepare_nodes (Sequence[Sequence[PrepareNode]]): a sequence of size ``len(communication_graph.nodes)`` that determines the order of preparation indices in each tensor measure_nodes (Sequence[Sequence[MeasureNode]]): a sequence of size ``len(communication_graph.nodes)`` that determines the order of measurement indices in each tensor use_opt_einsum (bool): Determines whether to use the `opt_einsum <https://dgasmith.github.io/opt_einsum/>`__ package. This package is useful for faster tensor contractions of large networks but must be installed separately using, e.g., ``pip install opt_einsum``. Both settings for ``use_opt_einsum`` result in a differentiable contraction. Returns: float or tensor_like: the result of contracting the tensor network **Example** We first set up the tensors and their corresponding :class:`~.PrepareNode` and :class:`~.MeasureNode` orderings: .. code-block:: python from pennylane.transforms import qcut import networkx as nx import numpy as np tensors = [np.arange(4), np.arange(4, 8)] prep = [[], [qcut.PrepareNode(wires=0)]] meas = [[qcut.MeasureNode(wires=0)], []] The communication graph describing edges in the tensor network must also be constructed: .. code-block:: python graph = nx.MultiDiGraph([(0, 1, {"pair": (meas[0][0], prep[1][0])})]) The network can then be contracted using: >>> qml.transforms.qcut.contract_tensors(tensors, graph, prep, meas) 38 """ # pylint: disable=import-outside-toplevel if use_opt_einsum: try: from opt_einsum import contract, get_symbol except ImportError as e: raise ImportError( "The opt_einsum package is required when use_opt_einsum is set to " "True in the contract_tensors function. This package can be " "installed using:\npip install opt_einsum" ) from e else: contract = qml.math.einsum get_symbol = _get_symbol ctr = 0 tensor_indxs = [""] * len(communication_graph.nodes) meas_map = {} for i, (node, prep) in enumerate(zip(communication_graph.nodes, prepare_nodes)): predecessors = communication_graph.pred[node] for p in prep: for _, pred_edges in predecessors.items(): for pred_edge in pred_edges.values(): meas_op, prep_op = pred_edge["pair"] if p.id is prep_op.id: symb = get_symbol(ctr) ctr += 1 tensor_indxs[i] += symb meas_map[meas_op] = symb for i, (node, meas) in enumerate(zip(communication_graph.nodes, measure_nodes)): successors = communication_graph.succ[node] for m in meas: for _, succ_edges in successors.items(): for succ_edge in succ_edges.values(): meas_op, _ = succ_edge["pair"] if m.id is meas_op.id: symb = meas_map[meas_op] tensor_indxs[i] += symb eqn = ",".join(tensor_indxs) kwargs = {} if use_opt_einsum else {"like": tensors[0]} return contract(eqn, *tensors, **kwargs) CHANGE_OF_BASIS = qml.math.array( [[1.0, 1.0, 0.0, 0.0], [-1.0, -1.0, 2.0, 0.0], [-1.0, -1.0, 0.0, 2.0], [1.0, -1.0, 0.0, 0.0]] ) def _process_tensor(results, n_prep: int, n_meas: int): """Convert a flat slice of an individual circuit fragment's execution results into a tensor. This function performs the following steps: 1. Reshapes ``results`` into the intermediate shape ``(4,) * n_prep + (4**n_meas,)`` 2. Shuffles the final axis to follow the standard product over measurement settings. E.g., for ``n_meas = 2`` the standard product is: II, IX, IY, IZ, XI, ..., ZY, ZZ while the input order will be the result of ``qml.pauli.partition_pauli_group(2)``, i.e., II, IZ, ZI, ZZ, ..., YY. 3. Reshapes into the final target shape ``(4,) * (n_prep + n_meas)`` 4. Performs a change of basis for the preparation indices (the first ``n_prep`` indices) from the |0>, |1>, |+>, |+i> basis to the I, X, Y, Z basis using ``CHANGE_OF_BASIS``. Args: results (tensor_like): the input execution results n_prep (int): the number of preparation nodes in the corresponding circuit fragment n_meas (int): the number of measurement nodes in the corresponding circuit fragment Returns: tensor_like: the corresponding fragment tensor """ n = n_prep + n_meas dim_meas = 4**n_meas # Step 1 intermediate_shape = (4,) * n_prep + (dim_meas,) intermediate_tensor = qml.math.reshape(results, intermediate_shape) # Step 2 grouped = qml.pauli.partition_pauli_group(n_meas) grouped_flat = [term for group in grouped for term in group] order = qml.math.argsort(grouped_flat) if qml.math.get_interface(intermediate_tensor) == "tensorflow": # TensorFlow does not support slicing intermediate_tensor = qml.math.gather(intermediate_tensor, order, axis=-1) else: sl = [slice(None)] * n_prep + [order] intermediate_tensor = intermediate_tensor[tuple(sl)] # Step 3 final_shape = (4,) * n final_tensor = qml.math.reshape(intermediate_tensor, final_shape) # Step 4 change_of_basis = qml.math.convert_like(CHANGE_OF_BASIS, intermediate_tensor) for i in range(n_prep): axes = [[1], [i]] final_tensor = qml.math.tensordot(change_of_basis, final_tensor, axes=axes) axes = list(reversed(range(n_prep))) + list(range(n_prep, n)) # Use transpose to reorder indices. We must do this because tensordot returns a tensor whose # indices are ordered according to the uncontracted indices of the first tensor, followed # by the uncontracted indices of the second tensor. For example, calculating C_kj T_ij returns # a tensor T'_ki rather than T'_ik. final_tensor = qml.math.transpose(final_tensor, axes=axes) final_tensor *= qml.math.power(2, -(n_meas + n_prep) / 2) return final_tensor def _to_tensors( results, prepare_nodes: Sequence[Sequence[PrepareNode]], measure_nodes: Sequence[Sequence[MeasureNode]], ) -> List: """Process a flat list of execution results from all circuit fragments into the corresponding tensors. This function slices ``results`` according to the expected size of fragment tensors derived from the ``prepare_nodes`` and ``measure_nodes`` and then passes onto ``_process_tensor`` for further transformation. Args: results (tensor_like): A collection of execution results, provided as a flat tensor, corresponding to the expansion of circuit fragments in the communication graph over measurement and preparation node configurations. These results are processed into tensors by this function. prepare_nodes (Sequence[Sequence[PrepareNode]]): a sequence whose length is equal to the number of circuit fragments, with each element used here to determine the number of preparation nodes in a given fragment measure_nodes (Sequence[Sequence[MeasureNode]]): a sequence whose length is equal to the number of circuit fragments, with each element used here to determine the number of measurement nodes in a given fragment Returns: List[tensor_like]: the tensors for each circuit fragment in the communication graph """ ctr = 0 tensors = [] for p, m in zip(prepare_nodes, measure_nodes): n_prep = len(p) n_meas = len(m) n = n_prep + n_meas dim = 4**n results_slice = results[ctr : dim + ctr] tensors.append(_process_tensor(results_slice, n_prep, n_meas)) ctr += dim if results.shape[0] != ctr: raise ValueError(f"The results argument should be a flat list of length {ctr}") return tensors
[docs]def qcut_processing_fn( results: Sequence[Sequence], communication_graph: MultiDiGraph, prepare_nodes: Sequence[Sequence[PrepareNode]], measure_nodes: Sequence[Sequence[MeasureNode]], use_opt_einsum: bool = False, ): """Processing function for the :func:`cut_circuit() <pennylane.cut_circuit>` transform. .. note:: This function is designed for use as part of the circuit cutting workflow. Check out the :func:`qml.cut_circuit() <pennylane.cut_circuit>` transform for more details. Args: results (Sequence[Sequence]): A collection of execution results generated from the expansion of circuit fragments over measurement and preparation node configurations. These results are processed into tensors and then contracted. communication_graph (nx.MultiDiGraph): the communication graph determining connectivity between circuit fragments prepare_nodes (Sequence[Sequence[PrepareNode]]): a sequence of size ``len(communication_graph.nodes)`` that determines the order of preparation indices in each tensor measure_nodes (Sequence[Sequence[MeasureNode]]): a sequence of size ``len(communication_graph.nodes)`` that determines the order of measurement indices in each tensor use_opt_einsum (bool): Determines whether to use the `opt_einsum <https://dgasmith.github.io/opt_einsum/>`__ package. This package is useful for faster tensor contractions of large networks but must be installed separately using, e.g., ``pip install opt_einsum``. Both settings for ``use_opt_einsum`` result in a differentiable contraction. Returns: float or tensor_like: the output of the original uncut circuit arising from contracting the tensor network of circuit fragments """ flat_results = qml.math.concatenate(results) tensors = _to_tensors(flat_results, prepare_nodes, measure_nodes) result = contract_tensors( tensors, communication_graph, prepare_nodes, measure_nodes, use_opt_einsum ) return result
[docs]@batch_transform def cut_circuit( tape: QuantumTape, auto_cutter: Union[bool, Callable] = False, use_opt_einsum: bool = False, device_wires: Optional[Wires] = None, max_depth: int = 1, **kwargs, ) -> Tuple[Tuple[QuantumTape], Callable]: """ Cut up a quantum circuit into smaller circuit fragments. Following the approach outlined in Theorem 2 of `Peng et al. <https://journals.aps.org/prl/abstract/10.1103/PhysRevLett.125.150504>`__, strategic placement of :class:`~.WireCut` operations can allow a quantum circuit to be split into disconnected circuit fragments. Each circuit fragment is then executed multiple times by varying the state preparations and measurements at incoming and outgoing cut locations, respectively, resulting in a process tensor describing the action of the fragment. The process tensors are then contracted to provide the result of the original uncut circuit. .. note:: Only circuits that return a single expectation value are supported. Args: tape (QuantumTape): the tape of the full circuit to be cut auto_cutter (Union[bool, Callable]): Toggle for enabling automatic cutting with the default :func:`~.kahypar_cut` partition method. Can also pass a graph partitioning function that takes an input graph and returns a list of edges to be cut based on a given set of constraints and objective. The default :func:`~.kahypar_cut` function requires KaHyPar to be installed using ``pip install kahypar`` for Linux and Mac users or visiting the instructions `here <https://kahypar.org>`__ to compile from source for Windows users. use_opt_einsum (bool): Determines whether to use the `opt_einsum <https://dgasmith.github.io/opt_einsum/>`__ package. This package is useful for faster tensor contractions of large networks but must be installed separately using, e.g., ``pip install opt_einsum``. Both settings for ``use_opt_einsum`` result in a differentiable contraction. device_wires (Wires): Wires of the device that the cut circuits are to be run on. When transforming a QNode, this argument is optional and will be set to the QNode's device wires. Required when transforming a tape. max_depth (int): The maximum depth used to expand the circuit while searching for wire cuts. Only applicable when transforming a QNode. kwargs: Additional keyword arguments to be passed to a callable ``auto_cutter`` argument. For the default KaHyPar cutter, please refer to the docstring of functions :func:`~.find_and_place_cuts` and :func:`~.kahypar_cut` for the available arguments. Returns: Callable: Function which accepts the same arguments as the QNode. When called, this function will perform a process tomography of the partitioned circuit fragments and combine the results via tensor contractions. **Example** The following :math:`3`-qubit circuit contains a :class:`~.WireCut` operation. When decorated with ``@qml.cut_circuit``, we can cut the circuit into two :math:`2`-qubit fragments: .. code-block:: python dev = qml.device("default.qubit", wires=2) @qml.cut_circuit @qml.qnode(dev) def circuit(x): qml.RX(x, wires=0) qml.RY(0.9, wires=1) qml.RX(0.3, wires=2) qml.CZ(wires=[0, 1]) qml.RY(-0.4, wires=0) qml.WireCut(wires=1) qml.CZ(wires=[1, 2]) return qml.expval(qml.pauli.string_to_pauli_word("ZZZ")) Executing ``circuit`` will run multiple configurations of the :math:`2`-qubit fragments which are then postprocessed to give the result of the original circuit: >>> x = np.array(0.531, requires_grad=True) >>> circuit(x) 0.47165198882111165 Futhermore, the output of the cut circuit is also differentiable: >>> qml.grad(circuit)(x) -0.276982865449393 Alternatively, if the optimal wire-cut placement is unknown for an arbitrary circuit, the ``auto_cutter`` option can be enabled to make attempts in finding such an optimal cut. The following examples shows this capability on the same circuit as above but with the :class:`~.WireCut` removed: .. code-block:: python @qml.cut_circuit(auto_cutter=True) @qml.qnode(dev) def circuit(x): qml.RX(x, wires=0) qml.RY(0.9, wires=1) qml.RX(0.3, wires=2) qml.CZ(wires=[0, 1]) qml.RY(-0.4, wires=0) qml.CZ(wires=[1, 2]) return qml.expval(qml.pauli.string_to_pauli_word("ZZZ")) >>> x = np.array(0.531, requires_grad=True) >>> circuit(x) 0.47165198882111165 >>> qml.grad(circuit)(x) -0.276982865449393 .. details:: :title: Usage Details Manually placing :class:`~.WireCut` operations and decorating the QNode with the ``cut_circuit()`` batch transform is the suggested entrypoint into circuit cutting. However, advanced users also have the option to work directly with a :class:`~.QuantumTape` and manipulate the tape to perform circuit cutting using the below functionality: .. autosummary:: :toctree: ~transforms.qcut.tape_to_graph ~transforms.qcut.find_and_place_cuts ~transforms.qcut.replace_wire_cut_nodes ~transforms.qcut.fragment_graph ~transforms.qcut.graph_to_tape ~transforms.qcut.remap_tape_wires ~transforms.qcut.expand_fragment_tape ~transforms.qcut.qcut_processing_fn ~transforms.qcut.CutStrategy The following shows how these elementary steps are combined as part of the ``cut_circuit()`` transform. Consider the circuit below: .. code-block:: python with qml.tape.QuantumTape() as tape: qml.RX(0.531, wires=0) qml.RY(0.9, wires=1) qml.RX(0.3, wires=2) qml.CZ(wires=[0, 1]) qml.RY(-0.4, wires=0) qml.WireCut(wires=1) qml.CZ(wires=[1, 2]) qml.expval(qml.pauli.string_to_pauli_word("ZZZ")) >>> print(qml.drawer.tape_text(tape)) 0: ──RX─╭●──RY────┤ ╭<[email protected]@Z> 1: ──RY─╰Z──//─╭●─┤ ├<[email protected]@Z> 2: ──RX────────╰Z─┤ ╰<[email protected]@Z> To cut the circuit, we first convert it to its graph representation: >>> graph = qml.transforms.qcut.tape_to_graph(tape) .. figure:: ../../_static/qcut_graph.svg :align: center :width: 60% :target: javascript:void(0); If, however, the optimal location of the :class:`~.WireCut` is unknown, we can use :func:`~.find_and_place_cuts` to make attempts in automatically finding such a cut given the device constraints. Using the same circuit as above but with the :class:`~.WireCut` removed, the same (optimal) cut can be recovered with automatic cutting: .. code-block:: python with qml.tape.QuantumTape() as uncut_tape: qml.RX(0.531, wires=0) qml.RY(0.9, wires=1) qml.RX(0.3, wires=2) qml.CZ(wires=[0, 1]) qml.RY(-0.4, wires=0) qml.CZ(wires=[1, 2]) qml.expval(qml.pauli.string_to_pauli_word("ZZZ")) >>> cut_graph = qml.transforms.qcut.find_and_place_cuts( graph = qml.transforms.qcut.tape_to_graph(uncut_tape), cut_strategy = qml.transforms.qcut.CutStrategy(max_free_wires=2), ) >>> print(qml.transforms.qcut.graph_to_tape(cut_graph).draw()) 0: ──RX─╭●──RY────┤ ╭<[email protected]@Z> 1: ──RY─╰Z──//─╭●─┤ ├<[email protected]@Z> 2: ──RX────────╰Z─┤ ╰<[email protected]@Z> Our next step is to remove the :class:`~.WireCut` nodes in the graph and replace with :class:`~.MeasureNode` and :class:`~.PrepareNode` pairs. >>> qml.transforms.qcut.replace_wire_cut_nodes(graph) The :class:`~.MeasureNode` and :class:`~.PrepareNode` pairs are placeholder operations that allow us to cut the circuit graph and then iterate over measurement and preparation configurations at cut locations. First, the :func:`~.fragment_graph` function pulls apart the graph into disconnected components as well as returning the `communication_graph <https://en.wikipedia.org/wiki/Quotient_graph>`__ detailing the connectivity between the components. >>> fragments, communication_graph = qml.transforms.qcut.fragment_graph(graph) We now convert the ``fragments`` back to :class:`~.QuantumTape` objects >>> fragment_tapes = [qml.transforms.qcut.graph_to_tape(f) for f in fragments] The circuit fragments can now be visualized: >>> print(fragment_tapes[0].draw()) 0: ──RX(0.531)──╭●──RY(-0.4)─────┤ ⟨Z⟩ 1: ──RY(0.9)────╰Z──MeasureNode──┤ >>> print(fragment_tapes[1].draw()) 2: ──RX(0.3)──────╭Z──╭┤ ⟨Z ⊗ Z⟩ 1: ──PrepareNode──╰●──╰┤ ⟨Z ⊗ Z⟩ Additionally, we must remap the tape wires to match those available on our device. >>> dev = qml.device("default.qubit", wires=2) >>> fragment_tapes = [ ... qml.transforms.qcut.remap_tape_wires(t, dev.wires) for t in fragment_tapes ... ] Next, each circuit fragment is expanded over :class:`~.MeasureNode` and :class:`~.PrepareNode` configurations and a flat list of tapes is created: .. code-block:: expanded = [qml.transforms.qcut.expand_fragment_tape(t) for t in fragment_tapes] configurations = [] prepare_nodes = [] measure_nodes = [] for tapes, p, m in expanded: configurations.append(tapes) prepare_nodes.append(p) measure_nodes.append(m) tapes = tuple(tape for c in configurations for tape in c) Each configuration is drawn below: >>> for t in tapes: ... print(qml.drawer.tape_text(t)) .. code-block:: 0: ──RX(0.531)──╭●──RY(-0.4)──╭┤ ⟨Z ⊗ I⟩ ╭┤ ⟨Z ⊗ Z⟩ 1: ──RY(0.9)────╰Z────────────╰┤ ⟨Z ⊗ I⟩ ╰┤ ⟨Z ⊗ Z⟩ 0: ──RX(0.531)──╭●──RY(-0.4)──╭┤ ⟨Z ⊗ X⟩ 1: ──RY(0.9)────╰Z────────────╰┤ ⟨Z ⊗ X⟩ 0: ──RX(0.531)──╭●──RY(-0.4)──╭┤ ⟨Z ⊗ Y⟩ 1: ──RY(0.9)────╰Z────────────╰┤ ⟨Z ⊗ Y⟩ 0: ──RX(0.3)──╭Z──╭┤ ⟨Z ⊗ Z⟩ 1: ──I────────╰●──╰┤ ⟨Z ⊗ Z⟩ 0: ──RX(0.3)──╭Z──╭┤ ⟨Z ⊗ Z⟩ 1: ──X────────╰●──╰┤ ⟨Z ⊗ Z⟩ 0: ──RX(0.3)──╭Z──╭┤ ⟨Z ⊗ Z⟩ 1: ──H────────╰●──╰┤ ⟨Z ⊗ Z⟩ 0: ──RX(0.3)─────╭Z──╭┤ ⟨Z ⊗ Z⟩ 1: ──H────────S──╰●──╰┤ ⟨Z ⊗ Z⟩ The last step is to execute the tapes and postprocess the results using :func:`~.qcut_processing_fn`, which processes the results to the original full circuit output via a tensor network contraction >>> results = qml.execute(tapes, dev, gradient_fn=None) >>> qml.transforms.qcut.qcut_processing_fn( ... results, ... communication_graph, ... prepare_nodes, ... measure_nodes, ... ) 0.47165198882111165 """ # pylint: disable=unused-argument if len(tape.measurements) != 1: raise ValueError( "The circuit cutting workflow only supports circuits with a single output " "measurement" ) if not all(m.return_type is Expectation for m in tape.measurements): raise ValueError( "The circuit cutting workflow only supports circuits with expectation " "value measurements" ) if use_opt_einsum: try: import opt_einsum # pylint: disable=import-outside-toplevel,unused-import except ImportError as e: raise ImportError( "The opt_einsum package is required when use_opt_einsum is set to " "True in the cut_circuit function. This package can be " "installed using:\npip install opt_einsum" ) from e g = tape_to_graph(tape) if auto_cutter is True or callable(auto_cutter): cut_strategy = kwargs.pop("cut_strategy", None) or CutStrategy( max_free_wires=len(device_wires) ) g = find_and_place_cuts( graph=g, cut_method=auto_cutter if callable(auto_cutter) else kahypar_cut, cut_strategy=cut_strategy, **kwargs, ) replace_wire_cut_nodes(g) fragments, communication_graph = fragment_graph(g) fragment_tapes = [graph_to_tape(f) for f in fragments] fragment_tapes = [qml.map_wires(t, dict(zip(t.wires, device_wires))) for t in fragment_tapes] expanded = [expand_fragment_tape(t) for t in fragment_tapes] configurations = [] prepare_nodes = [] measure_nodes = [] for tapes, p, m in expanded: configurations.append(tapes) prepare_nodes.append(p) measure_nodes.append(m) tapes = tuple(tape for c in configurations for tape in c) return tapes, partial( qcut_processing_fn, communication_graph=communication_graph, prepare_nodes=prepare_nodes, measure_nodes=measure_nodes, use_opt_einsum=use_opt_einsum, )
@cut_circuit.custom_qnode_wrapper def qnode_execution_wrapper(self, qnode, targs, tkwargs): """Here, we overwrite the QNode execution wrapper in order to access the device wires.""" # pylint: disable=function-redefined tkwargs.setdefault("device_wires", qnode.device.wires) return self.default_qnode_wrapper(qnode, targs, tkwargs) def _qcut_expand_fn( tape: QuantumTape, max_depth: int = 1, auto_cutter: Union[bool, Callable] = False, ): """Expansion function for circuit cutting. Expands operations until reaching a depth that includes :class:`~.WireCut` operations. """ for op in tape.operations: if isinstance(op, WireCut): return tape if max_depth > 0: return _qcut_expand_fn(tape.expand(), max_depth=max_depth - 1, auto_cutter=auto_cutter) if not (auto_cutter is True or callable(auto_cutter)): raise ValueError( "No WireCut operations found in the circuit. Consider increasing the max_depth value if" " operations or nested tapes contain WireCut operations." ) return tape def _cut_circuit_expand( tape: QuantumTape, use_opt_einsum: bool = False, device_wires: Optional[Wires] = None, max_depth: int = 1, auto_cutter: Union[bool, Callable] = False, **kwargs, ): """Main entry point for expanding operations until reaching a depth that includes :class:`~.WireCut` operations.""" # pylint: disable=unused-argument return _qcut_expand_fn(tape, max_depth, auto_cutter) def _cut_circuit_mc_expand( tape: QuantumTape, classical_processing_fn: Optional[callable] = None, max_depth: int = 1, shots: Optional[int] = None, device_wires: Optional[Wires] = None, auto_cutter: Union[bool, Callable] = False, **kwargs, ): """Main entry point for expanding operations in sample-based tapes until reaching a depth that includes :class:`~.WireCut` operations.""" # pylint: disable=unused-argument, too-many-arguments return _qcut_expand_fn(tape, max_depth, auto_cutter) cut_circuit.expand_fn = _cut_circuit_expand cut_circuit_mc.expand_fn = _cut_circuit_mc_expand
[docs]def remap_tape_wires(tape: QuantumTape, wires: Sequence) -> QuantumTape: """Map the wires of a tape to a new set of wires. Given an :math:`n`-wire ``tape``, this function returns a new :class:`~.QuantumTape` with operations and measurements acting on the first :math:`n` wires provided in the ``wires`` argument. The input ``tape`` is left unmodified. .. note:: This function is designed for use as part of the circuit cutting workflow. Check out the :func:`qml.cut_circuit() <pennylane.cut_circuit>` transform for more details. Args: tape (QuantumTape): the quantum tape whose wires should be remapped wires (Sequence): the new set of wires to map to Returns: QuantumTape: A remapped copy of the input tape Raises: ValueError: if the number of wires in ``tape`` exceeds ``len(wires)`` **Example** Consider the following circuit that operates on wires ``[2, 3]``: .. code-block:: python with qml.tape.QuantumTape() as tape: qml.RX(0.5, wires=2) qml.RY(0.6, wires=3) qml.CNOT(wires=[2, 3]) qml.expval(qml.PauliZ(2) @ qml.PauliZ(3)) We can map from wires ``[2, 3]`` to ``[0, 1]`` using: >>> new_wires = [0, 1] >>> new_tape = qml.transforms.qcut.remap_tape_wires(tape, new_wires) >>> print(qml.drawer.tape_text(new_tape)) 0: ──RX(0.5)──╭●──╭┤ ⟨Z ⊗ Z⟩ 1: ──RY(0.6)──╰X──╰┤ ⟨Z ⊗ Z⟩ """ warnings.warn( "The method remap_tape_wires is deprecated. Use " "qml.map_wires(tape, dict(zip(tape.wires, wires))) instead.", UserWarning, ) if len(tape.wires) > len(wires): raise ValueError( f"Attempting to run a {len(tape.wires)}-wire circuit on a " f"{len(wires)}-wire device. Consider increasing the number of wires in " f"your device." ) wire_map = dict(zip(tape.wires, wires)) copy_ops = [copy.copy(op) for op in tape.operations] copy_meas = [copy.copy(op) for op in tape.measurements] with QuantumTape() as new_tape: for op in copy_ops: qml.map_wires(op, wire_map=wire_map, queue=True) for meas in copy_meas: qml.map_wires(meas, wire_map=wire_map, queue=True) return new_tape
[docs]@dataclass() class CutStrategy: """ A circuit-cutting distribution policy for executing (large) circuits on available (comparably smaller) devices. .. note:: This class is part of a work-in-progress feature to support automatic cut placement in the circuit cutting workflow. Currently only manual placement of cuts is supported, check out the :func:`qml.cut_circuit() <pennylane.cut_circuit>` transform for more details. Args: devices (Union[qml.Device, Sequence[qml.Device]]): Single, or Sequence of, device(s). Optional only when ``max_free_wires`` is provided. max_free_wires (int): Number of wires for the largest available device. Optional only when ``devices`` is provided where it defaults to the maximum number of wires among ``devices``. min_free_wires (int): Number of wires for the smallest available device, or, equivalently, the smallest max fragment-wire-size that the partitioning is allowed to explore. When provided, this parameter will be used to derive an upper-bound to the range of explored number of fragments. Optional, defaults to 2 which corresponds to attempting the most granular partitioning of max 2-wire fragments. num_fragments_probed (Union[int, Sequence[int]]): Single, or 2-Sequence of, number(s) specifying the potential (range of) number of fragments for the partitioner to attempt. Optional, defaults to probing all valid strategies derivable from the circuit and devices. When provided, has precedence over all other arguments affecting partitioning exploration, such as ``max_free_wires``, ``min_free_wires``, or ``exhaustive``. max_free_gates (int): Maximum allowed circuit depth for the deepest available device. Optional, defaults to unlimited depth. min_free_gates (int): Maximum allowed circuit depth for the shallowest available device. Optional, defaults to ``max_free_gates``. imbalance_tolerance (float): The global maximum allowed imbalance for all partition trials. Optional, defaults to unlimited imbalance. Used only if there's a known hard balancing constraint on the partitioning problem. trials_per_probe (int): Number of repeated partitioning trials for a random automatic cutting method to attempt per set of partitioning parameters. For a deterministic cutting method, this can be set to 1. Defaults to 4. **Example** The following cut strategy specifies that a circuit should be cut into between ``2`` to ``5`` fragments, with each fragment having at most ``6`` wires and at least ``4`` wires: >>> cut_strategy = qml.transforms.CutStrategy( ... max_free_wires=6, ... min_free_wires=4, ... num_fragments_probed=(2, 5), ... ) """ # pylint: disable=too-many-arguments, too-many-instance-attributes #: Initialization argument only, used to derive ``max_free_wires`` and ``min_free_wires``. devices: InitVar[Union[qml.Device, Sequence[qml.Device]]] = None #: Number of wires for the largest available device. max_free_wires: int = None #: Number of wires for the smallest available device. min_free_wires: int = None #: The potential (range of) number of fragments for the partitioner to attempt. num_fragments_probed: Union[int, Sequence[int]] = None #: Maximum allowed circuit depth for the deepest available device. max_free_gates: int = None #: Maximum allowed circuit depth for the shallowest available device. min_free_gates: int = None #: The global maximum allowed imbalance for all partition trials. imbalance_tolerance: float = None #: Number of trials to repeat for per set of partition parameters probed. trials_per_probe: int = 4 #: Class attribute, threshold for warning about too many fragments. HIGH_NUM_FRAGMENTS: ClassVar[int] = 20 #: Class attribute, threshold for warning about too many partition attempts. HIGH_PARTITION_ATTEMPTS: ClassVar[int] = 20 def __post_init__( self, devices, ): """Deriving cutting constraints from given devices and parameters.""" self.max_free_wires = self.max_free_wires if isinstance(self.num_fragments_probed, int): self.num_fragments_probed = [self.num_fragments_probed] if isinstance(self.num_fragments_probed, (list, tuple)): self.num_fragments_probed = sorted(self.num_fragments_probed) self.k_lower = self.num_fragments_probed[0] self.k_upper = self.num_fragments_probed[-1] if self.k_lower <= 0: raise ValueError("`num_fragments_probed` must be positive int(s)") else: self.k_lower, self.k_upper = None, None if devices is None and self.max_free_wires is None: raise ValueError("One of arguments `devices` and max_free_wires` must be provided.") if isinstance(devices, qml.Device): devices = (devices,) if devices is not None: if not isinstance(devices, SequenceType) or any( (not isinstance(d, qml.Device) for d in devices) ): raise ValueError( "Argument `devices` must be a list or tuple containing elements of type " "`qml.Device`" ) device_wire_sizes = [len(d.wires) for d in devices] self.max_free_wires = self.max_free_wires or max(device_wire_sizes) self.min_free_wires = self.min_free_wires or min(device_wire_sizes) if (self.imbalance_tolerance is not None) and not ( isinstance(self.imbalance_tolerance, (float, int)) and self.imbalance_tolerance >= 0 ): raise ValueError( "The overall `imbalance_tolerance` is expected to be a non-negative number, " f"got {type(self.imbalance_tolerance)} with value {self.imbalance_tolerance}." ) self.min_free_wires = self.min_free_wires or 1
[docs] def get_cut_kwargs( self, tape_dag: MultiDiGraph, max_wires_by_fragment: Sequence[int] = None, max_gates_by_fragment: Sequence[int] = None, exhaustive: bool = True, ) -> List[Dict[str, Any]]: """Derive the complete set of arguments, based on a given circuit, for passing to a graph partitioner. Args: tape_dag (nx.MultiDiGraph): Graph representing a tape, typically the output of :func:`tape_to_graph`. max_wires_by_fragment (Sequence[int]): User-predetermined list of wire limits by fragment. If supplied, the number of fragments will be derived from it and exploration of other choices will not be made. max_gates_by_fragment (Sequence[int]): User-predetermined list of gate limits by fragment. If supplied, the number of fragments will be derived from it and exploration of other choices will not be made. exhaustive (bool): Toggle for an exhaustive search which will attempt all potentially valid numbers of fragments into which the circuit is partitioned. If ``True``, for a circuit with N gates, N - 1 attempts will be made with ``num_fragments`` ranging from [2, N], i.e. from bi-partitioning to complete partitioning where each fragment has exactly a single gate. Defaults to ``True``. Returns: List[Dict[str, Any]]: A list of minimal kwargs being passed to a graph partitioner method. **Example** Deriving kwargs for a given circuit and feeding them to a custom partitioner, along with extra parameters specified using ``extra_kwargs``: >>> cut_strategy = qcut.CutStrategy(devices=dev) >>> cut_kwargs = cut_strategy.get_cut_kwargs(tape_dag) >>> cut_trials = [ ... my_partition_fn(tape_dag, **kwargs, **extra_kwargs) for kwargs in cut_kwargs ... ] """ wire_depths = {} for g in tape_dag.nodes: if not isinstance(g, WireCut): for w in g.wires: wire_depths[w] = wire_depths.get(w, 0) + 1 / len(g.wires) self._validate_input(max_wires_by_fragment, max_gates_by_fragment) probed_cuts = self._infer_probed_cuts( wire_depths=wire_depths, max_wires_by_fragment=max_wires_by_fragment, max_gates_by_fragment=max_gates_by_fragment, exhaustive=exhaustive, ) return probed_cuts
@staticmethod def _infer_imbalance(k, wire_depths, free_wires, free_gates, imbalance_tolerance=None) -> float: """Helper function for determining best imbalance limit.""" num_wires = len(wire_depths) num_gates = sum(wire_depths.values()) avg_fragment_wires = (num_wires - 1) // k + 1 avg_fragment_gates = (num_gates - 1) // k + 1 if free_wires < avg_fragment_wires: raise ValueError( "`free_wires` should be no less than the average number of wires per fragment. " f"Got {free_wires} >= {avg_fragment_wires} ." ) if free_gates < avg_fragment_gates: raise ValueError( "`free_gates` should be no less than the average number of gates per fragment. " f"Got {free_gates} >= {avg_fragment_gates} ." ) if free_gates > num_gates - k: # Case where gate depth not limited (`-k` since each fragments has to have >= 1 gates): free_gates = num_gates # A small adjustment is added to the imbalance factor to prevents small ks from resulting # in extremely unbalanced fragments. It will heuristically force the smallest fragment size # to be >= 3 if the average fragment size is greater than 5. In other words, tiny fragments # are only allowed when average fragmeng size is small in the first place. balancing_adjustment = 2 if avg_fragment_gates > 5 else 0 free_gates = free_gates - (k - 1 + balancing_adjustment) depth_imbalance = max(wire_depths.values()) * num_wires / num_gates - 1 max_imbalance = free_gates / avg_fragment_gates - 1 imbalance = min(depth_imbalance, max_imbalance) if imbalance_tolerance is not None: imbalance = min(imbalance, imbalance_tolerance) return imbalance @staticmethod def _validate_input( max_wires_by_fragment, max_gates_by_fragment, ): """Helper parameter checker.""" if max_wires_by_fragment is not None: if not isinstance(max_wires_by_fragment, (list, tuple)): raise ValueError( "`max_wires_by_fragment` is expected to be a list or tuple, but got " f"{type(max_gates_by_fragment)}." ) if any(not (isinstance(i, int) and i > 0) for i in max_wires_by_fragment): raise ValueError( "`max_wires_by_fragment` is expected to contain positive integers only." ) if max_gates_by_fragment is not None: if not isinstance(max_gates_by_fragment, (list, tuple)): raise ValueError( "`max_gates_by_fragment` is expected to be a list or tuple, but got " f"{type(max_gates_by_fragment)}." ) if any(not (isinstance(i, int) and i > 0) for i in max_gates_by_fragment): raise ValueError( "`max_gates_by_fragment` is expected to contain positive integers only." ) if max_wires_by_fragment is not None and max_gates_by_fragment is not None: if len(max_wires_by_fragment) != len(max_gates_by_fragment): raise ValueError( "The lengths of `max_wires_by_fragment` and `max_gates_by_fragment` should be " f"equal, but got {len(max_wires_by_fragment)} and {len(max_gates_by_fragment)}." ) def _infer_probed_cuts( self, wire_depths, max_wires_by_fragment=None, max_gates_by_fragment=None, exhaustive=True, ) -> List[Dict[str, Any]]: """ Helper function for deriving the minimal set of best default partitioning constraints for the graph partitioner. Args: num_tape_wires (int): Number of wires in the circuit tape to be partitioned. num_tape_gates (int): Number of gates in the circuit tape to be partitioned. max_wires_by_fragment (Sequence[int]): User-predetermined list of wire limits by fragment. If supplied, the number of fragments will be derived from it and exploration of other choices will not be made. max_gates_by_fragment (Sequence[int]): User-predetermined list of gate limits by fragment. If supplied, the number of fragments will be derived from it and exploration of other choices will not be made. exhaustive (bool): Toggle for an exhaustive search which will attempt all potentially valid numbers of fragments into which the circuit is partitioned. If ``True``, ``num_tape_gates - 1`` attempts will be made with ``num_fragments`` ranging from [2, ``num_tape_gates``], i.e. from bi-partitioning to complete partitioning where each fragment has exactly a single gate. Defaults to ``True``. Returns: List[Dict[str, Any]]: A list of minimal set of kwargs being passed to a graph partitioner method. """ num_tape_wires = len(wire_depths) num_tape_gates = int(sum(wire_depths.values())) # Assumes unlimited width/depth if not supplied. max_free_wires = self.max_free_wires or num_tape_wires max_free_gates = self.max_free_gates or num_tape_gates # Assumes same number of wires/gates across all devices if min_free_* not provided. min_free_wires = self.min_free_wires or max_free_wires min_free_gates = self.min_free_gates or max_free_gates # The lower bound of k corresponds to executing each fragment on the largest available # device. k_lb = 1 + max( (num_tape_wires - 1) // max_free_wires, # wire limited (num_tape_gates - 1) // max_free_gates, # gate limited ) # The upper bound of k corresponds to executing each fragment on the smallest available # device. k_ub = 1 + max( (num_tape_wires - 1) // min_free_wires, # wire limited (num_tape_gates - 1) // min_free_gates, # gate limited ) if exhaustive: k_lb = max(2, k_lb) k_ub = num_tape_gates # The global imbalance tolerance, if not given, defaults to a very loose upper bound: imbalance_tolerance = k_ub if self.imbalance_tolerance is None else self.imbalance_tolerance probed_cuts = [] if max_gates_by_fragment is None and max_wires_by_fragment is None: # k_lower, when supplied by a user, can be higher than k_lb if the the desired k is known: k_lower = self.k_lower if self.k_lower is not None else k_lb # k_upper, when supplied by a user, can be higher than k_ub to encourage exploration: k_upper = self.k_upper if self.k_upper is not None else k_ub if k_lower < k_lb: warnings.warn( f"The provided `k_lower={k_lower}` is less than the lowest allowed value, " f"will override and set `k_lower={k_lb}`." ) k_lower = k_lb if k_lower > self.HIGH_NUM_FRAGMENTS: warnings.warn( f"The attempted number of fragments seems high with lower bound at {k_lower}." ) # Prepare the list of ks to explore: ks = list(range(k_lower, k_upper + 1)) if len(ks) > self.HIGH_PARTITION_ATTEMPTS: warnings.warn(f"The numer of partition attempts seems high ({len(ks)}).") else: # When the by-fragment wire and/or gate limits are supplied, derive k and imbalance and # return a single partition config. ks = [len(max_wires_by_fragment or max_gates_by_fragment)] for k in ks: imbalance = self._infer_imbalance( k, wire_depths, max_free_wires if max_wires_by_fragment is None else max(max_wires_by_fragment), max_free_gates if max_gates_by_fragment is None else max(max_gates_by_fragment), imbalance_tolerance, ) cut_kwargs = { "num_fragments": k, "imbalance": imbalance, } if max_wires_by_fragment is not None: cut_kwargs["max_wires_by_fragment"] = max_wires_by_fragment if max_gates_by_fragment is not None: cut_kwargs["max_gates_by_fragment"] = max_gates_by_fragment probed_cuts.append(cut_kwargs) return probed_cuts
def _graph_to_hmetis( graph: MultiDiGraph, hyperwire_weight: int = 0, edge_weights: Sequence[int] = None, ) -> Tuple[List[int], List[int], List[Union[int, float]]]: """Converts a ``MultiDiGraph`` into the `hMETIS hypergraph input format <http://glaros.dtc.umn.edu/gkhome/fetch/sw/hmetis/manual.pdf>`__ conforming to KaHyPar's calling signature. Args: graph (MultiDiGraph): The original (tape-converted) graph to be cut. hyperwire_weight (int): Weight on the artificially appended hyperedges representing wires. Defaults to 0 which leads to no such insertion. If greater than 0, hyperedges will be appended with the provided weight, to encourage the resulting fragments to cluster gates on the same wire together. edge_weights (Sequence[int]): Weights for regular edges in the graph. Defaults to ``None``, which leads to unit-weighted edges. Returns: Tuple[List,List,List]: The 3 lists representing an (optionally weighted) hypergraph: - Flattened list of adjacent node indices. - List of starting indices for edges in the above adjacent-nodes-list. - Optional list of edge weights. ``None`` if ``hyperwire_weight`` is equal to 0. """ nodes = list(graph.nodes) edges = graph.edges(data="wire") wires = {w for _, _, w in edges} adj_nodes = [nodes.index(v) for ops in graph.edges(keys=False) for v in ops] edge_splits = qml.math.cumsum([0] + [len(e) for e in graph.edges(keys=False)]).tolist() edge_weights = ( edge_weights if edge_weights is not None and len(edges) == len(edge_weights) else None ) if hyperwire_weight: hyperwires = {w: set() for w in wires} num_wires = len(hyperwires) for v0, v1, wire in edges: hyperwires[wire].update([nodes.index(v0), nodes.index(v1)]) for wire, nodes_on_wire in hyperwires.items(): nwv = len(nodes_on_wire) edge_splits.append(nwv + edge_splits[-1]) adj_nodes = adj_nodes + list(nodes_on_wire) assert len(edge_splits) == len(edges) + num_wires + 1 if isinstance(hyperwire_weight, (int, float)): # assumes original edges having unit weights by default: edge_weights = edge_weights or ([1] * len(edges)) wire_weights = [hyperwire_weight] * num_wires edge_weights = edge_weights + wire_weights return adj_nodes, edge_splits, edge_weights
[docs]def kahypar_cut( graph: MultiDiGraph, num_fragments: int, imbalance: int = None, edge_weights: List[Union[int, float]] = None, node_weights: List[Union[int, float]] = None, fragment_weights: List[Union[int, float]] = None, hyperwire_weight: int = 1, seed: int = None, config_path: Union[str, Path] = None, trial: int = None, verbose: bool = False, ) -> List[Tuple[Operation, Operation, Any]]: """Calls `KaHyPar <https://kahypar.org/>`__ to partition a graph. .. warning:: Requires KaHyPar to be installed separately. For Linux and Mac users, KaHyPar can be installed using ``pip install kahypar``. Windows users can follow the instructions `here <https://kahypar.org>`__ to compile from source. Args: graph (nx.MultiDiGraph): The graph to be partitioned. num_fragments (int): Desired number of fragments. imbalance (int): Imbalance factor of the partitioning. Defaults to KaHyPar's determination. edge_weights (List[Union[int, float]]): Weights for edges. Defaults to unit-weighted edges. node_weights (List[Union[int, float]]): Weights for nodes. Defaults to unit-weighted nodes. fragment_weights (List[Union[int, float]]): Maximum size constraints by fragment. Defaults to no such constraints, with ``imbalance`` the only parameter affecting fragment sizes. hyperwire_weight (int): Weight on the artificially appended hyperedges representing wires. Setting it to 0 leads to no such insertion. If greater than 0, hyperedges will be appended with the provided weight, to encourage the resulting fragments to cluster gates on the same wire together. Defaults to 1. seed (int): KaHyPar's seed. Defaults to the seed in the config file which defaults to -1, i.e. unfixed seed. config_path (str): KaHyPar's ``.ini`` config file path. Defaults to its SEA20 paper config. trial (int): trial id for summary label creation. Defaults to ``None``. verbose (bool): Flag for printing KaHyPar's output summary. Defaults to ``False``. Returns: List[Union[int, Any]]: List of cut edges. **Example** Consider the following 2-wire circuit with one CNOT gate connecting the wires: .. code-block:: python with qml.tape.QuantumTape() as tape: qml.RX(0.432, wires=0) qml.RY(0.543, wires="a") qml.CNOT(wires=[0, "a"]) qml.RZ(0.240, wires=0) qml.RZ(0.133, wires="a") qml.RX(0.432, wires=0) qml.RY(0.543, wires="a") qml.expval(qml.PauliZ(wires=[0])) We can let KaHyPar automatically find the optimal edges to place cuts: >>> graph = qml.transforms.qcut.tape_to_graph(tape) >>> cut_edges = qml.transforms.qcut.kahypar_cut( graph=graph, num_fragments=2, ) >>> cut_edges [(CNOT(wires=[0, 'a']), RZ(0.24, wires=[0]), 0)] """ # pylint: disable=too-many-arguments, import-outside-toplevel try: import kahypar except ImportError as e: raise ImportError( "KaHyPar must be installed to use this method for automatic " "cut placement. Try pip install kahypar or visit " "https://kahypar.org/ for installation instructions." ) from e adjacent_nodes, edge_splits, edge_weights = _graph_to_hmetis( graph=graph, hyperwire_weight=hyperwire_weight, edge_weights=edge_weights ) trial = 0 if trial is None else trial ne = len(edge_splits) - 1 nv = max(adjacent_nodes) + 1 if edge_weights is not None or node_weights is not None: edge_weights = edge_weights or [1] * ne node_weights = node_weights or [1] * nv hypergraph = kahypar.Hypergraph( nv, ne, edge_splits, adjacent_nodes, num_fragments, edge_weights, node_weights, ) else: hypergraph = kahypar.Hypergraph(nv, ne, edge_splits, adjacent_nodes, num_fragments) context = kahypar.Context() config_path = config_path or str(Path(__file__).parent / "_cut_kKaHyPar_sea20.ini") context.loadINIconfiguration(config_path) context.setK(num_fragments) if isinstance(imbalance, float): context.setEpsilon(imbalance) if isinstance(fragment_weights, SequenceType) and (len(fragment_weights) == num_fragments): context.setCustomTargetBlockWeights(fragment_weights) if not verbose: context.suppressOutput(True) # KaHyPar fixes seed to 42 by default, need to manually sample seed to randomize: kahypar_seed = np.random.default_rng(seed).choice(2**15) context.setSeed(kahypar_seed) kahypar.partition(hypergraph, context) cut_edge_mask = [hypergraph.connectivity(e) > 1 for e in hypergraph.edges()] # compress() ignores the extra hyperwires at the end if there is any. cut_edges = list(compress(graph.edges, cut_edge_mask)) if verbose: fragment_sizes = [hypergraph.blockSize(p) for p in range(num_fragments)] print(len(fragment_sizes), fragment_sizes) return cut_edges
[docs]def place_wire_cuts( graph: MultiDiGraph, cut_edges: Sequence[Tuple[Operation, Operation, Any]] ) -> MultiDiGraph: """Inserts a :class:`~.WireCut` node for each provided cut edge into a circuit graph. Args: graph (nx.MultiDiGraph): The original (tape-converted) graph to be cut. cut_edges (Sequence[Tuple[Operation, Operation, Any]]): List of ``MultiDiGraph`` edges to be replaced with a :class:`~.WireCut` node. Each 3-tuple represents the source node, the target node, and the wire key of the (multi)edge. Returns: MultiDiGraph: Copy of the input graph with :class:`~.WireCut` nodes inserted. **Example** Consider the following 2-wire circuit with one CNOT gate connecting the wires: .. code-block:: python with qml.tape.QuantumTape() as tape: qml.RX(0.432, wires=0) qml.RY(0.543, wires="a") qml.CNOT(wires=[0, "a"]) qml.expval(qml.PauliZ(wires=[0])) >>> print(qml.drawer.tape_text(tape)) 0: ──RX(0.432)──╭●──┤ ⟨Z⟩ a: ──RY(0.543)──╰X──┤ If we know we want to place a :class:`~.WireCut` node between nodes ``RY(0.543, wires=["a"])`` and ``CNOT(wires=[0, 'a'])`` after the tape is constructed, we can first find the edge in the graph: >>> graph = qml.transforms.qcut.tape_to_graph(tape) >>> op0, op1 = tape.operations[1], tape.operations[2] >>> cut_edges = [e for e in graph.edges if e[0] is op0 and e[1] is op1] >>> cut_edges [(RY(0.543, wires=['a']), CNOT(wires=[0, 'a']), 0)] Then feed it to this function for placement: >>> cut_graph = qml.transforms.qcut.place_wire_cuts(graph=graph, cut_edges=cut_edges) >>> cut_graph <networkx.classes.multidigraph.MultiDiGraph at 0x7f7251ac1220> And visualize the cut by converting back to a tape: >>> print(qml.transforms.qcut.graph_to_tape(cut_graph).draw()) 0: ──RX(0.432)──────╭●──┤ ⟨Z⟩ a: ──RY(0.543)──//──╰X──┤ """ cut_graph = graph.copy() for op0, op1, wire_key in cut_edges: # Get info: order = cut_graph.nodes[op0]["order"] + 1 wire = cut_graph.edges[(op0, op1, wire_key)]["wire"] # Apply cut: cut_graph.remove_edge(op0, op1, wire_key) # Increment order for all subsequent gates: for op, o in cut_graph.nodes(data="order"): if o >= order: cut_graph.nodes[op]["order"] += 1 # Add WireCut wire_cut = WireCut(wires=wire) cut_graph.add_node(wire_cut, order=order) cut_graph.add_edge(op0, wire_cut, wire=wire) cut_graph.add_edge(wire_cut, op1, wire=wire) return cut_graph
def _remove_existing_cuts(graph: MultiDiGraph) -> MultiDiGraph: """Removes all existing, manually or automatically placed, cuts from a circuit graph, be it ``WireCut``s or ``MeasureNode``-``PrepareNode`` pairs. Args: graph (MultiDiGraph): The original (tape-converted) graph to be cut. Returns: (MultiDiGraph): Copy of the input graph with all its existing cuts removed. """ uncut_graph = graph.copy() for op in list(graph.nodes): if isinstance(op, WireCut): uncut_graph.remove_node(op) elif isinstance(op, MeasureNode): for op1 in graph.neighbors(op): if isinstance(op1, PrepareNode): uncut_graph.remove_node(op) uncut_graph.remove_node(op1) if len([n for n in uncut_graph.nodes if isinstance(n, (MeasureNode, PrepareNode))]) > 0: warnings.warn( "The circuit contains `MeasureNode` or `PrepareNode` operations that are " "not paired up correctly. Please check.", UserWarning, ) return uncut_graph
[docs]def find_and_place_cuts( graph: MultiDiGraph, cut_method: Callable = kahypar_cut, cut_strategy: CutStrategy = None, replace_wire_cuts=False, local_measurement=False, **kwargs, ) -> MultiDiGraph: """Automatically finds and places optimal :class:`~.WireCut` nodes into a given tape-converted graph using a customizable graph partitioning function. Preserves existing placed cuts. Args: graph (MultiDiGraph): The original (tape-converted) graph to be cut. cut_method (Callable): A graph partitioning function that takes an input graph and returns a list of edges to be cut based on a given set of constraints and objective. Defaults to :func:`kahypar_cut` which requires KaHyPar to be installed using ``pip install kahypar`` for Linux and Mac users or visiting the instructions `here <https://kahypar.org>`__ to compile from source for Windows users. cut_strategy (CutStrategy): Strategy for optimizing cutting parameters based on device constraints. Defaults to ``None`` in which case ``kwargs`` must be fully specified for passing to the ``cut_method``. replace_wire_cuts (bool): Whether to replace :class:`~.WireCut` nodes with :class:`~.MeasureNode` and :class:`~.PrepareNode` pairs. Defaults to ``False``. local_measurement (bool): Whether to use the local-measurement circuit-cutting objective, i.e. the maximum node-degree of the communication graph, for cut evaluation. Defaults to ``False`` which assumes global measurement and uses the total number of cuts as the cutting objective. kwargs: Additional keyword arguments to be passed to the callable ``cut_method``. Returns: nx.MultiDiGraph: Copy of the input graph with :class:`~.WireCut` nodes inserted. **Example** Consider the following 4-wire circuit with a single CNOT gate connecting the top (wires ``[0, 1]``) and bottom (wires ``["a", "b"]``) halves of the circuit. Note there's a :class:`~.WireCut` manually placed into the circuit already. .. code-block:: python with qml.tape.QuantumTape() as tape: qml.RX(0.1, wires=0) qml.RY(0.2, wires=1) qml.RX(0.3, wires="a") qml.RY(0.4, wires="b") qml.CNOT(wires=[0, 1]) qml.WireCut(wires=1) qml.CNOT(wires=["a", "b"]) qml.CNOT(wires=[1, "a"]) qml.CNOT(wires=[0, 1]) qml.CNOT(wires=["a", "b"]) qml.RX(0.5, wires="a") qml.RY(0.6, wires="b") qml.expval(qml.PauliX(wires=[0]) @ qml.PauliY(wires=["a"]) @ qml.PauliZ(wires=["b"])) >>> print(qml.drawer.tape.text(tape)) 0: ──RX(0.1)──╭●──────────╭●───────────╭┤ ⟨X ⊗ Y ⊗ Z⟩ 1: ──RY(0.2)──╰X──//──╭●──╰X───────────│┤ a: ──RX(0.3)──╭●──────╰X──╭●──RX(0.5)──├┤ ⟨X ⊗ Y ⊗ Z⟩ b: ──RY(0.4)──╰X──────────╰X──RY(0.6)──╰┤ ⟨X ⊗ Y ⊗ Z⟩ Since the existing :class:`~.WireCut` doesn't sufficiently fragment the circuit, we can find the remaining cuts using the default KaHyPar partitioner: >>> graph = qml.transforms.qcut.tape_to_graph(tape) >>> cut_graph = qml.transforms.qcut.find_and_place_cuts( graph=graph, num_fragments=2, imbalance=0.5, ) Visualizing the newly-placed cut: >>> print(qml.transforms.qcut.graph_to_tape(cut_graph).draw()) 0: ──RX(0.1)──╭●───────────────╭●────────╭┤ ⟨X ⊗ Y ⊗ Z⟩ 1: ──RY(0.2)──╰X──//──╭●───//──╰X────────│┤ a: ──RX(0.3)──╭●──────╰X──╭●────RX(0.5)──├┤ ⟨X ⊗ Y ⊗ Z⟩ b: ──RY(0.4)──╰X──────────╰X────RY(0.6)──╰┤ ⟨X ⊗ Y ⊗ Z⟩ We can then proceed with the usual process of replacing :class:`~.WireCut` nodes with pairs of :class:`~.MeasureNode` and :class:`~.PrepareNode`, and then break the graph into fragments. Or, alternatively, we can directly get such processed graph by passing ``replace_wire_cuts=True``: >>> cut_graph = qml.transforms.qcut.find_and_place_cuts( graph=graph, num_fragments=2, imbalance=0.5, replace_wire_cuts=True, ) >>> frags, comm_graph = qml.transforms.qcut.fragment_graph(cut_graph) >>> for t in frags: ... print(qml.transforms.qcut.graph_to_tape(t).draw()) .. code-block:: 0: ──RX(0.1)──────╭●───────────────╭●──┤ ⟨X⟩ 1: ──RY(0.2)──────╰X──MeasureNode──│───┤ 2: ──PrepareNode───────────────────╰X──┤ a: ──RX(0.3)──────╭●──╭X──╭●────────────RX(0.5)──╭┤ ⟨Y ⊗ Z⟩ b: ──RY(0.4)──────╰X──│───╰X────────────RY(0.6)──╰┤ ⟨Y ⊗ Z⟩ 1: ──PrepareNode──────╰●───MeasureNode────────────┤ Alternatively, if all we want to do is to find the optimal way to fit a circuit onto a smaller device, a :class:`~.CutStrategy` can be used to populate the necessary explorations of cutting parameters. As an extreme example, if the only device at our disposal is a 2-qubit device, a simple cut strategy is to simply specify the the ``max_free_wires`` argument (or equivalently directly passing a :class:`~.Device` to the ``device`` argument): >>> cut_strategy = qml.transforms.qcut.CutStrategy(max_free_wires=2) >>> print(cut_strategy.get_cut_kwargs(graph)) [{'num_fragments': 2, 'imbalance': 0.5714285714285714}, {'num_fragments': 3, 'imbalance': 1.4}, {'num_fragments': 4, 'imbalance': 1.75}, {'num_fragments': 5, 'imbalance': 2.3333333333333335}, {'num_fragments': 6, 'imbalance': 2.0}, {'num_fragments': 7, 'imbalance': 3.0}, {'num_fragments': 8, 'imbalance': 2.5}, {'num_fragments': 9, 'imbalance': 2.0}, {'num_fragments': 10, 'imbalance': 1.5}, {'num_fragments': 11, 'imbalance': 1.0}, {'num_fragments': 12, 'imbalance': 0.5}, {'num_fragments': 13, 'imbalance': 0.05}, {'num_fragments': 14, 'imbalance': 0.1}] The printed list above shows all the possible cutting configurations one can attempt to perform in order to search for the optimal cut. This is done by directly passing a :class:`~.CutStrategy` to :func:`~.find_and_place_cuts`: >>> cut_graph = qml.transforms.qcut.find_and_place_cuts( graph=graph, cut_strategy=cut_strategy, ) >>> print(qml.transforms.qcut.graph_to_tape(cut_graph).draw()) 0: ──RX──//─╭●──//────────╭●──//─────────┤ ╭<[email protected]@Z> 1: ──RY──//─╰X──//─╭●──//─╰X─────────────┤ │ a: ──RX──//─╭●──//─╰X──//─╭●──//──RX──//─┤ ├<[email protected]@Z> b: ──RY──//─╰X──//────────╰X──//──RY─────┤ ╰<[email protected]@Z> As one can tell, quite a few cuts have to be made in order to execute the circuit on solely 2-qubit devices. To verify, let's print the fragments: >>> qml.transforms.qcut.replace_wire_cut_nodes(cut_graph) >>> frags, comm_graph = qml.transforms.qcut.fragment_graph(cut_graph) >>> for t in frags: ... print(qml.transforms.qcut.graph_to_tape(t).draw()) .. code-block:: 0: ──RX──MeasureNode─┤ 1: ──RY──MeasureNode─┤ a: ──RX──MeasureNode─┤ b: ──RY──MeasureNode─┤ 0: ──PrepareNode─╭●──MeasureNode─┤ 1: ──PrepareNode─╰X──MeasureNode─┤ a: ──PrepareNode─╭●──MeasureNode─┤ b: ──PrepareNode─╰X──MeasureNode─┤ 1: ──PrepareNode─╭●──MeasureNode─┤ a: ──PrepareNode─╰X──MeasureNode─┤ 0: ──PrepareNode─╭●──MeasureNode─┤ 1: ──PrepareNode─╰X──────────────┤ b: ──PrepareNode─╭X──MeasureNode─┤ a: ──PrepareNode─╰●──MeasureNode─┤ a: ──PrepareNode──RX──MeasureNode─┤ b: ──PrepareNode──RY─┤ <Z> 0: ──PrepareNode─┤ <X> a: ──PrepareNode─┤ <Y> """ cut_graph = _remove_existing_cuts(graph) if isinstance(cut_strategy, CutStrategy): cut_kwargs_probed = cut_strategy.get_cut_kwargs(cut_graph) # Need to reseed if a seed is passed: seed = kwargs.pop("seed", None) seeds = np.random.default_rng(seed).choice(2**15, cut_strategy.trials_per_probe).tolist() cut_edges_probed = { (cut_kwargs["num_fragments"], trial_id): cut_method( cut_graph, **{ **cut_kwargs, **kwargs, "seed": seed, }, # kwargs has higher precedence for colliding keys ) for cut_kwargs in cut_kwargs_probed for trial_id, seed in zip(range(cut_strategy.trials_per_probe), seeds) } valid_cut_edges = {} for (num_partitions, _), cut_edges in cut_edges_probed.items(): # The easiest way to tell if a cut is valid is to just do the fragment graph. cut_graph = place_wire_cuts(graph=graph, cut_edges=cut_edges) num_cuts = sum(isinstance(n, WireCut) for n in cut_graph.nodes) replace_wire_cut_nodes(cut_graph) frags, comm = fragment_graph(cut_graph) max_frag_degree = max(dict(comm.degree()).values()) if _is_valid_cut( fragments=frags, num_cuts=num_cuts, max_frag_degree=max_frag_degree, num_fragments_requested=num_partitions, cut_candidates=valid_cut_edges, max_free_wires=cut_strategy.max_free_wires, ): key = (len(frags), max_frag_degree) valid_cut_edges[key] = cut_edges if len(valid_cut_edges) < 1: raise ValueError( "Unable to find a circuit cutting that satisfies all constraints. " "Are the constraints too strict?" ) cut_edges = _get_optim_cut(valid_cut_edges, local_measurement=local_measurement) else: cut_edges = cut_method(cut_graph, **kwargs) cut_graph = place_wire_cuts(graph=graph, cut_edges=cut_edges) if replace_wire_cuts: replace_wire_cut_nodes(cut_graph) return cut_graph
def _is_valid_cut( fragments, num_cuts, max_frag_degree, num_fragments_requested, cut_candidates, max_free_wires, ): """Helper function for determining if a cut is a valid canditate.""" # pylint: disable=too-many-arguments k = len(fragments) key = (k, max_frag_degree) correct_num_fragments = k <= num_fragments_requested best_candidate_yet = (key not in cut_candidates) or (len(cut_candidates[key]) > num_cuts) # pylint: disable=no-member all_fragments_fit = all( len(graph_to_tape(f).wires) <= max_free_wires for j, f in enumerate(fragments) ) return correct_num_fragments and best_candidate_yet and all_fragments_fit def _get_optim_cut(valid_cut_edges, local_measurement=False): """Picks out the best cut from a dict of valid candidate cuts.""" if local_measurement: min_max_node_degree = min(max_node_degree for _, max_node_degree in valid_cut_edges) optim_cuts = { k: cut_edges for (k, max_node_degree), cut_edges in valid_cut_edges.items() if (max_node_degree == min_max_node_degree) } else: min_cuts = min(len(cut_edges) for cut_edges in valid_cut_edges.values()) optim_cuts = { k: cut_edges for (k, _), cut_edges in valid_cut_edges.items() if (len(cut_edges) == min_cuts) } return optim_cuts[min(optim_cuts)] # choose the lowest num_fragments among best ones.