Files
qibotn/src/qibotn/backends/quimb.py
jaunatisblue 72f95599bb
Some checks failed
Build wheels / build (ubuntu-latest, 3.11) (push) Has been cancelled
Build wheels / build (ubuntu-latest, 3.12) (push) Has been cancelled
Build wheels / build (ubuntu-latest, 3.13) (push) Has been cancelled
Tests / check (push) Has been cancelled
Tests / build (ubuntu-latest, 3.11) (push) Has been cancelled
Tests / build (ubuntu-latest, 3.12) (push) Has been cancelled
Tests / build (ubuntu-latest, 3.13) (push) Has been cancelled
完善mps的vidal机制,多节点并行;补充tn搜索时dask集群搜索的方式
2026-05-12 15:44:19 +08:00

685 lines
22 KiB
Python

from collections import Counter
from typing import Optional
import quimb as qu
import quimb.tensor as qtn
from qibo.config import raise_error
from qibo.gates.abstract import ParametrizedGate
from qibo.models import Circuit
from qibotn.backends.abstract import QibotnBackend
from qibotn.result import TensorNetworkResult
# Lower-case qibo gate names mapped to the gate identifiers handed to quimb's
# ``apply_gate``. The "measure" entry is a sentinel: the circuit converter in
# this module skips gates that map to it instead of applying them.
GATE_MAP = {
    # single-qubit gates
    "h": "H",
    "x": "X",
    "y": "Y",
    "z": "Z",
    "s": "S",
    "t": "T",
    # parametrized single-qubit rotations
    "rx": "RX",
    "ry": "RY",
    "rz": "RZ",
    "u3": "U3",
    # two-qubit gates
    "cx": "CX",
    "cnot": "CNOT",
    "cy": "CY",
    "cz": "CZ",
    "iswap": "ISWAP",
    "swap": "SWAP",
    # three-qubit gates
    "ccx": "CCX",
    "ccy": "CCY",
    "ccz": "CCZ",
    "toffoli": "TOFFOLI",
    "cswap": "CSWAP",
    "fredkin": "FREDKIN",
    # miscellaneous
    "fsim": "fsim",
    "measure": "measure",
}

# Pauli terms acting on more than this many qubits are evaluated through a
# site-by-site tensor network rather than a dense Kronecker-product operator.
PAULI_DENSE_MAX_QUBITS = 8
def _torch_cpu_array(data, dtype=None):
    """Return ``data`` as a contiguous torch tensor living on the CPU.

    Args:
        data: a ``torch.Tensor`` or anything ``numpy.asarray`` accepts.
        dtype: optional torch dtype; the tensor is cast when it differs.

    Returns:
        torch.Tensor: CPU tensor with contiguous memory layout.
    """
    import numpy as np
    import torch

    if not isinstance(data, torch.Tensor):
        host_array = np.asarray(data)
        # Reversed views carry negative strides; copy them into a fresh
        # contiguous buffer before handing the array to torch.
        if min(host_array.strides, default=0) < 0:
            host_array = np.ascontiguousarray(host_array)
        tensor = torch.from_numpy(host_array)
    else:
        tensor = data

    if tensor.device.type != "cpu":
        tensor = tensor.cpu()
    if dtype is not None and tensor.dtype != dtype:
        tensor = tensor.to(dtype)
    return tensor if tensor.is_contiguous() else tensor.contiguous()
def _arrays_to_backend(arrays, backend, engine):
    """Convert every array in ``arrays`` to the representation of ``backend``.

    For the ``"torch"`` backend each array becomes a contiguous CPU
    ``complex128`` tensor; for any other backend ``engine.asarray`` is used.
    """
    if backend != "torch":
        return [engine.asarray(item) for item in arrays]

    import torch

    converted = []
    for item in arrays:
        converted.append(_torch_cpu_array(item, dtype=torch.complex128))
    return converted
def _pauli_term_to_dense_operator(factors):
    """Build the dense operator of a Pauli product term.

    Args:
        factors: iterable of ``(qubit, gate_name)`` pairs.

    Returns:
        tuple: ``(operator, sites)`` where ``operator`` chains the single
        Pauli matrices with quimb's ``&`` in the order of ``factors`` and
        ``sites`` is the tuple of qubits in that same order. ``operator`` is
        ``None`` when ``factors`` is empty.
    """
    sites = []
    dense = None
    for site, label in factors:
        single = qu.pauli(label.lower())
        if dense is None:
            dense = single
        else:
            # Keep the explicit binary ``&``; an augmented assignment could
            # dispatch to a different in-place operator on array types.
            dense = dense & single
        sites.append(site)
    return dense, tuple(sites)
