Files
qibotn/baseline_mps_expectation.py
jaunatisblue aa122964b4
Some checks failed
Build wheels / build (ubuntu-latest, 3.11) (push) Has been cancelled
Build wheels / build (ubuntu-latest, 3.12) (push) Has been cancelled
Build wheels / build (ubuntu-latest, 3.13) (push) Has been cancelled
Tests / check (push) Has been cancelled
Tests / build (ubuntu-latest, 3.11) (push) Has been cancelled
Tests / build (ubuntu-latest, 3.12) (push) Has been cancelled
Tests / build (ubuntu-latest, 3.13) (push) Has been cancelled
numpy换为torch;修复torch后端计算方式不支持的问题,加速svd;发现并行化err,尝试修复;当前加速比11x,但是最新的并行方式不稳定,可能有精度问题
2026-05-10 22:33:46 +08:00

165 lines
5.8 KiB
Python

"""Baseline MPS expectation scan with the qmatchatea backend."""
import argparse
import json
import logging
import math
import time
import numpy as np
from qibo import Circuit, gates, hamiltonians
from qibo.symbols import X, Z
from qibotn.backends.qmatchatea import QMatchaTeaBackend
from qibotn.backends.vidal_tebd import run_vidal_ring_xz
def build_circuit(nqubits, nlayers, seed):
import numpy as np
rng = np.random.default_rng(seed)
circuit = Circuit(nqubits)
for _ in range(nlayers):
for qubit in range(nqubits):
circuit.add(gates.RY(qubit, theta=rng.uniform(-math.pi, math.pi)))
circuit.add(gates.RZ(qubit, theta=rng.uniform(-math.pi, math.pi)))
for qubit in range(0, nqubits - 1, 2):
circuit.add(gates.CNOT(qubit, qubit + 1))
for qubit in range(1, nqubits - 1, 2):
circuit.add(gates.CNOT(qubit, qubit + 1))
return circuit
def build_observable(nqubits):
form = 0
for qubit in range(nqubits):
form += 0.5 * X(qubit) * Z((qubit + 1) % nqubits)
return hamiltonians.SymbolicHamiltonian(form=form)
def exact_expectation(circuit, nqubits):
import numpy as np
state = circuit().state(numpy=True).reshape(-1)
value = 0.0
chunk_size = 1 << 20
for qubit in range(nqubits):
next_qubit = (qubit + 1) % nqubits
x_flip = 1 << (nqubits - 1 - qubit)
z_shift = nqubits - 1 - next_qubit
term = 0.0
for start in range(0, state.size, chunk_size):
stop = min(start + chunk_size, state.size)
indices = np.arange(start, stop, dtype=np.int64)
z_bit = (indices >> z_shift) & 1
z_phase = 1 - 2 * z_bit
term += np.vdot(state[indices ^ x_flip], z_phase * state[start:stop]).real
value += 0.5 * term
return float(value)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--nqubits", type=int, default=40)
parser.add_argument("--nlayers", type=int, default=30)
parser.add_argument("--bond", "--bonds", dest="bond", type=int, default=512)
parser.add_argument("--seed", type=int, default=42)
parser.add_argument("--tensor-module", choices=("numpy", "torch"), default="torch")
parser.add_argument("--torch-threads", type=int, default=32)
parser.add_argument(
"--executor", choices=("qmatchatea", "vidal"), default="qmatchatea"
)
parser.add_argument("--vidal-workers", type=int, default=1)
parser.add_argument("--vidal-batched", action="store_true")
parser.add_argument("--mpi-ct", action="store_true")
parser.add_argument("--mpi-barriers", type=int, default=-1)
parser.add_argument("--mpi-isometrization", type=int, default=-1)
parser.add_argument("--exact", action="store_true")
parser.add_argument("--exact-max-qubits", type=int, default=24)
parser.add_argument("--reference-file")
args = parser.parse_args()
logging.getLogger("qibo.config").setLevel(logging.ERROR)
logging.getLogger("qtealeaves").setLevel(logging.ERROR)
import torch
torch.set_num_threads(args.torch_threads)
rank = 0
size = 1
if args.mpi_ct:
from mpi4py import MPI
rank = MPI.COMM_WORLD.Get_rank()
size = MPI.COMM_WORLD.Get_size()
circuit = build_circuit(args.nqubits, args.nlayers, args.seed)
observable = build_observable(args.nqubits)
exact = None
if args.reference_file:
with open(args.reference_file, "r", encoding="utf-8") as f:
exact = float(json.load(f)["expectation"])
elif args.exact:
if args.nqubits > args.exact_max_qubits:
raise ValueError(
f"--exact is limited to {args.exact_max_qubits} qubits by default."
)
exact = exact_expectation(circuit, args.nqubits)
if rank == 0:
mpi_label = f"MPIMPS/{size}" if args.mpi_ct else "SR"
print(
f"nqubits={args.nqubits} nlayers={args.nlayers} "
f"bond={args.bond} seed={args.seed} "
f"tensor_module={args.tensor_module} svd_control=E! "
f"compile_circuit=True mpi={mpi_label} executor={args.executor} "
f"vidal_workers={args.vidal_workers} vidal_batched={args.vidal_batched}"
)
if exact is not None:
print(f"exact={exact:.16e}")
print("expval abs_error rel_error seconds")
start = time.perf_counter()
if args.executor == "vidal":
if args.mpi_ct:
raise ValueError("--executor vidal is a single-process executor.")
value = run_vidal_ring_xz(
circuit,
max_bond=args.bond,
cut_ratio=1e-12,
tensor_module=args.tensor_module,
workers=args.vidal_workers,
use_batched=args.vidal_batched,
)
else:
backend = QMatchaTeaBackend()
backend.configure_tn_simulation(
ansatz="MPS",
max_bond_dimension=args.bond,
cut_ratio=1e-12,
svd_control="E!",
tensor_module=args.tensor_module,
compile_circuit=True,
track_memory=False,
mpi_approach="CT" if args.mpi_ct else "SR",
mpi_num_procs=size,
mpi_where_barriers=args.mpi_barriers if args.mpi_ct else -1,
mpi_isometrization=args.mpi_isometrization,
)
value = backend.expectation(
circuit,
observable,
preprocess=False,
compile_circuit=True,
)
if rank != 0:
return
value = float(np.real(value))
elapsed = time.perf_counter() - start
abs_error = float("nan") if exact is None else abs(value - exact)
rel_error = float("nan") if exact is None else abs_error / max(abs(exact), 1e-15)
print(f"{value:.16e} {abs_error:.6e} {rel_error:.6e} {elapsed:.3f}")
if __name__ == "__main__":
main()