# Copyright 2018-2021 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Quantum natural SPSA optimizer"""
import warnings
from scipy.linalg import sqrtm
from pennylane import numpy as pnp
from pennylane.measurements import probs
from pennylane.ops import adjoint
from pennylane.tape import QuantumScript
from pennylane.workflow import construct_tape, execute
class QNSPSAOptimizer:
r"""Quantum natural SPSA (QNSPSA) optimizer. QNSPSA is a second-order SPSA algorithm, which
updates the ansatz parameters with the following equation:
.. math::
\mathbf{x}^{(t + 1)} = \mathbf{x}^{(t)} -
\eta \widehat{\mathbf{g}}^{-1}(\mathbf{x}^{(t)})\widehat{\nabla f}(\mathbf{x}^{(t)}),
where :math:`f(\mathbf{x})` is the objective function with input parameters :math:`\mathbf{x}`,
:math:`\nabla f` is its gradient, and :math:`\mathbf{g}` is the Fubini-Study metric
tensor. With the QNSPSA algorithm, both the gradient and the metric tensor are estimated
stochastically, as :math:`\widehat{\nabla f}` and :math:`\widehat{\mathbf{g}}`. This stochastic
approach requires only a fixed number of circuit executions per optimization step,
independent of the problem size. This favorable scaling makes it a promising candidate
for optimizing high-dimensional ansatzes. At the same time, incorporating the
Fubini-Study metric into the optimization helps to find better minima and allows for
faster convergence.
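As a rough numerical illustration of this update rule, consider the following minimal NumPy
sketch, where ``g_hat`` and ``grad_hat`` stand in for the stochastic estimates described
below (they are illustrative placeholders, not attributes of the optimizer):

.. code-block:: python

    import numpy as np

    # placeholder estimates; in the optimizer both come from circuit executions
    g_hat = np.array([[1.0, 0.1], [0.1, 0.8]])  # estimated metric tensor
    grad_hat = np.array([0.3, -0.2])            # estimated gradient
    x, eta = np.array([0.5, 1.2]), 1e-2

    # x_{t+1} = x_t - eta * g_hat^{-1} @ grad_hat, computed as a linear solve
    x_next = x - eta * np.linalg.solve(g_hat, grad_hat)
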
The gradient is estimated similarly to the `SPSA optimizer
<https://pennylane.readthedocs.io/en/stable/code/api/pennylane.SPSAOptimizer.html>`_, with a
pair of perturbations:
.. math::
\widehat{\nabla f}(\mathbf{x}) = \widehat{\nabla f}(\mathbf{x}, \mathbf{h})
\approx \frac{1}{2\epsilon}\big(f(\mathbf{x} + \epsilon \mathbf{h}) - f(\mathbf{x} - \epsilon \mathbf{h})\big),
where :math:`\epsilon` is the finite-difference step size specified by the user, and
:math:`\mathbf{h}` is a randomly sampled direction vector along which the perturbation is applied.
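In code, a single-sample SPSA gradient estimate amounts to the following sketch (the names
are illustrative only; the scalar finite difference is spread back along the sampled
direction, as the optimizer does internally):

.. code-block:: python

    import numpy as np

    def spsa_grad(f, x, eps, rng):
        # random +/-1 perturbation direction with the same shape as the parameters
        h = rng.choice([-1, 1], size=x.shape)
        # symmetric finite difference along h, distributed over the direction vector
        return (f(x + eps * h) - f(x - eps * h)) / (2 * eps) * h

    rng = np.random.default_rng(42)
    grad_est = spsa_grad(lambda v: np.sum(np.sin(v)), np.array([0.1, 0.4]), 1e-2, rng)
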
The Fubini-Study metric tensor is estimated with another two pairs of perturbations along
randomly sampled directions :math:`\mathbf{h_1}` and :math:`\mathbf{h_2}`:
.. math::
\widehat{\mathbf{g}}(\mathbf{x}) = \widehat{\mathbf{g}}(\mathbf{x}, \mathbf{h}_1, \mathbf{h}_2)
\approx -\frac{\delta F}{8 \epsilon^2}\Big(\mathbf{h}_1 \mathbf{h}_2^\intercal + \mathbf{h}_2 \mathbf{h}_1^\intercal\Big),
where :math:`F(\mathbf{x}', \mathbf{x}) = \bigl\lvert \langle \phi(\mathbf{x}') | \phi(\mathbf{x}) \rangle \bigr\rvert^2`
measures the state overlap between :math:`\phi(\mathbf{x}')` and :math:`\phi(\mathbf{x})`,
where :math:`\phi` is the parametrized ansatz. The finite difference :math:`\delta F` is
computed from the two perturbations:
.. math::
\delta F = F(\mathbf{x}, \mathbf{x} + \epsilon \mathbf{h}_1 + \epsilon \mathbf{h}_2)
- F(\mathbf{x}, \mathbf{x} + \epsilon \mathbf{h}_1) - F(\mathbf{x}, \mathbf{x}
- \epsilon \mathbf{h}_1 + \epsilon \mathbf{h}_2)
+ F(\mathbf{x}, \mathbf{x} - \epsilon \mathbf{h}_1).
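Assembled in code, the tensor estimate built from the two sampled directions and the four
overlap evaluations reads roughly as follows (a sketch; ``delta_F`` and the other values are
placeholders for the quantities defined above):

.. code-block:: python

    import numpy as np

    rng = np.random.default_rng(0)
    num_params = 3                      # illustrative number of parameters
    h1 = rng.choice([-1, 1], size=num_params)
    h2 = rng.choice([-1, 1], size=num_params)
    delta_F, eps = -4e-4, 1e-2          # placeholder values

    # symmetrized rank-2 update matching the formula above; the minus sign reflects
    # that the state overlap F is maximal at x' = x
    g_hat = -delta_F / (8 * eps**2) * (np.outer(h1, h2) + np.outer(h2, h1))
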
For more details, see:
Julien Gacon, Christa Zoufal, Giuseppe Carleo, and Stefan Woerner.
"Simultaneous Perturbation Stochastic Approximation of the Quantum Fisher Information."
`Quantum, 5, 567 <https://quantum-journal.org/papers/q-2021-10-20-567/>`_, 2021.
You can also find a walkthrough of the implementation in this `tutorial <demos/qnspsa>`__.
**Examples:**
For VQE/VQE-like problems, the objective function can be defined within a qnode:
>>> num_qubits = 2
>>> dev = qml.device("default.qubit", wires=num_qubits)
>>> @qml.qnode(dev)
... def cost(params):
...     qml.RX(params[0], wires=0)
...     qml.CRY(params[1], wires=[0, 1])
...     return qml.expval(qml.Z(0) @ qml.Z(1))
Once constructed, the qnode can be passed directly to the ``step`` or ``step_and_cost``
function of the optimizer.
>>> from pennylane import numpy as np
>>> params = np.random.rand(2)
>>> opt = QNSPSAOptimizer(stepsize=5e-2)
>>> for i in range(51):
...     params, loss = opt.step_and_cost(cost, params)
...     if i % 10 == 0:
...         print(f"Step {i}: cost = {loss:.4f}")
Step 0: cost = 0.9987
Step 10: cost = 0.9841
Step 20: cost = 0.8921
Step 30: cost = 0.0910
Step 40: cost = -0.9369
Step 50: cost = -0.9984
Keyword Args:
stepsize (float): the user-defined hyperparameter :math:`\eta` for learning rate (default: 1e-3)
regularization (float): regularization term :math:`\beta` to the Fubini-Study metric tensor
for numerical stability (default: 1e-3)
finite_diff_step (float): step size :math:`\epsilon` to compute the finite difference
gradient and the Fubini-Study metric tensor (default: 1e-2)
resamplings (int): the number of samples to average for each parameter update (default: 1)
blocking (boolean): when set to True, the optimizer only accepts updates that lead to a
loss value no larger than the loss value before the update, plus a tolerance. The tolerance
is computed from the losses of the last few steps (see ``history_length``). The ``blocking``
option is observed to help the optimizer converge significantly faster (default: True)
history_length (int): when ``blocking`` is True, the tolerance is set to twice the standard
deviation of the cost values recorded over the last ``history_length`` steps (default: 5)
seed (int): seed for the random sampling (default: None)
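For example, a lower-variance (but more expensive) configuration could average several
resamplings per step and fix the seed for reproducibility (the values below are purely
illustrative, not recommendations):

>>> opt = QNSPSAOptimizer(stepsize=1e-2, resamplings=4, blocking=True, seed=42)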
"""
# pylint: disable=too-many-arguments
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-positional-arguments
def __init__(
self,
stepsize=1e-3,
regularization=1e-3,
finite_diff_step=1e-2,
resamplings=1,
blocking=True,
history_length=5,
seed=None,
):
self.stepsize = stepsize
self.reg = regularization
self.finite_diff_step = finite_diff_step
self.metric_tensor = None
self.k = 1
self.resamplings = resamplings
self.blocking = blocking
self.last_n_steps = pnp.zeros(history_length)
self.rng = pnp.random.default_rng(seed)
def step(self, cost, *args, **kwargs):
"""Update trainable arguments with one step of the optimizer.
.. note::
When ``blocking`` is set to True, ``step`` calls ``step_and_cost`` internally, as loss
evaluations are required by the algorithm in this scenario.
Args:
cost (QNode): the QNode wrapping the objective function to be optimized
args : variable length argument list for qnode
kwargs : variable length of keyword arguments for the qnode
Returns:
pnp.ndarray: the new variable values after step-wise update :math:`x^{(t+1)}`
"""
if self.blocking:
warnings.warn(
"step_and_cost() instead of step() is called when "
"blocking is turned on, as the step-wise loss value "
"is required by the algorithm.",
stacklevel=2,
)
return self.step_and_cost(cost, *args, **kwargs)[0]
return self._step_core(cost, args, kwargs)
def step_and_cost(self, cost, *args, **kwargs):
r"""Update trainable parameters with one step of the optimizer and return
the corresponding objective function value after the step.
Args:
cost (QNode): the QNode wrapping the objective function to be optimized
args : variable length argument list for qnode
kwargs : variable length of keyword arguments for the qnode
Returns:
(np.array, float): the new variable values :math:`x^{(t+1)}` and the objective
function output prior to the step
"""
params_next = self._step_core(cost, args, kwargs)
if not self.blocking:
loss_curr = cost(*args, **kwargs)
return params_next, loss_curr
params_next, loss_curr = self._apply_blocking(cost, args, kwargs, params_next)
return params_next, loss_curr
def _step_core(self, cost, args, kwargs):
"""Core step function that returns the updated parameter before blocking condition
is applied.
Args:
cost (QNode): the QNode wrapping the objective function to be optimized
args : variable length argument list for qnode
kwargs : variable length of keyword arguments for the qnode
Returns:
pnp.ndarray: the new variable values :math:`x^{(t+1)}` before the blocking condition
is applied.
"""
all_grad_tapes = []
all_grad_dirs = []
all_metric_tapes = []
all_tensor_dirs = []
for _ in range(self.resamplings):
# grad_tapes contains 2 tapes for the gradient estimation
grad_tapes, grad_dirs = self._get_spsa_grad_tapes(cost, args, kwargs)
# metric_tapes contains 4 tapes for tensor estimation
metric_tapes, tensor_dirs = self._get_tensor_tapes(cost, args, kwargs)
all_grad_tapes += grad_tapes
all_metric_tapes += metric_tapes
all_grad_dirs.append(grad_dirs)
all_tensor_dirs.append(tensor_dirs)
# nosemgrep
raw_results = execute(all_grad_tapes + all_metric_tapes, cost.device)
grads = [
self._post_process_grad(raw_results[2 * i : 2 * i + 2], all_grad_dirs[i])
for i in range(self.resamplings)
]
grads = pnp.array(grads)
metric_tensors = [
self._post_process_tensor(
raw_results[2 * self.resamplings + 4 * i : 2 * self.resamplings + 4 * i + 4],
all_tensor_dirs[i],
)
for i in range(self.resamplings)
]
metric_tensors = pnp.array(metric_tensors)
grad_avg = pnp.mean(grads, axis=0)
tensor_avg = pnp.mean(metric_tensors, axis=0)
self._update_tensor(tensor_avg)
params_next = self._get_next_params(args, grad_avg)
return params_next[0] if len(params_next) == 1 else params_next
def _post_process_grad(self, grad_raw_results, grad_dirs):
r"""Post process the gradient tape results to get the SPSA gradient estimation.
Args:
grad_raw_results (list[np.array]): list of the two qnode results with input parameters
perturbed along the ``grad_dirs`` directions
grad_dirs (list[np.array]): list of perturbation arrays along which the SPSA
gradients are estimated
Returns:
list[np.array]: list of gradient arrays. The shape of each gradient array matches
the shape of the corresponding input parameter
"""
loss_plus, loss_minus = grad_raw_results
return [
(loss_plus - loss_minus) / (2 * self.finite_diff_step) * grad_dir
for grad_dir in grad_dirs
]
def _post_process_tensor(self, tensor_raw_results, tensor_dirs):
r"""Post process the corresponding tape results to get the metric tensor estimation.
Args:
tensor_raw_results (list[np.array]): list of the four perturbed qnode results to compute
the estimated metric tensor
tensor_dirs (list[np.array]): list of the two perturbation directions used to compute
the metric tensor estimation. Perturbations on the different input parameters have
been concatenated
Returns:
pnp.array: estimated Fubini-Study metric tensor
"""
tensor_raw_results = [result.squeeze() for result in tensor_raw_results]
# For each element of tensor_raw_results, the first entry is the measured probability of
# the all-zero computational basis state |0...0>, which equals the state overlap between
# the perturbed and unperturbed circuits.
tensor_finite_diff = (
tensor_raw_results[0][0]
- tensor_raw_results[1][0]
- tensor_raw_results[2][0]
+ tensor_raw_results[3][0]
)
return (
-(
pnp.tensordot(tensor_dirs[0], tensor_dirs[1], axes=0)
+ pnp.tensordot(tensor_dirs[1], tensor_dirs[0], axes=0)
)
* tensor_finite_diff
/ (8 * self.finite_diff_step**2)
)
def _get_next_params(self, args, gradient):
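# Flatten all trainable arguments into a single vector and solve the natural-gradient
# linear system  g @ x_next = g @ x - stepsize * grad  via the pseudo-inverse of g.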
params = []
non_trainable_indices = []
for idx, arg in enumerate(args):
if not getattr(arg, "requires_grad", False):
non_trainable_indices.append(idx)
continue
# If an input parameter is a zero-dimensional array, reshape it into a one-dimensional
# array of size one. The returned result keeps this added dimension.
if arg.shape == ():
arg = arg.reshape(-1)
params.append(arg)
# params_vec and grad_vec group multiple inputs into the same vector to solve the
# linear equation
params_vec = pnp.concatenate([param.reshape(-1) for param in params])
grad_vec = pnp.concatenate([grad.reshape(-1) for grad in gradient])
new_params_vec = pnp.matmul(
pnp.linalg.pinv(self.metric_tensor),
(-self.stepsize * grad_vec + pnp.matmul(self.metric_tensor, params_vec)),
)
# reshape single-vector new_params_vec into new_params, to match the input params
params_split_indices = []
tmp = 0
for param in params:
tmp += param.size
params_split_indices.append(tmp)
new_params = pnp.split(new_params_vec, params_split_indices)
new_params_reshaped = [new_params[i].reshape(params[i].shape) for i in range(len(params))]
next_args = []
non_trainable_idx = 0
trainable_idx = 0
# merge trainables and non-trainables into the original order
for idx, arg in enumerate(args):
if (
non_trainable_idx < len(non_trainable_indices)
and idx == non_trainable_indices[non_trainable_idx]
):
next_args.append(arg)
non_trainable_idx += 1
continue
next_args.append(new_params_reshaped[trainable_idx])
trainable_idx += 1
return next_args
def _get_spsa_grad_tapes(self, cost, args, kwargs):
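# Sample one random +/-1 direction per trainable argument and build the two tapes
# evaluating the cost at x + eps * h and x - eps * h for the SPSA gradient estimate.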
dirs = []
args_plus = list(args)
args_minus = list(args)
for index, arg in enumerate(args):
if not getattr(arg, "requires_grad", False):
continue
direction = self.rng.choice([-1, 1], size=arg.shape)
dirs.append(direction)
args_plus[index] = arg + self.finite_diff_step * direction
args_minus[index] = arg - self.finite_diff_step * direction
tape = construct_tape(cost)(*args_plus, **kwargs)
tape_plus = tape.copy(copy_operations=True)
tape = construct_tape(cost)(*args_minus, **kwargs)
tape_minus = tape.copy(copy_operations=True)
return [tape_plus, tape_minus], dirs
def _update_tensor(self, tensor_raw):
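# Blend the new tensor sample into a running average over all steps so far, then
# regularize it (matrix square root of g @ g plus reg * identity, rescaled) so the
# metric used in the parameter update stays symmetric and positive-definite.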
def get_tensor_moving_avg(metric_tensor):
if self.metric_tensor is None:
self.metric_tensor = pnp.identity(metric_tensor.shape[0])
return self.k / (self.k + 1) * self.metric_tensor + 1 / (self.k + 1) * metric_tensor
def regularize_tensor(metric_tensor):
tensor_reg = pnp.real(sqrtm(pnp.matmul(metric_tensor, metric_tensor)))
return (tensor_reg + self.reg * pnp.identity(metric_tensor.shape[0])) / (1 + self.reg)
tensor_avg = get_tensor_moving_avg(tensor_raw)
tensor_regularized = regularize_tensor(tensor_avg)
self.metric_tensor = tensor_regularized
self.k += 1
def _get_tensor_tapes(self, cost, args, kwargs):
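# Sample two random +/-1 directions per trainable argument and build the four overlap
# tapes between the unperturbed circuit and the circuits perturbed to
# x + eps*(h1 + h2), x + eps*h1, x + eps*(-h1 + h2), and x - eps*h1, which enter delta_F.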
dir1_list = []
dir2_list = []
args_list = [list(args) for _ in range(4)]
for index, arg in enumerate(args):
if not getattr(arg, "requires_grad", False):
continue
dir1 = self.rng.choice([-1, 1], size=arg.shape)
dir2 = self.rng.choice([-1, 1], size=arg.shape)
dir1_list.append(dir1.reshape(-1))
dir2_list.append(dir2.reshape(-1))
args_list[0][index] = arg + self.finite_diff_step * (dir1 + dir2)
args_list[1][index] = arg + self.finite_diff_step * dir1
args_list[2][index] = arg + self.finite_diff_step * (-dir1 + dir2)
args_list[3][index] = arg - self.finite_diff_step * dir1
dir_vecs = (pnp.concatenate(dir1_list), pnp.concatenate(dir2_list))
tapes = [
self._get_overlap_tape(cost, args, args_finite_diff, kwargs)
for args_finite_diff in args_list
]
return tapes, dir_vecs
def _get_overlap_tape(self, cost, args1, args2, kwargs):
# The returned tape effectively measures the fidelity between the two parametrized circuits
# with inputs args1 and args2. The measurement result of the tape is an array of probabilities
# in the computational basis. The first element of the array is the probability of the
# all-zero state |0...0>, which equals the fidelity.
op_forward = self._get_operations(cost, args1, kwargs)
op_inv = self._get_operations(cost, args2, kwargs)
new_ops = op_forward + [adjoint(op) for op in reversed(op_inv)]
tape = construct_tape(cost)(*args1, **kwargs)
return QuantumScript(new_ops, [probs(wires=tape.wires.labels)])
@staticmethod
def _get_operations(cost, args, kwargs):
tape = construct_tape(cost)(*args, **kwargs)
return tape.operations
def _apply_blocking(self, cost, args, kwargs, params_next):
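# Evaluate the loss at the current and at the proposed parameters; accept the update
# only if the new loss does not exceed the current loss by more than a tolerance of
# twice the standard deviation of the recently recorded losses.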
tape = construct_tape(cost)(*args, **kwargs)
tape_loss_curr = tape.copy(copy_operations=True)
if not isinstance(params_next, list):
params_next = [params_next]
tape = construct_tape(cost)(*params_next, **kwargs)
tape_loss_next = tape.copy(copy_operations=True)
program = cost.device.preprocess_transforms()
loss_curr, loss_next = execute(
[tape_loss_curr, tape_loss_next], cost.device, None, transform_program=program
)
# self.k has been updated earlier
ind = (self.k - 2) % self.last_n_steps.size
self.last_n_steps[ind] = loss_curr
tol = (
2 * self.last_n_steps.std()
if self.k > self.last_n_steps.size
else 2 * self.last_n_steps[: self.k - 1].std()
)
if loss_curr + tol < loss_next:
params_next = args
if len(params_next) == 1:
return params_next[0], loss_curr
return params_next, loss_curr