def pauli_product_expectation_tn(
    quimb_circuit,
    factors,
    simplify_sequence="ADCRS",
    simplify_atol=1e-12,
    simplify_equalize_norms=True,
):
    """Assemble the scalar tensor network for ``<psi|P|psi>``.

    The Pauli string ``P`` is inserted one 2x2 tensor per qubit (identity on
    qubits that carry no Pauli factor), so no dense multi-qubit operator is
    ever built.

    Args:
        quimb_circuit: quimb circuit providing ``get_psi_simplified``,
            ``ket_site_ind``, ``bra_site_ind`` and ``N``.
        factors: iterable of ``(qubit, gate_name)`` pairs; entries named
            ``"I"`` (any case) are ignored.
        simplify_sequence: simplification sequence used for the ket and for
            the final network.
        simplify_atol: absolute tolerance used by the simplifications.
        simplify_equalize_norms: forwarded as ``equalize_norms`` to both
            simplification calls.

    Returns:
        The simplified tensor network with no open indices.
    """
    import numpy as np

    n_sites = quimb_circuit.N
    simplify_kwargs = {
        "seq": simplify_sequence,
        "atol": simplify_atol,
        "equalize_norms": simplify_equalize_norms,
    }

    # Non-trivial single-site Pauli matrices, keyed by integer qubit index.
    site_ops = {}
    for qubit, gate_name in factors:
        label = str(gate_name)
        if label.upper() != "I":
            site_ops[int(qubit)] = qu.pauli(label.lower())

    ket = quimb_circuit.get_psi_simplified(**simplify_kwargs)
    ket_to_bra = {
        quimb_circuit.ket_site_ind(site): quimb_circuit.bra_site_ind(site)
        for site in range(n_sites)
    }
    bra = ket.conj().reindex(ket_to_bra)

    network = bra | ket
    identity = np.eye(2, dtype=complex)
    for site in range(n_sites):
        # One operator tensor per site connects the bra index to the ket index.
        site_inds = (
            quimb_circuit.bra_site_ind(site),
            quimb_circuit.ket_site_ind(site),
        )
        network |= qtn.Tensor(data=site_ops.get(site, identity), inds=site_inds)

    network.full_simplify_(output_inds=(), **simplify_kwargs)
    return network
def pauli_product_expectation(
    quimb_circuit,
    factors,
    backend,
    optimize,
    simplify_sequence="ADCRS",
    simplify_atol=1e-12,
):
    """Contract the ``<psi|P|psi>`` tensor network of a Pauli product.

    Args:
        quimb_circuit: quimb circuit whose state is measured.
        factors: iterable of ``(qubit, gate_name)`` pairs defining ``P``.
        backend: array backend name forwarded to the contraction.
        optimize: contraction path optimizer forwarded to the contraction.
        simplify_sequence: simplification sequence used to build the network.
        simplify_atol: absolute tolerance used by the simplification.

    Returns:
        The fully contracted value of the network.
    """
    scalar_tn = pauli_product_expectation_tn(
        quimb_circuit,
        factors,
        simplify_sequence=simplify_sequence,
        simplify_atol=simplify_atol,
    )
    contraction_opts = {
        "output_inds": (),
        "optimize": optimize,
        "backend": backend,
    }
    # ``all`` selects every tensor of the network for contraction.
    return scalar_tn.contract(all, **contraction_opts)
