"""Benchmark and profile the qibotn/quimb CPU backend. This script is intended to be the stable baseline for quimb backend optimization work. It supports: - multiple circuit families - MPS or generic TN execution - statevector, sampling, conversion, and local expectation workloads - warmup/repeat timing - optional correctness checks against qibojit - optional cProfile output """ from __future__ import annotations import argparse import cProfile import json import math import os import pstats import time from pathlib import Path from statistics import mean, pstdev import numpy as np import qibo from qibo import Circuit, gates def configure_runtime_env(quimb_num_procs: int | None, blas_threads: int | None): """Pin process-level thread settings before heavy work starts.""" if quimb_num_procs is not None: os.environ["QUIMB_NUM_PROCS"] = str(quimb_num_procs) if blas_threads is not None: value = str(blas_threads) os.environ["OMP_NUM_THREADS"] = value os.environ["OPENBLAS_NUM_THREADS"] = value os.environ["MKL_NUM_THREADS"] = value os.environ["NUMEXPR_NUM_THREADS"] = value def make_circuit( circuit_type: str, nqubits: int, nlayers: int, seed: int, add_measurements: bool = False, ) -> Circuit: """Construct repeatable workloads covering low/high entanglement cases.""" rng = np.random.default_rng(seed) circuit = Circuit(nqubits) if circuit_type == "qft": from qibo.models import QFT circuit = QFT(nqubits) elif circuit_type == "variational": for layer in range(nlayers): for qubit in range(nqubits): circuit.add(gates.RY(qubit, theta=rng.uniform(0.0, 2.0 * np.pi))) offset = layer % 2 for qubit in range(offset, nqubits - 1, 2): circuit.add(gates.CZ(qubit, qubit + 1)) elif circuit_type == "ghz": circuit.add(gates.H(0)) for qubit in range(nqubits - 1): circuit.add(gates.CNOT(qubit, qubit + 1)) elif circuit_type == "qaoa": for _ in range(nlayers): for qubit in range(nqubits): circuit.add(gates.RZ(qubit, theta=rng.uniform(0.0, 2.0 * np.pi))) for qubit in range(0, nqubits - 1, 2): circuit.add(gates.CZ(qubit, qubit + 1)) for qubit in range(nqubits): circuit.add(gates.RX(qubit, theta=rng.uniform(0.0, 2.0 * np.pi))) elif circuit_type == "ising1d": for _ in range(nlayers): for qubit in range(nqubits): circuit.add(gates.RX(qubit, theta=rng.uniform(0.0, 2.0 * np.pi))) for qubit in range(0, nqubits - 1, 2): circuit.add(gates.CZ(qubit, qubit + 1)) for qubit in range(1, nqubits - 1, 2): circuit.add(gates.CZ(qubit, qubit + 1)) elif circuit_type == "rcs": cols = math.ceil(math.sqrt(nqubits)) rows = math.ceil(nqubits / cols) single_qubit_gates = [gates.T, gates.X, gates.Y] for layer in range(nlayers): for qubit in range(nqubits): gate_cls = single_qubit_gates[rng.integers(0, len(single_qubit_gates))] circuit.add(gate_cls(qubit)) if layer % 2 == 0: for row in range(rows): for col in range(0, cols - 1, 2): q1, q2 = row * cols + col, row * cols + col + 1 if q2 < nqubits: circuit.add(gates.CZ(q1, q2)) else: for row in range(0, rows - 1, 2): for col in range(cols): q1, q2 = row * cols + col, (row + 1) * cols + col if q2 < nqubits: circuit.add(gates.CZ(q1, q2)) else: raise ValueError(f"Unknown circuit type: {circuit_type}") if add_measurements: circuit.add(gates.M(*range(nqubits))) return circuit def prepare_quimb_backend( ansatz: str, max_bond: int | None, svd_cutoff: float, optimizer: str, n_most_frequent_states: int, ): """Create and configure the qibotn/quimb backend once.""" qibo.set_backend("qibotn", platform="quimb") backend = qibo.get_backend() backend.configure_tn_simulation( ansatz=ansatz, max_bond_dimension=max_bond, svd_cutoff=svd_cutoff, n_most_frequent_states=n_most_frequent_states, ) backend.contractions_optimizer = optimizer return backend def run_qibojit_state(circuit: Circuit): qibo.set_backend("qibojit", platform="numba") t0 = time.perf_counter() result = circuit() elapsed = time.perf_counter() - t0 state = np.asarray(result.state(), dtype=complex).reshape(-1) return state, elapsed def run_qibojit_shots(circuit: Circuit, nshots: int): qibo.set_backend("qibojit", platform="numba") t0 = time.perf_counter() result = circuit(nshots=nshots) elapsed = time.perf_counter() - t0 return dict(result.frequencies()), elapsed def z_expectation_from_statevector(statevector: np.ndarray, nqubits: int, qubit: int): probs = np.abs(np.asarray(statevector).reshape(-1)) ** 2 bit_index = nqubits - qubit - 1 bits = (np.arange(len(probs)) >> bit_index) & 1 return float(np.dot(probs, 1.0 - 2.0 * bits)) def fidelity_and_l2(reference: np.ndarray, candidate: np.ndarray): ref = np.asarray(reference, dtype=complex).reshape(-1) cand = np.asarray(candidate, dtype=complex).reshape(-1) fidelity = abs(np.vdot(ref, cand)) ** 2 l2_error = np.linalg.norm(ref - cand) return float(fidelity), float(l2_error) def total_variation_distance(reference: dict[str, int], candidate: dict[str, int], nshots: int): keys = set(reference) | set(candidate) return 0.5 * sum(abs(reference.get(key, 0) - candidate.get(key, 0)) for key in keys) / nshots def profile_callable(func, output_path: Path, sort_by: str): """Profile a single invocation and dump textual stats.""" profiler = cProfile.Profile() profiler.enable() result = func() profiler.disable() output_path.parent.mkdir(parents=True, exist_ok=True) with output_path.open("w", encoding="utf-8") as stream: stats = pstats.Stats(profiler, stream=stream) stats.strip_dirs().sort_stats(sort_by).print_stats(80) stats.print_callers(30) return result def time_callable(func, repeats: int, warmup: int, profile_output: Path | None, profile_sort: str): for _ in range(warmup): func() profiled_payload = None if profile_output is not None: profiled_payload = profile_callable(func, profile_output, profile_sort) samples = [] payloads = [] for _ in range(repeats): t0 = time.perf_counter() payload = func() elapsed = time.perf_counter() - t0 samples.append(elapsed) payloads.append(payload) final_payload = payloads[-1] if payloads else profiled_payload return samples, final_payload def summarize_samples(samples: list[float]): return { "min_s": min(samples), "mean_s": mean(samples), "max_s": max(samples), "std_s": pstdev(samples) if len(samples) > 1 else 0.0, "repeats": len(samples), } def workload_state(args): circuit = make_circuit(args.circuit, args.nqubits, args.nlayers, args.seed) backend = prepare_quimb_backend( ansatz=args.ansatz, max_bond=args.max_bond, svd_cutoff=args.svd_cutoff, optimizer=args.optimizer, n_most_frequent_states=args.topk, ) def run_once(): result = backend.execute_circuit(circuit, return_array=True) return np.asarray(result.statevector).reshape(-1) samples, statevector = time_callable( run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort ) summary = summarize_samples(samples) correctness = None if not args.no_compare: ref_state, ref_time = run_qibojit_state(circuit) fidelity, l2_error = fidelity_and_l2(ref_state, statevector) correctness = { "qibojit_time_s": ref_time, "fidelity": fidelity, "l2_error": l2_error, } return summary, correctness def workload_shots(args): circuit = make_circuit( args.circuit, args.nqubits, args.nlayers, args.seed, add_measurements=True ) backend = prepare_quimb_backend( ansatz=args.ansatz, max_bond=args.max_bond, svd_cutoff=args.svd_cutoff, optimizer=args.optimizer, n_most_frequent_states=args.topk, ) def run_once(): result = backend.execute_circuit(circuit, nshots=args.nshots) return dict(result.frequencies()) samples, frequencies = time_callable( run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort ) summary = summarize_samples(samples) correctness = None if not args.no_compare: ref_freq, ref_time = run_qibojit_shots(circuit, args.nshots) correctness = { "qibojit_time_s": ref_time, "tvd": total_variation_distance(ref_freq, frequencies, args.nshots), } return summary, correctness def workload_convert(args): circuit = make_circuit(args.circuit, args.nqubits, args.nlayers, args.seed) backend = prepare_quimb_backend( ansatz=args.ansatz, max_bond=args.max_bond, svd_cutoff=args.svd_cutoff, optimizer=args.optimizer, n_most_frequent_states=args.topk, ) def run_once(): quimb_circuit = backend._qibo_circuit_to_quimb( # pylint: disable=protected-access circuit, quimb_circuit_type=backend.circuit_ansatz, gate_opts={"max_bond": backend.max_bond_dimension, "cutoff": backend.svd_cutoff}, ) return len(quimb_circuit.gates) samples, gate_count = time_callable( run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort ) summary = summarize_samples(samples) summary["gate_count"] = gate_count return summary, None def workload_expectation(args): circuit = make_circuit(args.circuit, args.nqubits, args.nlayers, args.seed) backend = prepare_quimb_backend( ansatz=args.ansatz, max_bond=args.max_bond, svd_cutoff=args.svd_cutoff, optimizer=args.optimizer, n_most_frequent_states=args.topk, ) operators = ["z"] sites = [(args.observable_qubit,)] coeffs = [1.0] def run_once(): return float( backend.exp_value_observable_symbolic( circuit, operators, sites, coeffs, args.nqubits ) ) samples, expval = time_callable( run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort ) summary = summarize_samples(samples) correctness = None if not args.no_compare: ref_state, ref_time = run_qibojit_state(circuit) correctness = { "qibojit_time_s": ref_time, "reference_expval": z_expectation_from_statevector( ref_state, args.nqubits, args.observable_qubit ), "abs_error": abs( z_expectation_from_statevector(ref_state, args.nqubits, args.observable_qubit) - expval ), } return summary, correctness def workload_raw_local_exp(args): circuit = make_circuit(args.circuit, args.nqubits, args.nlayers, args.seed) backend = prepare_quimb_backend( ansatz=args.ansatz, max_bond=args.max_bond, svd_cutoff=args.svd_cutoff, optimizer=args.optimizer, n_most_frequent_states=args.topk, ) def run_once(): metrics = {} t0 = time.perf_counter() quimb_circuit = backend._qibo_circuit_to_quimb( # pylint: disable=protected-access circuit, quimb_circuit_type=backend.circuit_ansatz, gate_opts={"max_bond": backend.max_bond_dimension, "cutoff": backend.svd_cutoff}, ) metrics["convert_s"] = time.perf_counter() - t0 operator = backend._string_to_quimb_operator("z") # pylint: disable=protected-access if args.rehearse: t1 = time.perf_counter() rehearsal = quimb_circuit.local_expectation( operator, where=(args.observable_qubit,), backend=backend.backend, optimize=backend.contractions_optimizer, simplify_sequence="R", rehearse=True, ) metrics["rehearse_s"] = time.perf_counter() - t1 optimize = rehearsal["tree"] else: metrics["rehearse_s"] = 0.0 optimize = backend.contractions_optimizer t2 = time.perf_counter() expval = quimb_circuit.local_expectation( operator, where=(args.observable_qubit,), backend=backend.backend, optimize=optimize, simplify_sequence="R", ) metrics["contract_s"] = time.perf_counter() - t2 metrics["total_inner_s"] = ( metrics["convert_s"] + metrics["rehearse_s"] + metrics["contract_s"] ) metrics["expval"] = float(np.real(expval)) return metrics samples, metrics = time_callable( run_once, args.repeats, args.warmup, args.profile_output, args.profile_sort ) summary = summarize_samples(samples) summary.update( { "convert_s": metrics["convert_s"], "rehearse_s": metrics["rehearse_s"], "contract_s": metrics["contract_s"], "total_inner_s": metrics["total_inner_s"], } ) correctness = None if not args.no_compare: ref_state, ref_time = run_qibojit_state(circuit) ref_expval = z_expectation_from_statevector( ref_state, args.nqubits, args.observable_qubit ) correctness = { "qibojit_time_s": ref_time, "reference_expval": ref_expval, "abs_error": abs(ref_expval - metrics["expval"]), } return summary, correctness WORKLOADS = { "state": workload_state, "shots": workload_shots, "convert": workload_convert, "expectation": workload_expectation, "raw-local-exp": workload_raw_local_exp, } def build_parser(): parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--mode", choices=sorted(WORKLOADS), default="raw-local-exp", help="Workload to benchmark.", ) parser.add_argument( "--circuit", choices=["ghz", "ising1d", "qaoa", "qft", "rcs", "variational"], default="variational", ) parser.add_argument("--nqubits", type=int, default=10) parser.add_argument("--nlayers", type=int, default=3) parser.add_argument("--ansatz", choices=["mps", "tn"], default="tn") parser.add_argument("--max-bond", type=int, default=None) parser.add_argument("--svd-cutoff", type=float, default=1e-10) parser.add_argument("--optimizer", type=str, default="auto-hq") parser.add_argument("--observable-qubit", type=int, default=0) parser.add_argument("--nshots", type=int, default=1024) parser.add_argument("--topk", type=int, default=100) parser.add_argument("--warmup", type=int, default=1) parser.add_argument("--repeats", type=int, default=3) parser.add_argument("--seed", type=int, default=42) parser.add_argument("--quimb-num-procs", type=int, default=None) parser.add_argument("--blas-threads", type=int, default=None) parser.add_argument("--rehearse", action="store_true") parser.add_argument("--no-compare", action="store_true") parser.add_argument("--profile-output", type=Path, default=None) parser.add_argument("--profile-sort", type=str, default="cumulative") parser.add_argument("--json-output", type=Path, default=None) return parser def main(): parser = build_parser() args = parser.parse_args() configure_runtime_env(args.quimb_num_procs, args.blas_threads) print( f"mode={args.mode} circuit={args.circuit} nqubits={args.nqubits} " f"nlayers={args.nlayers} ansatz={args.ansatz} optimizer={args.optimizer}" ) if args.quimb_num_procs is not None or args.blas_threads is not None: print( "threads:" f" QUIMB_NUM_PROCS={os.environ.get('QUIMB_NUM_PROCS')}" f" OMP_NUM_THREADS={os.environ.get('OMP_NUM_THREADS')}" ) workload = WORKLOADS[args.mode] summary, correctness = workload(args) print("\nTiming") for key, value in summary.items(): if isinstance(value, float): print(f"{key:>16}: {value:.6f}") else: print(f"{key:>16}: {value}") if correctness is not None: print("\nCorrectness") for key, value in correctness.items(): if isinstance(value, float): print(f"{key:>16}: {value:.6e}") else: print(f"{key:>16}: {value}") if args.profile_output is not None: print(f"\nProfile written to: {args.profile_output}") if args.json_output is not None: payload = {"timing": summary, "correctness": correctness, "args": vars(args)} args.json_output.parent.mkdir(parents=True, exist_ok=True) args.json_output.write_text(json.dumps(payload, indent=2, default=str), encoding="utf-8") print(f"JSON written to: {args.json_output}") if __name__ == "__main__": main()