# Source code for pennylane.circuit_graph

```
# Copyright 2018-2021 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module contains the CircuitGraph class which is used to generate a DAG (directed acyclic graph)
representation of a quantum circuit from an Operator queue.
"""
# pylint: disable=too-many-branches,too-many-arguments,too-many-instance-attributes
from numbers import Number
from collections import namedtuple
import numpy as np
import rustworkx as rx
from pennylane.measurements import MeasurementProcess
from pennylane.resource import ResourcesOperation
def _by_idx(x):
"""Sorting key for Operators: queue index aka temporal order.
Args:
x (Operator): node in the circuit graph
Returns:
int: sorting key for the node
"""
return x.queue_idx
def _is_observable(x):
"""Predicate for deciding if an Operator instance is an observable.
.. note::
Currently some :class:`Observable` instances are not observables in this sense,
since they can be used as gates as well.
Args:
x (Operator): node in the circuit graph
Returns:
bool: True iff x is an observable
"""
return isinstance(x, MeasurementProcess)
Layer = namedtuple("Layer", ["ops", "param_inds"])
"""Parametrized layer of the circuit.
Args:
ops (list[Operator]): parametrized operators in the layer
param_inds (list[int]): corresponding free parameter indices
"""
# TODO define what a layer is
LayerData = namedtuple("LayerData", ["pre_ops", "ops", "param_inds", "post_ops"])
"""Parametrized layer of the circuit.
Args:
pre_ops (list[Operator]): operators that precede the layer
ops (list[Operator]): parametrized operators in the layer
param_inds (tuple[int]): corresponding free parameter indices
post_ops (list[Operator]): operators that succeed the layer
"""
[docs]class CircuitGraph:
"""Represents a quantum circuit as a directed acyclic graph.
In this representation the :class:`~.Operator` instances are the nodes of the graph,
and each directed edge represent a subsystem (or a group of subsystems) on which the two
Operators act subsequently. This representation can describe the causal relationships
between arbitrary quantum channels and measurements, not just unitary gates.
Args:
ops (Iterable[.Operator]): quantum operators constituting the circuit, in temporal order
obs (Iterable[.MeasurementProcess]): terminal measurements, in temporal order
wires (.Wires): The addressable wire registers of the device that will be executing this graph
par_info (list[dict]): Parameter information. For each index, the entry is a dictionary containing an operation
and an index into that operation's parameters.
trainable_params (set[int]): A set containing the indices of parameters that support
differentiability. The indices provided match the order of appearence in the
quantum circuit.
"""
# pylint: disable=too-many-public-methods
def __init__(self, ops, obs, wires, par_info=None, trainable_params=None):
self._operations = ops
self._observables = obs
self.par_info = par_info
self.trainable_params = trainable_params
queue = ops + obs
self._depth = None
self._grid = {}
"""dict[int, list[Operator]]: dictionary representing the quantum circuit as a grid.
Here, the key is the wire number, and the value is a list containing the operators on that wire.
"""
self._indices = {}
# Store indices for the nodes of the DAG here
self.wires = wires
"""Wires: wires that are addressed in the operations.
Required to translate between wires and indices of the wires on the device."""
self.num_wires = len(wires)
"""int: number of wires the circuit contains"""
for k, op in enumerate(queue):
# meas_wires = wires or None # cannot use empty wire list in MeasurementProcess
op.queue_idx = k # store the queue index in the Operator
for w in wires if len(op.wires) == 0 else op.wires:
# get the index of the wire on the device
wire = wires.index(w)
# add op to the grid, to the end of wire w
self._grid.setdefault(wire, []).append(op)
# TODO: State preparations demolish the incoming state entirely, and therefore should have no incoming edges.
self._graph = rx.PyDiGraph(
multigraph=False
) #: rx.PyDiGraph: DAG representation of the quantum circuit
# Iterate over each (populated) wire in the grid
for wire in self._grid.values():
# Add the first operator on the wire to the graph
# This operator does not depend on any others
# Check if wire[0] in self._grid.values()
# is already added to the graph; this
# condition avoids adding new nodes with
# the same value but different indexes
if all(wire[0] is not op for op in self._graph.nodes()):
_ind = self._graph.add_node(wire[0])
self._indices.setdefault(id(wire[0]), _ind)
for i in range(1, len(wire)):
# For subsequent operators on the wire:
if all(wire[i] is not op for op in self._graph.nodes()):
# Add them to the graph if they are not already
# in the graph (multi-qubit operators might already have been placed)
_ind = self._graph.add_node(wire[i])
self._indices.setdefault(id(wire[i]), _ind)
# Create an edge between this and the previous operator
# There isn't any default value for the edge-data in
# rx.PyDiGraph.add_edge(); this is set to an empty string
self._graph.add_edge(self._indices[id(wire[i - 1])], self._indices[id(wire[i])], "")
# For computing depth; want only a graph with the operations, not
# including the observables
self._operation_graph = None
# Required to keep track if we need to handle multiple returned
# observables per wire
self._max_simultaneous_measurements = None
[docs] def print_contents(self):
"""Prints the contents of the quantum circuit."""
print("Operations")
print("==========")
for op in self.operations:
print(repr(op))
print("\nObservables")
print("===========")
for op in self.observables:
print(repr(op))
[docs] def serialize(self):
"""Serialize the quantum circuit graph based on the operations and
observables in the circuit graph and the index of the variables
used by them.
The string that is produced can be later hashed to assign a unique value to the circuit graph.
Returns:
string: serialized quantum circuit graph
"""
serialization_string = ""
delimiter = "!"
for op in self.operations_in_order:
serialization_string += op.name
for param in op.data:
serialization_string += delimiter
serialization_string += str(param)
serialization_string += delimiter
serialization_string += str(op.wires.tolist())
# Adding a distinct separating string that could not occur by any combination of the
# name of the operation and wires
serialization_string += "|||"
for mp in self.observables_in_order:
obs = mp.obs or mp
data, name = ([], "Identity") if obs is mp else (obs.data, str(obs.name))
serialization_string += str(mp.return_type)
serialization_string += delimiter
serialization_string += name
for param in data:
serialization_string += delimiter
serialization_string += str(param)
serialization_string += delimiter
serialization_string += str(obs.wires.tolist())
return serialization_string
@property
def hash(self):
"""Creating a hash for the circuit graph based on the string generated by serialize.
Returns:
int: the hash of the serialized quantum circuit graph
"""
return hash(self.serialize())
@property
def observables_in_order(self):
"""Observables in the circuit, in a fixed topological order.
The topological order used by this method is guaranteed to be the same
as the order in which the measured observables are returned by the quantum function.
Currently the topological order is determined by the queue index.
Returns:
list[Observable]: observables
"""
nodes = [node for node in self._graph.nodes() if _is_observable(node)]
return sorted(nodes, key=_by_idx)
@property
def observables(self):
"""Observables in the circuit."""
return self._observables
@property
def operations_in_order(self):
"""Operations in the circuit, in a fixed topological order.
Currently the topological order is determined by the queue index.
The complement of :meth:`QNode.observables`. Together they return every :class:`Operator`
instance in the circuit.
Returns:
list[Operation]: operations
"""
nodes = [node for node in self._graph.nodes() if not _is_observable(node)]
return sorted(nodes, key=_by_idx)
@property
def operations(self):
"""Operations in the circuit."""
return self._operations
@property
def graph(self):
"""The graph representation of the quantum circuit.
The graph has nodes representing :class:`.Operator` instances,
and directed edges pointing from nodes to their immediate dependents/successors.
Returns:
rustworkx.PyDiGraph: the directed acyclic graph representing the quantum circuit
"""
return self._graph
[docs] def wire_indices(self, wire):
"""Operator indices on the given wire.
Args:
wire (int): wire to examine
Returns:
list[int]: indices of operators on the wire, in temporal order
"""
return [op.queue_idx for op in self._grid[wire]]
[docs] def ancestors(self, ops):
"""Ancestors of a given set of operators.
Args:
ops (Iterable[Operator]): set of operators in the circuit
Returns:
list[Operator]: ancestors of the given operators
"""
# rx.ancestors() returns node indices instead of node-values
all_indices = set().union(*(rx.ancestors(self._graph, self._indices[id(o)]) for o in ops))
double_op_indices = set(self._indices[id(o)] for o in ops)
ancestor_indices = all_indices - double_op_indices
return list(self._graph.get_node_data(n) for n in ancestor_indices)
[docs] def descendants(self, ops):
"""Descendants of a given set of operators.
Args:
ops (Iterable[Operator]): set of operators in the circuit
Returns:
list[Operator]: descendants of the given operators
"""
# rx.descendants() returns node indices instead of node-values
all_indices = set().union(*(rx.descendants(self._graph, self._indices[id(o)]) for o in ops))
double_op_indices = set(self._indices[id(o)] for o in ops)
ancestor_indices = all_indices - double_op_indices
return list(self._graph.get_node_data(n) for n in ancestor_indices)
def _in_topological_order(self, ops):
"""Sorts a set of operators in the circuit in a topological order.
Args:
ops (Iterable[Operator]): set of operators in the circuit
Returns:
Iterable[Operator]: same set of operators, topologically ordered
"""
G = self._graph.subgraph(list(self._indices[id(o)] for o in ops))
indexes = rx.topological_sort(G)
return list(G[x] for x in indexes)
[docs] def ancestors_in_order(self, ops):
"""Operator ancestors in a topological order.
Currently the topological order is determined by the queue index.
Args:
ops (Iterable[Operator]): set of operators in the circuit
Returns:
list[Operator]: ancestors of the given operators, topologically ordered
"""
return sorted(self.ancestors(ops), key=_by_idx) # an abitrary topological order
[docs] def descendants_in_order(self, ops):
"""Operator descendants in a topological order.
Currently the topological order is determined by the queue index.
Args:
ops (Iterable[Operator]): set of operators in the circuit
Returns:
list[Operator]: descendants of the given operators, topologically ordered
"""
return sorted(self.descendants(ops), key=_by_idx)
[docs] def nodes_between(self, a, b):
r"""Nodes on all the directed paths between the two given nodes.
Returns the set of all nodes ``s`` that fulfill :math:`a \le s \le b`.
There is a directed path from ``a`` via ``s`` to ``b`` iff the set is nonempty.
The endpoints belong to the path.
Args:
a (Operator): initial node
b (Operator): final node
Returns:
list[Operator]: nodes on all the directed paths between a and b
"""
A = self.descendants([a])
A.append(a)
B = self.ancestors([b])
B.append(b)
return [B.pop(i) for op1 in A for i, op2 in enumerate(B) if op1 is op2]
@property
def parametrized_layers(self):
"""Identify the parametrized layer structure of the circuit.
Returns:
list[Layer]: layers of the circuit
"""
# FIXME maybe layering should be greedier, for example [a0 b0 c1 d1] should layer as [a0
# c1], [b0, d1] and not [a0], [b0 c1], [d1] keep track of the current layer
current = Layer([], [])
layers = [current]
for idx, info in enumerate(self.par_info):
if idx in self.trainable_params:
op = info["op"]
# get all predecessor ops of the op
sub = self.ancestors((op,))
# check if any of the dependents are in the
# currently assembled layer
if any(o1 is o2 for o1 in current.ops for o2 in sub):
# operator depends on current layer, start a new layer
current = Layer([], [])
layers.append(current)
# store the parameters and ops indices for the layer
current.ops.append(op)
current.param_inds.append(idx)
return layers
[docs] def iterate_parametrized_layers(self):
"""Parametrized layers of the circuit.
Returns:
Iterable[LayerData]: layers with extra metadata
"""
# iterate through each layer
for ops, param_inds in self.parametrized_layers:
pre_queue = self.ancestors_in_order(ops)
post_queue = self.descendants_in_order(ops)
yield LayerData(pre_queue, ops, tuple(param_inds), post_queue)
[docs] def update_node(self, old, new):
"""Replaces the given circuit graph node with a new one.
Args:
old (Operator): node to replace
new (Operator): replacement
Raises:
ValueError: if the new :class:`~.Operator` does not act on the same wires as the old one
"""
# NOTE Does not alter the graph edges in any way. variable_deps is not changed, _grid is not changed. Dangerous!
if new.wires != old.wires:
raise ValueError("The new Operator must act on the same wires as the old one.")
new.queue_idx = old.queue_idx
self._graph[self._indices[id(old)]] = new
index = self._indices.pop(id(old))
self._indices[id(new)] = index
self._operations = self.operations_in_order
self._observables = self.observables_in_order
[docs] def get_depth(self):
"""Depth of the quantum circuit (longest path in the DAG)."""
# If there are no operations in the circuit, the depth is 0
if not self.operations:
self._depth = 0
# If there are operations but depth is uncomputed, compute the truncated graph
# with only the operations, and return the longest path + 1 (since the path is
# expressed in terms of edges, and we want it in terms of nodes).
if self._depth is None and self.operations:
if self._operation_graph is None:
self._operation_graph = self._graph.subgraph(
list(self._indices[id(node)] for node in self.operations)
)
self._extend_graph(self._operation_graph)
self._depth = (
rx.dag_longest_path_length(
self._operation_graph, weight_fn=lambda _, __, w: self._weight_func(w)
)
+ 1
)
return self._depth
@staticmethod
def _weight_func(weight):
"""If weight is a number, use it!"""
if isinstance(weight, Number):
return weight
return 1
def _extend_graph(self, graph: rx.PyDiGraph) -> rx.PyDiGraph:
"""Extend graph to account for custom depth operations"""
custom_depth_node_dict = {}
for op in self.operations:
if isinstance(op, ResourcesOperation) and (d := op.resources().depth) > 1:
custom_depth_node_dict[graph.nodes().index(op)] = d
def _link_graph(target_index, sub_graph, node_index):
"""Link incoming and outgoing edges for the initial node to the sub-graph"""
if target_index == node_index:
return sub_graph.nodes().index(f"{node_index}.0")
return sub_graph.nodes().index(f"{node_index}.1")
for node_index, depth in custom_depth_node_dict.items():
# Construct sub_graph:
sub_graph = rx.PyDiGraph()
source_node, target_node = (f"{node_index}.0", f"{node_index}.1")
sub_graph.add_node(source_node)
sub_graph.add_node(target_node)
sub_graph.add_edge(
sub_graph.nodes().index(source_node),
sub_graph.nodes().index(target_node),
depth - 1, # set edge weight as depth - 1
)
graph.substitute_node_with_subgraph(
node_index,
sub_graph,
lambda _, t, __: _link_graph(
t, sub_graph, node_index # pylint: disable=cell-var-from-loop
),
)
[docs] def has_path(self, a, b):
"""Checks if a path exists between the two given nodes.
Args:
a (Operator): initial node
b (Operator): final node
Returns:
bool: returns ``True`` if a path exists
"""
if a is b:
return True
return (
len(
rx.digraph_dijkstra_shortest_paths(
self._graph,
self._indices[id(a)],
self._indices[id(b)],
weight_fn=None,
default_weight=1.0,
as_undirected=False,
)
)
!= 0
)
@property
def max_simultaneous_measurements(self):
"""Returns the maximum number of measurements on any wire in the circuit graph.
This method counts the number of measurements for each wire and returns
the maximum.
**Examples**
>>> dev = qml.device('default.qubit', wires=3)
>>> def circuit_measure_max_once():
... return qml.expval(qml.X(0))
>>> qnode = qml.QNode(circuit_measure_max_once, dev)
>>> qnode()
>>> qnode.qtape.graph.max_simultaneous_measurements
1
>>> def circuit_measure_max_twice():
... return qml.expval(qml.X(0)), qml.probs(wires=0)
>>> qnode = qml.QNode(circuit_measure_max_twice, dev)
>>> qnode()
>>> qnode.qtape.graph.max_simultaneous_measurements
2
Returns:
int: the maximum number of measurements
"""
if self._max_simultaneous_measurements is None:
all_wires = []
for obs in self.observables:
all_wires.extend(obs.wires.tolist())
a = np.array(all_wires)
_, counts = np.unique(a, return_counts=True)
self._max_simultaneous_measurements = (
counts.max() if counts.size != 0 else 1
) # qml.state() will result in an empty array
return self._max_simultaneous_measurements
```

_modules/pennylane/circuit_graph

Download Python script

Download Notebook

View on GitHub