def __init__(self, quimb_backend="numpy", contraction_optimizer="auto-hq"):
    """Initialise a quimb tensor-network backend instance.

    Args:
        quimb_backend (str): array library used by quimb
            (``"numpy"``, ``"torch"`` or ``"jax"``).
        contraction_optimizer (str): contraction path optimizer forwarded to
            ``setup_backend_specifics``.
    """
    # This function is attached to dynamically created classes, so the
    # zero-argument ``super()`` is unavailable. ``super(self.__class__, self)``
    # recurses forever as soon as one of those classes is subclassed, so the
    # class that actually owns this function is looked up in the MRO instead.
    owner = next(
        (
            klass
            for klass in type(self).__mro__
            if klass.__dict__.get("__init__") is __init__
        ),
        type(self),
    )
    super(owner, self).__init__()

    self.name = "qibotn"
    self.platform = "quimb"
    self.backend = quimb_backend

    # Placeholders, overwritten right below by ``configure_tn_simulation``.
    self.ansatz = None
    self.max_bond_dimension = None
    self.svd_cutoff = None
    self.n_most_frequent_states = None

    self.configure_tn_simulation()
    self.setup_backend_specifics(
        quimb_backend=quimb_backend, contractions_optimizer=contraction_optimizer
    )
def configure_tn_simulation(
    self,
    ansatz: str = "mps",
    max_bond_dimension: Optional[int] = None,
    svd_cutoff: Optional[float] = 1e-10,
    n_most_frequent_states: int = 100,
):
    """
    Configure tensor network simulation.
    Args:
        ansatz : str, optional
            The tensor network ansatz to use. Default is ``"mps"``. The name is
            matched case-insensitively (it is stored lower-cased); any value other
            than ``"mps"``, including ``None``, selects the generic quimb Circuit class.
        max_bond_dimension : int, optional
            The maximum bond dimension, passed to quimb as the ``max_bond`` gate
            option. Default is ``None``.
        svd_cutoff : float, optional
            Passed to quimb as the ``cutoff`` gate option. Default is ``1e-10``.
        n_most_frequent_states : int, optional
            Number of most frequently sampled states for which probabilities are
            computed when a circuit is executed with shots. Default is 100.
    Notes:
        - The ansatz determines the tensor network structure used for simulation.
    """
    # Other methods compare the stored value against the lower-case "mps",
    # so normalise the spelling once, here.
    self.ansatz = ansatz.lower() if isinstance(ansatz, str) else ansatz
    self.max_bond_dimension = max_bond_dimension
    self.svd_cutoff = svd_cutoff
    self.n_most_frequent_states = n_most_frequent_states
@property
def circuit_ansatz(self):
    """Quimb circuit class matching the configured ansatz.

    ``qtn.CircuitMPS`` when ``self.ansatz`` equals ``"mps"``, the generic
    ``qtn.Circuit`` otherwise.
    """
    return qtn.CircuitMPS if self.ansatz == "mps" else qtn.Circuit
def setup_backend_specifics(
    self, quimb_backend="numpy", contractions_optimizer="auto-hq"
):
    """Select the array engine and contraction optimizer.

    Args:
        quimb_backend: str
            Array library used by quimb: ``"numpy"``, ``"torch"`` or ``"jax"``.
            The matching module is imported lazily and stored in ``self.engine``.
        contractions_optimizer: str, optional
            Contraction path optimizer stored in ``self.contractions_optimizer``.

    Raises:
        ValueError: if ``quimb_backend`` is not one of the supported names.
    """
    # NOTE: this only swaps the array engine; it does not change the class
    # the instance inherits from.
    if quimb_backend == "numpy":
        import numpy as np

        self.engine = np
    elif quimb_backend == "torch":
        import torch

        self.engine = torch
    elif quimb_backend == "jax":
        import jax.numpy as jnp

        self.engine = jnp
    else:
        raise_error(ValueError, f"Unsupported quimb backend: {quimb_backend}")

    self.backend = quimb_backend
    self.contractions_optimizer = contractions_optimizer
