Source code for pennylane.workflow.interfaces.tensorflow_autograph

# Copyright 2018-2021 Xanadu Quantum Technologies Inc.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module contains functions for adding the TensorFlow Autograph interface
to a PennyLane Device class.
"""
# pylint: disable=too-many-arguments,too-many-branches,too-many-statements
from functools import reduce

import numpy as np
import tensorflow as tf

import pennylane as qml
from pennylane.measurements import SampleMP, StateMP

from .tensorflow import (
    _res_restructured,
    _to_tensors,
    set_parameters_on_copy,
)


def _compute_vjp(dy, jacs, multi_measurements, has_partitioned_shots):
    # compute the vector-Jacobian product dy @ jac
    # for a list of dy's and Jacobian matrices.
    vjps = []

    for dy_, jac_, multi in zip(dy, jacs, multi_measurements):
        dy_ = dy_ if has_partitioned_shots else (dy_,)
        jac_ = jac_ if has_partitioned_shots else (jac_,)

        shot_vjps = []
        for d, j in zip(dy_, jac_):
            # see xfail test: test_tensorflow_qnode_default_qubit_2.py:test_autograph_adjoint_multi_out
            # And Issue #5078
            # pragma: no cover
            if multi:  # pragma: no cover
                shot_vjps.append(qml.gradients.compute_vjp_multi(d, j))
            else:
                shot_vjps.append(qml.gradients.compute_vjp_single(d, j))

        vjp = qml.math.sum(qml.math.stack(shot_vjps), 0)

        vjps.extend(vjp)

    return vjps


def _flatten_nested_list(x):
    """
    Recursively flatten the list
    """
    if not isinstance(x, (tuple, list)):
        return [x]

    return reduce(lambda a, y: a + _flatten_nested_list(y), x, [])


def _jac_restructured(jacs, tapes):
    """
    Reconstruct the nested tuple structure of the jacobian of a list of tapes
    """
    start = 0
    jacs_nested = []
    for tape in tapes:
        num_meas = len(tape.measurements)
        num_params = len(tape.trainable_params)

        tape_jacs = tuple(jacs[start : start + num_meas * num_params])
        tape_jacs = tuple(
            tuple(tape_jacs[i * num_params : (i + 1) * num_params]) for i in range(num_meas)
        )

        while isinstance(tape_jacs, tuple) and len(tape_jacs) == 1:
            tape_jacs = tape_jacs[0]

        jacs_nested.append(tape_jacs)
        start += num_meas * num_params

    return tuple(jacs_nested)


[docs]def execute( tapes, device, execute_fn, gradient_fn, gradient_kwargs, _n=1, max_diff=2, grad_on_execution=None, ): """Execute a batch of tapes with TensorFlow parameters on a device. Args: tapes (Sequence[.QuantumTape]): batch of tapes to execute device (pennylane.Device): Device to use to execute the batch of tapes. If the device does not provide a ``batch_execute`` method, by default the tapes will be executed in serial. execute_fn (callable): The execution function used to execute the tapes during the forward pass. This function must return a tuple ``(results, jacobians)``. If ``jacobians`` is an empty list, then ``gradient_fn`` is used to compute the gradients during the backwards pass. gradient_kwargs (dict): dictionary of keyword arguments to pass when determining the gradients of tapes gradient_fn (callable): the gradient function to use to compute quantum gradients _n (int): a positive integer used to track nesting of derivatives, for example if the nth-order derivative is requested. max_diff (int): If ``gradient_fn`` is a gradient transform, this option specifies the maximum number of derivatives to support. Increasing this value allows for higher order derivatives to be extracted, at the cost of additional (classical) computational overhead during the backwards pass. grad_on_execution (bool): Whether the gradients should be computed on execution or not. Returns: list[list[tf.Tensor]]: A nested list of tape results. Each element in the returned list corresponds in order to the provided tapes. """ all_params = [] parameters = [] lens = [] trainable = [] output_types = [] # assumes all tapes have the same shot vector has_partitioned_shots = tapes[0].shots.has_partitioned_shots num_shot_copies = tapes[0].shots.num_copies or 1 for tape in tapes: # store the trainable parameters params = tape.get_parameters(trainable_only=False) tape.trainable_params = qml.math.get_trainable_indices(params) parameters += [p for i, p in enumerate(params) if i in tape.trainable_params] all_params += params trainable += (np.array(list(tape.trainable_params)) + sum(lens)).tolist() lens.append(len(params)) o_types = [] for m in tape.measurements: if isinstance(m, SampleMP): o_types.append(tf.int64) elif isinstance(m, StateMP): o_types.append(tf.complex128) else: o_types.append(tf.float64) output_types.extend(o_types * num_shot_copies) total_measurements = sum(len(tape.measurements) for tape in tapes) if grad_on_execution: output_types += [tf.float64] * len(trainable) output_types += [tf.int32] * (total_measurements * num_shot_copies) def _nest_params(all_params): count = 0 params_unwrapped = [] for s in lens: params_unwrapped.append(all_params[count : count + s]) count += s return params_unwrapped def _forward(*all_params): params_unwrapped = _nest_params(all_params) output_sizes = [] new_tapes = set_parameters_on_copy(tapes, params_unwrapped) # Forward pass: execute the tapes res, jacs = execute_fn(new_tapes, **gradient_kwargs) # flatten the results res = _flatten_nested_list(res) for i, r in enumerate(res): # convert output to TensorFlow tensors res[i] = _to_tensors(r) output_sizes.append(tf.size(res[i])) # flatten the jacobians if jacs: jacs = _flatten_nested_list(jacs) for i, jac in enumerate(jacs): jacs[i] = tf.convert_to_tensor(jac) else: jacs = [] return res + jacs + output_sizes @tf.custom_gradient def _execute(*all_params): # pylint:disable=unused-argument res = tf.numpy_function(func=_forward, inp=all_params, Tout=output_types) output_sizes = res[-total_measurements * num_shot_copies :] if grad_on_execution: jacs = res[total_measurements * num_shot_copies : -total_measurements * num_shot_copies] res = res[: total_measurements * num_shot_copies] # reconstruct the nested structure of res res = _res_restructured(res, tapes) def grad_fn(*dy, **tfkwargs): """Returns the vector-Jacobian product with given parameter values and output gradient dy""" # whether the tapes contain multiple measurements multi_measurements = [len(tape.measurements) > 1 for tape in tapes] dy = list(dy[: total_measurements * num_shot_copies]) if grad_on_execution: # Jacobians were computed on the forward pass (grad_on_execution=True) # No additional quantum evaluations needed; simply compute the VJPs directly. def _backward(*args): for tape in tapes: for m in tape.measurements: if m.numeric_type == complex: raise NotImplementedError( f"Tensorflow autograph only supports real valued measurements. Got {m}" ) dy = args[: total_measurements * num_shot_copies] jacs = args[total_measurements * num_shot_copies : -len(tapes)] dy = _res_restructured(dy, tapes) jacs = _jac_restructured(jacs, tapes) return _compute_vjp(dy, jacs, multi_measurements, has_partitioned_shots) vjps = tf.numpy_function( func=_backward, inp=dy + list(jacs) + multi_measurements, Tout=[tf.float64] * len(parameters), ) else: # Need to compute the Jacobians on the backward pass (accumulation="backward") if isinstance(gradient_fn, qml.transforms.core.TransformDispatcher): # Gradient function is a gradient transform. # Generate and execute the required gradient tapes if _n == max_diff: len_all_params = len(all_params) def _backward(*all_params): dy = all_params[len_all_params:] all_params = all_params[:len_all_params] params_unwrapped = _nest_params(all_params) dy = _res_restructured(dy, tapes) new_tapes = set_parameters_on_copy(tapes, params_unwrapped) vjp_tapes, processing_fn = qml.gradients.batch_vjp( new_tapes, dy, gradient_fn, reduction=lambda vjps, x: vjps.extend(qml.math.unstack(x)), gradient_kwargs=gradient_kwargs, ) return processing_fn(execute_fn(vjp_tapes)[0]) vjps = tf.py_function( func=_backward, inp=list(all_params) + dy, Tout=[tf.float64] * len(parameters), ) else: dy = _res_restructured(dy, tapes) vjp_tapes, processing_fn = qml.gradients.batch_vjp( tapes, dy, gradient_fn, reduction="append", gradient_kwargs=gradient_kwargs, ) # This is where the magic happens. Note that we call ``execute``. # This recursion, coupled with the fact that the gradient transforms # are differentiable, allows for arbitrary order differentiation. vjps = processing_fn( execute( vjp_tapes, device, execute_fn, gradient_fn, gradient_kwargs, _n=_n + 1, max_diff=max_diff, grad_on_execution=grad_on_execution, ), nums=output_sizes, ) vjps = tf.unstack(tf.concat(vjps, 0), num=len(parameters)) else: # Gradient function is not a gradient transform # (e.g., it might be a device method). # Note that unlike the previous branch: # # - there is no recursion here # - gradient_fn is not differentiable # # so we cannot support higher-order derivatives. len_all_params = len(all_params) def _backward(*all_params): dy = all_params[len_all_params : -len(tapes)] multi_measurements = all_params[-len(tapes) :] all_params = all_params[:len_all_params] params_unwrapped = _nest_params(all_params) new_tapes = set_parameters_on_copy(tapes, params_unwrapped) jac = gradient_fn(new_tapes, **gradient_kwargs) vjps = _compute_vjp(dy, jac, multi_measurements, has_partitioned_shots) return vjps vjps = tf.numpy_function( func=_backward, inp=list(all_params) + dy + multi_measurements, Tout=[tf.float64] * len(parameters), ) if not isinstance(vjps, (list, tuple)): vjps = [vjps] vjps = iter(vjps) vjps = [next(vjps) if x in trainable else None for x in range(len(all_params))] variables = tfkwargs.get("variables") return (vjps, variables) if variables is not None else vjps return res, grad_fn return _execute(*all_params)