Source code for pennylane.labs.trotter_error.product_formulas.error

# Copyright 2025 Xanadu Quantum Technologies Inc.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functions for retreiving effective error from fragments"""

import copy
from collections import Counter, defaultdict
from collections.abc import Hashable, Sequence

from pennylane import concurrency
from pennylane.labs.trotter_error import AbstractState, Fragment
from pennylane.labs.trotter_error.abstract import nested_commutator
from pennylane.labs.trotter_error.product_formulas.bch import bch_expansion
from pennylane.labs.trotter_error.product_formulas.product_formula import ProductFormula

# pylint: disable=too-many-arguments, too-many-positional-arguments


class _AdditiveIdentity:
    """Only used to initialize accumulators for summing Fragments"""

    def __add__(self, other):
        return other

    def __radd__(self, other):
        return other


[docs] def effective_hamiltonian( product_formula: ProductFormula, fragments: dict[Hashable, Fragment], order: int, timestep: float = 1.0, num_workers: int = 1, backend: str = "serial", ): r"""Compute the effective Hamiltonian :math:`\hat{H}_{eff} = \hat{H} + \hat{\epsilon}` that corresponds to a given product formula. Args: product_formula (ProductFormula): A product formula used to approximate the time-evolution operator for a Hamiltonian. fragments (Dict[Hashable, :class:`~.pennylane.labs.trotter_error.Fragment`): The fragments that sum to the Hamiltonian. The keys in the dictionary must match the labels used to build the :class:`~.pennylane.labs.trotter_error.ProductFormula` object. order (int): The order of the approximatation. timestep (float): The timestep for simulation. num_workers (int): the number of concurrent units used for the computation. Default value is set to 1. backend (string): the executor backend from the list of supported backends. Available options : "mp_pool", "cf_procpool", "cf_threadpool", "serial", "mpi4py_pool", "mpi4py_comm". Default value is set to "serial". **Example** >>> import numpy as np >>> from pennylane.labs.trotter_error.fragments import vibrational_fragments >>> from pennylane.labs.trotter_error.product_formulas import ProductFormula, effective_hamiltonian >>> >>> n_modes = 4 >>> r_state = np.random.RandomState(42) >>> freqs = r_state.random(4) >>> taylor_coeffs = [ >>> np.array(0), >>> r_state.random(size=(n_modes, )), >>> r_state.random(size=(n_modes, n_modes)), >>> r_state.random(size=(n_modes, n_modes, n_modes)) >>> ] >>> >>> delta = 0.001 >>> frag_labels = [0, 1, 1, 0] >>> frag_coeffs = [1/2, 1/2, 1/2, 1/2] >>> >>> pf = ProductFormula(frag_labels, coeffs=frag_coeffs) >>> frags = dict(enumerate(vibrational_fragments(n_modes, freqs, taylor_coeffs))) >>> type(effective_hamiltonian(pf, frags, order=5, timestep=delta)) <class 'pennylane.labs.trotter_error.realspace.realspace_operator.RealspaceSum'> """ if not product_formula.fragments.issubset(fragments.keys()): raise ValueError("Fragments do not match product formula") bch = bch_expansion(product_formula(1j * timestep), order) executor = concurrency.backends.get_executor(backend) with executor(max_workers=num_workers) as ex: partial_sum = ex.starmap( _eval_commutator, [ (commutator, coeff, fragments) for ith_order in bch for commutator, coeff in ith_order.items() ], ) eff = _AdditiveIdentity() for term in partial_sum: eff += term return eff
def _eval_commutator(commutator, coeff, fragments): r"""Computes a commutator after replacing symbols in the commutator with concrete fragments. Args: commutator: commutator to be evaluated coeff (complex): coefficient associated with the commutator fragments (dict): dictionary representing a sequence of fragments Returns: ndarray: the evaluated form of the commutator """ return coeff * nested_commutator(_insert_fragments(commutator, fragments)) def _insert_fragments( commutator: tuple[Hashable], fragments: dict[Hashable, Fragment] ) -> tuple[Fragment]: """This function transforms a commutator of labels to a commutator of concrete `Fragment` objects. The function recurses through the nested structure of the tuple replacing each hashable `label` with the concrete value `fragments[label]`.""" return tuple( _insert_fragments(term, fragments) if isinstance(term, tuple) else fragments[term] for term in commutator )
[docs] def perturbation_error( product_formula: ProductFormula, fragments: dict[Hashable, Fragment], states: Sequence[AbstractState], max_order: int, timestep: float = 1.0, num_workers: int = 1, backend: str = "serial", parallel_mode: str = "state", ) -> list[float]: r"""Computes the perturbation theory error using the effective Hamiltonian :math:`\hat{\epsilon} = \hat{H}_{eff} - \hat{H}` for a given product formula. For a state :math:`\left| \psi \right\rangle` the perturbation theory error is given by the expectation value :math:`\left\langle \psi \right| \hat{\epsilon} \left| \psi \right\rangle`. Args: product_formula (ProductFormula): the :class:`~.pennylane.labs.trotter_error.ProductFormula` used to obtain the effective Hamiltonian fragments (Sequence[Fragments]): the set of :class:`~.pennylane.labs.trotter_error.Fragment` objects to compute the perturbation error from states (Sequence[AbstractState]): the states to compute expectation values from max_order (float): the maximum commutator order to compute in BCH timestep (float): time step for the Trotter error operator. num_workers (int): the number of concurrent units used for the computation. Default value is set to 1. backend (string): the executor backend from the list of supported backends. Available options : "mp_pool", "cf_procpool", "cf_threadpool", "serial", "mpi4py_pool", "mpi4py_comm". Default value is set to "serial". parallel_mode (str): the mode of parallelization to use. Options are "state" or "commutator". "state" parallelizes the computation of expectation values per state, while "commutator" parallelizes the application of commutators to each state. Default value is set to "state". Returns: List[Dict[int, float]]: the list of dictionaries of expectation values computed from the Trotter error operator and the input states. The dictionary is indexed by the commutator orders and its value is the error obtained from the commutators of that order. **Example** >>> import numpy as np >>> from pennylane.labs.trotter_error import HOState, ProductFormula, vibrational_fragments, perturbation_error >>> >>> frag_labels = [0, 1, 1, 0] >>> frag_coeffs = [1/2, 1/2, 1/2, 1/2] >>> pf = ProductFormula(frag_labels, coeffs=frag_coeffs) >>> >>> n_modes = 2 >>> r_state = np.random.RandomState(42) >>> freqs = r_state.random(n_modes) >>> taylor_coeffs = [ >>> np.array(0), >>> r_state.random(size=(n_modes, )), >>> r_state.random(size=(n_modes, n_modes)), >>> r_state.random(size=(n_modes, n_modes, n_modes)) >>> ] >>> frags = dict(enumerate(vibrational_fragments(n_modes, freqs, taylor_coeffs))) >>> >>> gridpoints = 5 >>> state1 = HOState(n_modes, gridpoints, {(0, 0): 1}) >>> state2 = HOState(n_modes, gridpoints, {(1, 1): 1}) >>> >>> errors = perturbation_error(pf, frags, [state1, state2], max_order=3) >>> print(errors) [{3: 0.9189251160920876j}, {3: 4.7977166824268505j}] """ if not product_formula.fragments.issubset(fragments.keys()): raise ValueError("Fragments do not match product formula") commutator_lists = [ _group_sums(commutators) for commutators in bch_expansion(product_formula, max_order)[1:] ] if backend == "serial": assert num_workers == 1, "num_workers must be set to 1 for serial execution." expectations = [] for state in states: expectation = 0 for commutators in commutator_lists: if len(commutators) == 0: continue order = len(commutators[0]) for commutator in commutators: expectation += _compute_expectation(commutator, fragments, state) expectations.append({order: (1j * timestep) ** order * expectation}) return expectations if parallel_mode == "state": executor = concurrency.backends.get_executor(backend) with executor(max_workers=num_workers) as ex: expectations = ex.starmap( _get_expval_state, [(commutator_lists, fragments, state, timestep) for state in states], ) return expectations if parallel_mode == "commutator": executor = concurrency.backends.get_executor(backend) errors = [] commutators = [x for xs in commutator_lists for x in xs] for state in states: with executor(max_workers=num_workers) as ex: applied_commutators = ex.starmap( _compute_expectation_track_order, [(commutator, fragments, state) for commutator in commutators], ) expectations = defaultdict(int) for expectation, order in applied_commutators: expectations[order] += expectation errors.append( { order: (1j * timestep) ** order * expectation for order, expectation in expectations.items() } ) return errors raise ValueError("Invalid parallel mode. Choose 'state' or 'commutator'.")
def _get_expval_state(commutator_lists, fragments, state: AbstractState, timestep: float) -> float: """Returns the expectation value of ``state`` with respect to the operator obtained by substituting ``fragments`` into ``commutators``.""" expectations = {} for commutators in commutator_lists: if len(commutators) == 0: continue order = len(commutators[0]) expectation = sum( _compute_expectation(commutator, fragments, state) for commutator in commutators ) expectations[order] = (1j * timestep) ** order * expectation return expectations def _compute_expectation( commutator: tuple[Hashable], fragments: dict[Hashable, Fragment], state: AbstractState ) -> complex: """Returns the expectation value obtained from applying ``commutator`` to ``state``.""" new_state = _AdditiveIdentity() for term, coeff in _op_list(commutator).items(): tmp_state = copy.copy(state) for frag in reversed(term): if isinstance(frag, frozenset): frag = sum( (frag_coeff * fragments[x] for x, frag_coeff in frag), _AdditiveIdentity() ) else: frag = fragments[frag] tmp_state = frag.apply(tmp_state) new_state += coeff * tmp_state return state.dot(new_state) def _compute_expectation_track_order( commutator: tuple[Hashable], fragments: dict[Hashable, Fragment], state: AbstractState ) -> tuple[complex, int]: """Returns the expectation value obtained from applying ``commutator`` to ``state``.""" new_state = _AdditiveIdentity() for term, coeff in _op_list(commutator).items(): tmp_state = copy.copy(state) for frag in reversed(term): if isinstance(frag, frozenset): frag = sum( (frag_coeff * fragments[x] for x, frag_coeff in frag), _AdditiveIdentity() ) else: frag = fragments[frag] tmp_state = frag.apply(tmp_state) new_state += coeff * tmp_state return state.dot(new_state), len(commutator) def _op_list(commutator) -> dict[tuple[Hashable], complex]: """Returns the operations needed to apply the commutator to a state.""" if not commutator: return Counter() head, *tail = commutator if not tail: return Counter({(head,): 1}) tail_ops_coeffs = _op_list(tuple(tail)) ops1 = Counter({(head, *ops): coeff for ops, coeff in tail_ops_coeffs.items()}) ops2 = Counter({(*ops, head): -coeff for ops, coeff in tail_ops_coeffs.items()}) ops1.update(ops2) return ops1 def _group_sums(term_dict: dict[tuple[Hashable], complex]) -> list[tuple[Hashable | set]]: """Reduce the number of commutators by grouping them using linearity in the first argument. For example, two commutators a*[X, A, B] and b*Y[A, B] will be merged into one commutator [a*X + b*Y, A, B]. """ grouped_comms = defaultdict(set) for commutator, coeff in term_dict.items(): head, *tail = commutator tail = tuple(tail) grouped_comms[tail].add((head, coeff)) return [(frozenset(heads), *tail) for tail, heads in grouped_comms.items()]