def execute_circuit(
    self,
    circuit: Circuit,
    initial_state=None,
    nshots=None,
    return_array=False,
):
    """
    Execute a quantum circuit using the configured tensor network ansatz.
    Args:
        circuit : qibo.models.Circuit
            The quantum circuit to be executed. It is exported to OpenQASM 2 and
            re-imported by quimb.
        initial_state : array-like, optional
            Dense initial state. Only supported for the MPS ansatz, in which case
            it is converted to a matrix product state with physical dimension 2.
        nshots : int, optional
            The number of shots for sampling the circuit. If None (or 0), no sampling is performed.
        return_array : bool, optional
            If True, returns the statevector as a dense array. Default is False.
    Returns:
        TensorNetworkResult
            An object containing the results of the circuit execution, including:
            - nqubits: Number of qubits in the circuit.
            - backend: The backend used for execution.
            - measures: The measurement frequencies if nshots is specified, otherwise None.
            - measured_probabilities: Probabilities of the `n_most_frequent_states` most
              frequently sampled basis states if nshots is specified, otherwise None.
            - prob_type: The type of probability computation used (currently "default").
            - statevector: The final statevector as a dense array if return_array is True, otherwise None.
    Raises:
        ValueError
            If an initial state is provided but the ansatz is not MPS.
    Notes:
        - The ansatz name is compared case-insensitively when validating `initial_state`.
    """
    if initial_state is not None:
        # The configured default is the lower-case "mps"; comparing against the
        # upper-case spelling only would reject every initial state.
        uses_mps = isinstance(self.ansatz, str) and self.ansatz.lower() == "mps"
        if not uses_mps:
            raise_error(
                ValueError, "Initial state not None supported only for MPS ansatz."
            )
        initial_state = qtn.tensor_1d.MatrixProductState.from_dense(
            initial_state, 2
        )  # 2 is the physical dimension

    circ_quimb = self.circuit_ansatz.from_openqasm2_str(
        circuit.to_qasm(),
        psi0=initial_state,
        gate_opts={"max_bond": self.max_bond_dimension, "cutoff": self.svd_cutoff},
    )

    frequencies = None
    measured_probabilities = None
    if nshots:
        frequencies = Counter(circ_quimb.sample(nshots))
        # Amplitudes are only evaluated for the most frequently sampled states.
        main_frequencies = dict(
            frequencies.most_common(self.n_most_frequent_states)
        )
        measured_probabilities = {
            state: abs(circ_quimb.amplitude(state)) ** 2
            for state in main_frequencies
        }

    statevector = None
    if return_array:
        statevector = circ_quimb.to_dense(
            backend=self.backend, optimize=self.contractions_optimizer
        )

    return TensorNetworkResult(
        nqubits=circuit.nqubits,
        backend=self,
        measures=frequencies,
        measured_probabilities=measured_probabilities,
        prob_type="default",
        statevector=statevector,
    )
def exp_value_observable_symbolic(
    self, circuit, operators_list, sites_list, coeffs_list, nqubits
):
    """
    Evaluate the expectation value of a Pauli-string Hamiltonian on a circuit.
    The Qibo circuit is converted to a Quimb circuit and every Hamiltonian term is
    evaluated with Quimb's `local_expectation`; the weighted contributions are summed.
    The Hamiltonian is described by three parallel lists: operator strings, site tuples
    and coefficients. Within a single term every qubit index must be unique.
    Example: operators_list = ['xyz', 'xyz'], sites_list = [(1,2,3), (1,2,3)], coeffs_list = [1, 2]
    Parameters
    ----------
    circuit : qibo.models.Circuit
        The quantum circuit to evaluate.
    operators_list : list of str
        One Pauli string per Hamiltonian term.
    sites_list : list of tuple of int
        For each term, the qubits its operator string acts on.
    coeffs_list : list of real/complex
        Coefficient of each term; only the real part is used.
    nqubits : int
        Not used by this method.
    Returns
    -------
    float
        The real part of the expectation value of the Hamiltonian on the circuit state.
    Raises
    ------
    ValueError
        If a sites tuple contains a repeated qubit index.
    """
    # A term may touch each qubit at most once.
    for sites in sites_list:
        if len(set(sites)) == len(sites):
            continue
        raise_error(
            ValueError,
            f"Invalid Hamiltonian term sites {sites}: repeated qubit indices are not allowed "
            "within a single term (e.g. (0,0,0) is invalid).",
        )

    truncation = {"max_bond": self.max_bond_dimension, "cutoff": self.svd_cutoff}
    quimb_circuit = self._qibo_circuit_to_quimb(
        circuit,
        quimb_circuit_type=self.circuit_ansatz,
        gate_opts=truncation,
    )

    expectation_value = 0.0
    for opstr, sites, coeff in zip(operators_list, sites_list, coeffs_list):
        term_operator = self._string_to_quimb_operator(opstr)
        weight = coeff.real
        term_value = quimb_circuit.local_expectation(
            term_operator,
            where=sites,
            backend=self.backend,
            optimize=self.contractions_optimizer,
            simplify_sequence="R",
        )
        expectation_value = expectation_value + weight * term_value
    return self.real(expectation_value)
