Source code for pennylane.devices.qubit.sampling
# Copyright 2018-2023 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functions to sample a state."""
from typing import Union
import numpy as np
import pennylane as qml
from pennylane.measurements import (
ClassicalShadowMP,
CountsMP,
ExpectationMP,
SampleMeasurement,
ShadowExpvalMP,
Shots,
)
from pennylane.ops import Hamiltonian, LinearCombination, Prod, SProd, Sum
from pennylane.typing import TensorLike
from .apply_operation import apply_operation
from .measure import flatten_state
def jax_random_split(prng_key, num: int = 2):
"""Get a new key with ``jax.random.split``."""
if prng_key is None:
return (None,) * num
# pylint: disable=import-outside-toplevel
from jax.random import split
return split(prng_key, num=num)
def _group_measurements(mps: list[Union[SampleMeasurement, ClassicalShadowMP, ShadowExpvalMP]]):
"""
Group the measurements such that:
- measurements with pauli observables pairwise-commute in each group
- measurements with observables that are not pauli words are all in different groups
- measurements without observables are all in the same group
- classical shadow measurements are all in different groups
"""
if len(mps) == 1:
return [mps], [[0]]
# measurements with pauli-word observables
mp_pauli_obs = []
# measurements with non pauli-word observables
mp_other_obs = []
mp_other_obs_indices = []
# measurements with no observables
mp_no_obs = []
mp_no_obs_indices = []
for i, mp in enumerate(mps):
if isinstance(mp.obs, (Sum, SProd, Prod)):
mps[i].obs = qml.simplify(mp.obs)
if isinstance(mp, (ClassicalShadowMP, ShadowExpvalMP)):
mp_other_obs.append([mp])
mp_other_obs_indices.append([i])
elif mp.obs is None:
mp_no_obs.append(mp)
mp_no_obs_indices.append(i)
elif qml.pauli.is_pauli_word(mp.obs):
mp_pauli_obs.append((i, mp))
else:
mp_other_obs.append([mp])
mp_other_obs_indices.append([i])
if mp_pauli_obs:
i_to_pauli_mp = dict(mp_pauli_obs)
_, group_indices = qml.pauli.group_observables(
[mp.obs for mp in i_to_pauli_mp.values()], list(i_to_pauli_mp.keys())
)
mp_pauli_groups = []
for indices in group_indices:
mp_group = [i_to_pauli_mp[i] for i in indices]
mp_pauli_groups.append(mp_group)
else:
mp_pauli_groups, group_indices = [], []
mp_no_obs_indices = [mp_no_obs_indices] if mp_no_obs else []
mp_no_obs = [mp_no_obs] if mp_no_obs else []
all_mp_groups = mp_pauli_groups + mp_no_obs + mp_other_obs
all_indices = group_indices + mp_no_obs_indices + mp_other_obs_indices
return all_mp_groups, all_indices
def _get_num_executions_for_expval_H(obs):
indices = obs.grouping_indices
if indices:
return len(indices)
return _get_num_wire_groups_for_expval_H(obs)
def _get_num_wire_groups_for_expval_H(obs):
_, obs_list = obs.terms()
wires_list = []
added_obs = []
num_groups = 0
for o in obs_list:
if o in added_obs:
continue
if isinstance(o, qml.Identity):
continue
added = False
for wires in wires_list:
if len(qml.wires.Wires.shared_wires([wires, o.wires])) == 0:
added_obs.append(o)
added = True
break
if not added:
added_obs.append(o)
wires_list.append(o.wires)
num_groups += 1
return num_groups
def _get_num_executions_for_sum(obs):
if obs.grouping_indices:
return len(obs.grouping_indices)
if not obs.pauli_rep:
return sum(int(not isinstance(o, qml.Identity)) for o in obs.terms()[1])
_, ops = obs.terms()
with qml.QueuingManager.stop_recording():
op_groups = qml.pauli.group_observables(ops)
return len(op_groups)
# pylint: disable=no-member
def get_num_shots_and_executions(tape: qml.tape.QuantumScript) -> tuple[int, int]:
"""Get the total number of qpu executions and shots.
Args:
tape (qml.tape.QuantumTape): the tape we want to get the number of executions and shots for
Returns:
int, int: the total number of QPU executions and the total number of shots
"""
groups, _ = _group_measurements(tape.measurements)
num_executions = 0
num_shots = 0
for group in groups:
if isinstance(group[0], ExpectationMP) and isinstance(
group[0].obs, (qml.ops.Hamiltonian, qml.ops.LinearCombination)
):
H_executions = _get_num_executions_for_expval_H(group[0].obs)
num_executions += H_executions
if tape.shots:
num_shots += tape.shots.total_shots * H_executions
elif isinstance(group[0], ExpectationMP) and isinstance(group[0].obs, qml.ops.Sum):
sum_executions = _get_num_executions_for_sum(group[0].obs)
num_executions += sum_executions
if tape.shots:
num_shots += tape.shots.total_shots * sum_executions
elif isinstance(group[0], (ClassicalShadowMP, ShadowExpvalMP)):
num_executions += tape.shots.total_shots
if tape.shots:
num_shots += tape.shots.total_shots
else:
num_executions += 1
if tape.shots:
num_shots += tape.shots.total_shots
if tape.batch_size:
num_executions *= tape.batch_size
if tape.shots:
num_shots *= tape.batch_size
return num_executions, num_shots
def _apply_diagonalizing_gates(
mps: list[SampleMeasurement], state: np.ndarray, is_state_batched: bool = False
):
if len(mps) == 1:
diagonalizing_gates = mps[0].diagonalizing_gates()
elif all(mp.obs for mp in mps):
diagonalizing_gates = qml.pauli.diagonalize_qwc_pauli_words([mp.obs for mp in mps])[0]
else:
diagonalizing_gates = []
for op in diagonalizing_gates:
state = apply_operation(op, state, is_state_batched=is_state_batched)
return state
# pylint:disable = too-many-arguments
[docs]def measure_with_samples(
measurements: list[Union[SampleMeasurement, ClassicalShadowMP, ShadowExpvalMP]],
state: np.ndarray,
shots: Shots,
is_state_batched: bool = False,
rng=None,
prng_key=None,
mid_measurements: dict = None,
) -> list[TensorLike]:
"""
Returns the samples of the measurement process performed on the given state.
This function assumes that the user-defined wire labels in the measurement process
have already been mapped to integer wires used in the device.
Args:
measurements (List[Union[SampleMeasurement, ClassicalShadowMP, ShadowExpvalMP]]):
The sample measurements to perform
state (np.ndarray[complex]): The state vector to sample from
shots (Shots): The number of samples to take
is_state_batched (bool): whether the state is batched or not
rng (Union[None, int, array_like[int], SeedSequence, BitGenerator, Generator]): A
seed-like parameter matching that of ``seed`` for ``numpy.random.default_rng``.
If no value is provided, a default RNG will be used.
prng_key (Optional[jax.random.PRNGKey]): An optional ``jax.random.PRNGKey``. This is
the key to the JAX pseudo random number generator. Only for simulation using JAX.
mid_measurements (None, dict): Dictionary of mid-circuit measurements
Returns:
List[TensorLike[Any]]: Sample measurement results
"""
# last N measurements are sampling MCMs in ``dynamic_one_shot`` execution mode
mps = measurements[0 : -len(mid_measurements)] if mid_measurements else measurements
groups, indices = _group_measurements(mps)
all_res = []
for group in groups:
if isinstance(group[0], ExpectationMP) and isinstance(
group[0].obs, (Hamiltonian, LinearCombination)
):
measure_fn = _measure_hamiltonian_with_samples
elif isinstance(group[0], ExpectationMP) and isinstance(group[0].obs, Sum):
measure_fn = _measure_sum_with_samples
elif isinstance(group[0], (ClassicalShadowMP, ShadowExpvalMP)):
measure_fn = _measure_classical_shadow
else:
# measure with the usual method (rotate into the measurement basis)
measure_fn = _measure_with_samples_diagonalizing_gates
prng_key, key = jax_random_split(prng_key)
all_res.extend(
measure_fn(
group, state, shots, is_state_batched=is_state_batched, rng=rng, prng_key=key
)
)
flat_indices = [_i for i in indices for _i in i]
# reorder results
sorted_res = tuple(
res for _, res in sorted(list(enumerate(all_res)), key=lambda r: flat_indices[r[0]])
)
# append MCM samples
if mid_measurements:
sorted_res += tuple(mid_measurements.values())
# put the shot vector axis before the measurement axis
if shots.has_partitioned_shots:
sorted_res = tuple(zip(*sorted_res))
return sorted_res
def _measure_with_samples_diagonalizing_gates(
mps: list[SampleMeasurement],
state: np.ndarray,
shots: Shots,
is_state_batched: bool = False,
rng=None,
prng_key=None,
) -> TensorLike:
"""
Returns the samples of the measurement process performed on the given state,
by rotating the state into the measurement basis using the diagonalizing gates
given by the measurement process.
Args:
mp (~.measurements.SampleMeasurement): The sample measurement to perform
state (np.ndarray[complex]): The state vector to sample from
shots (~.measurements.Shots): The number of samples to take
is_state_batched (bool): whether the state is batched or not
rng (Union[None, int, array_like[int], SeedSequence, BitGenerator, Generator]): A
seed-like parameter matching that of ``seed`` for ``numpy.random.default_rng``.
If no value is provided, a default RNG will be used.
prng_key (Optional[jax.random.PRNGKey]): An optional ``jax.random.PRNGKey``. This is
the key to the JAX pseudo random number generator. Only for simulation using JAX.
Returns:
TensorLike[Any]: Sample measurement results
"""
# apply diagonalizing gates
state = _apply_diagonalizing_gates(mps, state, is_state_batched)
total_indices = len(state.shape) - is_state_batched
wires = qml.wires.Wires(range(total_indices))
def _process_single_shot(samples):
processed = []
for mp in mps:
res = mp.process_samples(samples, wires)
if not isinstance(mp, CountsMP):
res = qml.math.squeeze(res)
processed.append(res)
return tuple(processed)
try:
prng_key, _ = jax_random_split(prng_key)
samples = sample_state(
state,
shots=shots.total_shots,
is_state_batched=is_state_batched,
wires=wires,
rng=rng,
prng_key=prng_key,
)
except ValueError as e:
if str(e) != "probabilities contain NaN":
raise e
samples = qml.math.full((shots.total_shots, len(wires)), 0)
processed_samples = []
for lower, upper in shots.bins():
shot = _process_single_shot(samples[..., lower:upper, :])
processed_samples.append(shot)
if shots.has_partitioned_shots:
return tuple(zip(*processed_samples))
return processed_samples[0]
def _measure_classical_shadow(
mp: list[Union[ClassicalShadowMP, ShadowExpvalMP]],
state: np.ndarray,
shots: Shots,
is_state_batched: bool = False,
rng=None,
prng_key=None,
):
"""
Returns the result of a classical shadow measurement on the given state.
A classical shadow measurement doesn't fit neatly into the current measurement API
since different diagonalizing gates are used for each shot. Here it's treated as a
state measurement with shots instead of a sample measurement.
Args:
mp (~.measurements.SampleMeasurement): The sample measurement to perform
state (np.ndarray[complex]): The state vector to sample from
shots (~.measurements.Shots): The number of samples to take
rng (Union[None, int, array_like[int], SeedSequence, BitGenerator, Generator]): A
seed-like parameter matching that of ``seed`` for ``numpy.random.default_rng``.
If no value is provided, a default RNG will be used.
Returns:
TensorLike[Any]: Sample measurement results
"""
# pylint: disable=unused-argument
# the list contains only one element based on how we group measurements
mp = mp[0]
wires = qml.wires.Wires(range(len(state.shape)))
if shots.has_partitioned_shots:
return [tuple(mp.process_state_with_shots(state, wires, s, rng=rng) for s in shots)]
return [mp.process_state_with_shots(state, wires, shots.total_shots, rng=rng)]
def _measure_hamiltonian_with_samples(
mp: list[SampleMeasurement],
state: np.ndarray,
shots: Shots,
is_state_batched: bool = False,
rng=None,
prng_key=None,
):
# the list contains only one element based on how we group measurements
mp = mp[0]
# if the measurement process involves a Hamiltonian, measure each
# of the terms separately and sum
def _sum_for_single_shot(s, prng_key=None):
results = measure_with_samples(
[ExpectationMP(t) for t in mp.obs.terms()[1]],
state,
s,
is_state_batched=is_state_batched,
rng=rng,
prng_key=prng_key,
)
return sum(c * res for c, res in zip(mp.obs.terms()[0], results))
keys = jax_random_split(prng_key, num=shots.num_copies)
unsqueezed_results = tuple(
_sum_for_single_shot(type(shots)(s), key) for s, key in zip(shots, keys)
)
return [unsqueezed_results] if shots.has_partitioned_shots else [unsqueezed_results[0]]
def _measure_sum_with_samples(
mp: list[SampleMeasurement],
state: np.ndarray,
shots: Shots,
is_state_batched: bool = False,
rng=None,
prng_key=None,
):
# the list contains only one element based on how we group measurements
mp = mp[0]
# if the measurement process involves a Sum, measure each
# of the terms separately and sum
def _sum_for_single_shot(s, prng_key=None):
results = measure_with_samples(
[ExpectationMP(t) for t in mp.obs],
state,
s,
is_state_batched=is_state_batched,
rng=rng,
prng_key=prng_key,
)
return sum(results)
keys = jax_random_split(prng_key, num=shots.num_copies)
unsqueezed_results = tuple(
_sum_for_single_shot(type(shots)(s), key) for s, key in zip(shots, keys)
)
return [unsqueezed_results] if shots.has_partitioned_shots else [unsqueezed_results[0]]
[docs]def sample_state(
state,
shots: int,
is_state_batched: bool = False,
wires=None,
rng=None,
prng_key=None,
) -> np.ndarray:
"""
Returns a series of samples of a state.
Args:
state (array[complex]): A state vector to be sampled
shots (int): The number of samples to take
is_state_batched (bool): whether the state is batched or not
wires (Sequence[int]): The wires to sample
rng (Union[None, int, array_like[int], SeedSequence, BitGenerator, Generator]):
A seed-like parameter matching that of ``seed`` for ``numpy.random.default_rng``.
If no value is provided, a default RNG will be used
prng_key (Optional[jax.random.PRNGKey]): An optional ``jax.random.PRNGKey``. This is
the key to the JAX pseudo random number generator. Only for simulation using JAX.
Returns:
ndarray[int]: Sample values of the shape (shots, num_wires)
"""
total_indices = len(state.shape) - is_state_batched
state_wires = qml.wires.Wires(range(total_indices))
wires_to_sample = wires or state_wires
num_wires = len(wires_to_sample)
flat_state = flatten_state(state, total_indices)
with qml.queuing.QueuingManager.stop_recording():
probs = qml.probs(wires=wires_to_sample).process_state(flat_state, state_wires)
# Keep same interface (e.g. jax) as in the device
return sample_probs(probs, shots, num_wires, is_state_batched, rng, prng_key)
[docs]def sample_probs(probs, shots, num_wires, is_state_batched, rng, prng_key=None):
"""
Sample from given probabilities, dispatching between JAX and NumPy implementations.
Args:
probs (array): The probabilities to sample from
shots (int): The number of samples to take
num_wires (int): The number of wires to sample
is_state_batched (bool): whether the state is batched or not
rng (Union[None, int, array_like[int], SeedSequence, BitGenerator, Generator]):
A seed-like parameter matching that of ``seed`` for ``numpy.random.default_rng``.
If no value is provided, a default RNG will be used
prng_key (Optional[jax.random.PRNGKey]): An optional ``jax.random.PRNGKey``. This is
the key to the JAX pseudo random number generator. Only for simulation using JAX.
"""
if qml.math.get_interface(probs) == "jax" or prng_key is not None:
return _sample_probs_jax(probs, shots, num_wires, is_state_batched, prng_key, seed=rng)
return _sample_probs_numpy(probs, shots, num_wires, is_state_batched, rng)
def _sample_probs_numpy(probs, shots, num_wires, is_state_batched, rng):
"""
Sample from given probabilities using NumPy's random number generator.
Args:
probs (array): The probabilities to sample from
shots (int): The number of samples to take
num_wires (int): The number of wires to sample
is_state_batched (bool): whether the state is batched or not
rng (Union[None, int, array_like[int], SeedSequence, BitGenerator, Generator]):
A seed-like parameter matching that of ``seed`` for ``numpy.random.default_rng``.
If no value is provided, a default RNG will be used
"""
rng = np.random.default_rng(rng)
norm = qml.math.sum(probs, axis=-1)
norm_err = qml.math.abs(norm - 1.0)
cutoff = 1e-07
norm_err = norm_err[..., np.newaxis] if not is_state_batched else norm_err
if qml.math.any(norm_err > cutoff):
raise ValueError("probabilities do not sum to 1")
basis_states = np.arange(2**num_wires)
if is_state_batched:
probs = probs / norm[:, np.newaxis] if norm.shape else probs / norm
samples = np.stack([rng.choice(basis_states, shots, p=p) for p in probs])
else:
probs = probs / norm
samples = rng.choice(basis_states, shots, p=probs)
powers_of_two = 1 << np.arange(num_wires, dtype=np.int64)[::-1]
states_sampled_base_ten = samples[..., None] & powers_of_two
return (states_sampled_base_ten > 0).astype(np.int64)
def _sample_probs_jax(probs, shots, num_wires, is_state_batched, prng_key=None, seed=None):
"""
Returns a series of samples of a state for the JAX interface based on the PRNG.
Args:
probs (array): The probabilities to sample from
shots (int): The number of samples to take
num_wires (int): The number of wires to sample
is_state_batched (bool): whether the state is batched or not
prng_key (Optional[jax.random.PRNGKey]): An optional ``jax.random.PRNGKey``. This is
the key to the JAX pseudo random number generator. Only for simulation using JAX.
seed (Optional[int]): A seed for the random number generator. This is only used if ``prng_key``
is not provided.
Returns:
ndarray[int]: Sample values of the shape (shots, num_wires)
"""
# pylint: disable=import-outside-toplevel
import jax
import jax.numpy as jnp
if prng_key is None:
prng_key = jax.random.PRNGKey(np.random.default_rng(seed).integers(100000))
basis_states = jnp.arange(2**num_wires)
if is_state_batched:
keys = jax_random_split(prng_key, num=probs.shape[0])
samples = jnp.array(
[
jax.random.choice(_key, basis_states, shape=(shots,), p=prob)
for _key, prob in zip(keys, probs)
]
)
else:
_, key = jax_random_split(prng_key)
samples = jax.random.choice(key, basis_states, shape=(shots,), p=probs)
powers_of_two = 1 << jnp.arange(num_wires, dtype=jnp.int64)[::-1]
states_sampled_base_ten = samples[..., None] & powers_of_two
return (states_sampled_base_ten > 0).astype(jnp.int64)
_modules/pennylane/devices/qubit/sampling
Download Python script
Download Notebook
View on GitHub