def _qibo_circuit_to_quimb(
    self, qibo_circ, quimb_circuit_type=qtn.Circuit, **circuit_kwargs
):
    """
    Translate a Qibo circuit into a Quimb circuit, gate by gate.
    Measurement gates are skipped. ``cu1`` is expanded into RZ and CNOT gates.
    Gates missing from ``GATE_MAP`` are applied through their matrix when they
    expose a ``matrix`` attribute; otherwise an error is raised.
    Parameters
    ----------
    qibo_circ : qibo.models.circuit.Circuit
        The circuit to convert.
    quimb_circuit_type : type
        The Quimb circuit class to instantiate (Circuit, CircuitMPS, etc).
    circuit_kwargs : dict
        Extra keyword arguments for the Quimb circuit constructor.
    Returns
    -------
    circ : quimb.tensor.circuit.Circuit
        The converted circuit.
    Raises
    ------
    ValueError
        If a gate is neither mapped nor provides a matrix.
    """
    circ = quimb_circuit_type(qibo_circ.nqubits, **circuit_kwargs)

    for gate in qibo_circ.queue:
        gate_name = getattr(gate, "name", None)
        mapped_name = GATE_MAP.get(gate_name)

        if mapped_name == "measure":
            continue

        if gate_name == "cu1":
            # Expansion of the controlled phase into RZ and CNOT gates.
            # NOTE(review): presumably equal to cu1 only up to a global phase
            # because RZ is used instead of a phase gate -- confirm.
            half_angle = gate.parameters[0] / 2
            control, target = gate.qubits
            circ.apply_gate("RZ", half_angle, control)
            circ.apply_gate("RZ", half_angle, target)
            circ.apply_gate("CNOT", control, target)
            circ.apply_gate("RZ", -half_angle, target)
            circ.apply_gate("CNOT", control, target)
            continue

        if mapped_name is None:
            if hasattr(gate, "matrix"):
                # Unmapped gate: fall back to its explicit matrix.
                circ.apply_gate_raw(gate.matrix(), getattr(gate, "qubits", ()))
                continue
            raise_error(ValueError, f"Gate {gate_name} not supported in Quimb backend.")

        gate_params = getattr(gate, "parameters", ())
        gate_qubits = getattr(gate, "qubits", ())
        trainable = isinstance(gate, ParametrizedGate) and getattr(
            gate, "trainable", True
        )
        # Only trainable parametrized gates are flagged as parametrized.
        extra_opts = {"parametrized": trainable} if trainable else {}
        circ.apply_gate(mapped_name, *gate_params, *gate_qubits, **extra_opts)

    return circ
def _string_to_quimb_operator(self, op_str):
    """
    Build a Quimb operator from a Pauli string such as 'xzy'.
    The single-character Pauli operators are chained left to right with '&'.
    Parameters
    ----------
    op_str : str
        Non-empty string whose characters are each one of 'x', 'y', 'z', 'i'
        (case-insensitive).
    Returns
    -------
    qu_op : quimb.Qarray
        The corresponding Quimb operator.
    """
    letters = op_str.lower()
    first, remaining = letters[0], letters[1:]
    operator = qu.pauli(first)
    for letter in remaining:
        # Explicit binary '&' on purpose (no augmented assignment).
        operator = operator & qu.pauli(letter)
    return operator
def expectation(self, circuit, observable, parallel=None, parallel_opts=None):
    """
    Compute an expectation value, optionally with parallel acceleration.
    Parameters
    ----------
    circuit : qibo.models.Circuit
        The quantum circuit.
    observable : qibo.hamiltonians.SymbolicHamiltonian or form
        The observable to measure; validated with `check_observable`.
    parallel : str, optional
        Parallelization method: 'mpi', 'processpool', or None (default, serial).
    parallel_opts : dict, optional
        Options read by the parallel implementation:
        - max_repeats: int (default 1024)
        - max_time: int (default 300)
        - search_workers: int (default 48)
        - mpi_contract: bool (default False, use MPI for contraction)
        - torch_threads, slicing_opts, trial_timeout: optional (default None)
    Returns
    -------
    float
        The expectation value.
    """
    from qibotn.observables import check_observable, extract_gates_and_qubits

    options = {} if parallel_opts is None else parallel_opts
    observable = check_observable(observable, circuit.nqubits)

    if parallel is not None:
        # Delegate to the parallel implementation.
        return self._expectation_parallel(circuit, observable, parallel, options)

    # Serial path: evaluate the Pauli terms one after the other.
    all_terms = extract_gates_and_qubits(observable)
    qc = self._qibo_circuit_to_quimb(
        circuit,
        quimb_circuit_type=self.circuit_ansatz,
        gate_opts={"max_bond": self.max_bond_dimension, "cutoff": self.svd_cutoff},
    )
    shared_kwargs = {
        "backend": self.backend,
        "optimize": self.contractions_optimizer,
        "simplify_sequence": "ADCRS",
        "simplify_atol": 1e-12,
    }

    exp_val = 0.0
    for coeff, factors in all_terms:
        if len(factors) <= PAULI_DENSE_MAX_QUBITS:
            # Few qubits: a dense Kronecker-product operator is affordable.
            op, where = _pauli_term_to_dense_operator(factors)
            val = qc.local_expectation(op, where, **shared_kwargs)
        else:
            # Many qubits: insert the Pauli string site by site instead.
            val = pauli_product_expectation(qc, factors, **shared_kwargs)
        exp_val += coeff * val
    return self.real(exp_val)
def _expectation_parallel(self, circuit, observable, method, opts):
    """Parallel expectation value computation.

    The Pauli terms of ``observable`` are distributed round-robin over the MPI
    ranks (a single "rank" when MPI is not in use). For every local term a
    contraction tree is searched with ``parallel_path_search`` and the term's
    tensor network is then contracted.

    Parameters
    ----------
    circuit : qibo.models.Circuit
        The quantum circuit.
    observable
        Observable already validated by the caller.
    method : str
        Parallelization method; ``'mpi'`` enables the MPI communicator.
    opts : dict
        Recognised keys: ``max_repeats`` (1024), ``max_time`` (300),
        ``search_workers`` (48), ``mpi_contract`` (False), ``torch_threads``
        (None), ``slicing_opts`` (None), ``trial_timeout`` (None).

    Returns
    -------
    float
        The expectation value. With an active MPI communicator only rank 0
        returns the total; every other rank returns ``0.0``.

    Notes
    -----
    - Terms for which the path search returns ``None`` are skipped and
      contribute nothing to the result.
      NOTE(review): confirm this is intended, since it silently changes the sum.
    - If ``mpi4py`` cannot be imported the computation runs on a single rank.
    """
    from qibotn.observables import extract_gates_and_qubits
    from qibotn.parallel import parallel_contract, parallel_path_search

    # torch is only mandatory for the torch backend; for other backends it is
    # used, when available, solely to tune the thread count.
    try:
        import torch
    except ImportError:
        if self.backend == "torch":
            raise
        torch = None

    try:
        from mpi4py import MPI

        comm = MPI.COMM_WORLD if method == "mpi" else None
        rank = comm.Get_rank() if comm else 0
        size = comm.Get_size() if comm else 1
    except ImportError:
        comm, rank, size = None, 0, 1

    max_repeats = opts.get("max_repeats", 1024)
    max_time = opts.get("max_time", 300)
    search_workers = opts.get("search_workers", 48)
    mpi_contract = opts.get("mpi_contract", False)
    torch_threads = opts.get("torch_threads", None)
    slicing_opts = opts.get("slicing_opts", None)
    trial_timeout = opts.get("trial_timeout", None)

    qc = self._qibo_circuit_to_quimb(
        circuit,
        quimb_circuit_type=self.circuit_ansatz,
        gate_opts={"max_bond": self.max_bond_dimension, "cutoff": self.svd_cutoff},
    )
    all_terms = extract_gates_and_qubits(observable)
    # Round-robin distribution of the terms over the ranks.
    my_terms = all_terms[rank::size]

    if torch is not None:
        if method == "mpi" and comm:
            # Split a fixed budget of 96 threads evenly between the ranks.
            torch.set_num_threads(max(1, 96 // size))
        elif torch_threads:
            torch.set_num_threads(torch_threads)

    use_mpi_contract = bool(mpi_contract and comm and size > 1)
    my_exp = 0.0
    for coeff, factors in my_terms:
        if len(factors) > PAULI_DENSE_MAX_QUBITS:
            tn = pauli_product_expectation_tn(qc, factors)
        else:
            op, where = _pauli_term_to_dense_operator(factors)
            # ``rehearse='tn'`` yields the tensor network instead of a value.
            tn = qc.local_expectation(op, where, rehearse="tn")

        tree = parallel_path_search(
            tn,
            tn.outer_inds(),
            method=method,
            total_repeats=max_repeats,
            max_time=max_time,
            n_workers=search_workers,
            slicing_opts=slicing_opts,
            trial_timeout=trial_timeout,
        )
        if tree is None:
            continue

        if use_mpi_contract:
            arrays = _arrays_to_backend(tn.arrays, self.backend, self.engine)
            val = parallel_contract(tree, arrays, method="mpi", comm=comm)
        else:
            if self.backend == "torch":
                # Ensure contiguous CPU complex128 data before contracting.
                for tensor in tn.tensors:
                    tensor._data = _torch_cpu_array(
                        tensor._data, dtype=torch.complex128
                    )
            val = complex(
                tn.contract(
                    all,
                    output_inds=(),
                    optimize=tree,
                    backend=self.backend,
                )
            )
        my_exp += coeff * complex(val)

    if comm:
        all_exp = comm.gather(my_exp, root=0)
        if rank == 0:
            return self.real(sum(all_exp))
        return 0.0
    return self.real(my_exp)
# Quimb array-backend name -> middle part of the generated class name
# ("Quimb<root>Backend").
CLASSES_ROOTS = {
    "numpy": "Numpy",
    "torch": "PyTorch",
    "jax": "Jax",
}

# Namespace installed on every dynamically generated backend class; the
# values are the module-level functions (and one property) defined above.
METHODS = {
    "__init__": __init__,
    "configure_tn_simulation": configure_tn_simulation,
    "setup_backend_specifics": setup_backend_specifics,
    "execute_circuit": execute_circuit,
    "exp_value_observable_symbolic": exp_value_observable_symbolic,
    "_qibo_circuit_to_quimb": _qibo_circuit_to_quimb,
    "_string_to_quimb_operator": _string_to_quimb_operator,
    "expectation": expectation,
    "_expectation_parallel": _expectation_parallel,
    "circuit_ansatz": circuit_ansatz,
}
def _generate_backend(quimb_backend: str = "numpy"):
    """Create the backend class for the given quimb array backend.

    The class derives from ``QibotnBackend`` plus the matching array backend
    (imported lazily) and receives the functions collected in ``METHODS``.

    Args:
        quimb_backend: ``"numpy"``, ``"torch"`` or ``"jax"``.

    Returns:
        type: the new class, named ``Quimb<root>Backend``.

    Raises:
        ValueError: for an unsupported backend name.
        ImportError: when the package providing the array backend is missing.
    """
    array_bases = ()
    if quimb_backend == "numpy":
        from qibo.backends import NumpyBackend

        array_bases = (NumpyBackend,)
    elif quimb_backend == "torch":
        from qiboml.backends import PyTorchBackend

        array_bases = (PyTorchBackend,)
    elif quimb_backend == "jax":
        from qiboml.backends import JaxBackend

        array_bases = (JaxBackend,)
    else:
        raise_error(ValueError, f"Unsupported quimb backend: {quimb_backend}")

    class_name = f"Quimb{CLASSES_ROOTS[quimb_backend]}Backend"
    return type(class_name, (QibotnBackend,) + array_bases, METHODS)
# Backend classes that could be built in this environment, keyed by the
# quimb array-backend name. Each class is also published as a module global.
BACKENDS = {}
for k, v in CLASSES_ROOTS.items():
    backend_name = f"Quimb{v}Backend"
    try:
        backend = _generate_backend(k)
    except ImportError:
        # The optional dependency for this backend is not installed.
        continue
    BACKENDS[k] = backend
    globals()[backend_name] = backend
def __getattr__(name):
    """Module-level attribute hook: expose backend classes by backend name.

    Returns the class registered in ``BACKENDS`` under ``name`` and raises
    ``AttributeError`` for any other name.
    """
    if name in BACKENDS:
        return BACKENDS[name]
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}") from None