"""Benchmark scripts."""

import time

from benchmarks.logger import JsonLogger, log

# Circuit names whose constructors accept a ``seed`` parameter (see benchmarks/circuits).
_QIBOTN_CIRCUITS_WITH_SEED_PARAM = frozenset(
    {
        "variational",
        "variational-circuit",
        "qaoa",
        "supremacy",
        "basis-change",
        "bc",
        "quantum-volume",
        "qv",
    }
)


def _normalize_nqubits_range(nqubits):
    """Return ``(n_min, n_max)`` (inclusive) for a qubit-count specification.

    Args:
        nqubits: Either a single value convertible with ``int`` (then
            ``n_min == n_max``) or a ``list``/``tuple`` of exactly two such
            values ``(min, max)``.

    Returns:
        Tuple ``(n_min, n_max)`` of ints.

    Raises:
        ValueError: If a sequence does not have length 2 or if ``min > max``.
    """
    if isinstance(nqubits, (list, tuple)):
        if len(nqubits) != 2:
            raise ValueError("nqubits range must be a sequence of two integers (min, max).")
        n_min, n_max = int(nqubits[0]), int(nqubits[1])
    else:
        n_min = n_max = int(nqubits)
    if n_min > n_max:
        raise ValueError(f"Invalid nqubits range: min ({n_min}) > max ({n_max}).")
    return n_min, n_max


def _nqubits_range_for_circuit(circuit_name, n_min, n_max):
    """Yield every ``n`` in ``[n_min, n_max]`` that is valid for ``circuit_name``.

    QAOA uses only even ``n`` (3-regular graph needs ``n * degree`` even), so
    odd values are skipped when the lower-cased name is ``"qaoa"``.
    """
    for n in range(n_min, n_max + 1):
        if circuit_name.lower() == "qaoa" and (n % 2):
            continue
        yield n


def _supremacy_depth_for_nqubit_index(n, n_max, depth_max):
    """Return the supremacy-circuit depth to use for ``n`` qubits.

    The largest ``n_max`` uses ``depth_max``; each smaller ``n`` decreases the
    depth by 1, wrapping back to ``depth_max`` after ``depth_max`` steps.

    ``depth_max`` must be non-zero (it is used as a modulus); the callers in
    this module only call this helper when ``depth_max > 0``.
    """
    pos = int(n_max) - int(n)
    return depth_max - (pos % depth_max)


def _qibotn_circuit_options_for_rep(circuit_name, circuit_options, base_seed, global_rep):
    """Return the circuit-options string for repetition ``global_rep``.

    The options are parsed with ``circuits.parse``; for circuits listed in
    ``_QIBOTN_CIRCUITS_WITH_SEED_PARAM`` the ``seed`` entry is set to
    ``base_seed + global_rep`` so every repetition gets a distinct seed.

    Returns:
        ``"k=v,k=v"`` string, or ``None`` when there are no options at all.
    """
    from benchmarks import circuits

    kwargs = circuits.parse(circuit_options) if circuit_options else {}
    name = circuit_name.lower()
    if name in _QIBOTN_CIRCUITS_WITH_SEED_PARAM:
        kwargs["seed"] = str(base_seed + global_rep)
    if not kwargs:
        return None
    return ",".join(f"{k}={v}" for k, v in kwargs.items())


def circuit_benchmark(
    nqubits,
    backend,
    circuit_name,
    circuit_options=None,
    nreps=1,
    nshots=None,
    transfer=False,
    precision="double",
    memory=None,
    threading=None,
    filename=None,
    platform=None,
):
    """Runs benchmark for different circuit types.

    See ``benchmarks/main.py`` for documentation of each argument.

    The function logs import, circuit creation, dry run, per-repetition
    simulation/transfer and measurement times to a ``JsonLogger``, dumps the
    logger and returns it.
    """
    if backend == "qibojit" and threading is not None:
        from benchmarks.utils import select_numba_threading

        threading = select_numba_threading(threading)

    if backend in {"qibotf", "tensorflow"} and memory is not None:
        from benchmarks.utils import limit_gpu_memory

        memory = limit_gpu_memory(memory)

    logs = JsonLogger(filename)
    logs.log(
        nqubits=nqubits,
        nreps=nreps,
        nshots=nshots,
        transfer=transfer,
        numba_threading=threading,
        gpu_memory=memory,
    )

    # qibo is imported lazily so that its import time can be measured.
    start_time = time.time()
    import qibo

    logs.log(import_time=time.time() - start_time)

    qibo.set_backend(backend=backend, platform=platform)
    qibo.set_precision(precision)
    logs.log(
        backend=qibo.get_backend(),
        platform=qibo.K.get_platform(),
        precision=qibo.get_precision(),
        device=qibo.get_device(),
        version=qibo.__version__,
    )

    from benchmarks import circuits

    gates = circuits.get(circuit_name, nqubits, circuit_options, qibo=True)
    logs.log(circuit=circuit_name, circuit_options=str(gates))

    start_time = time.time()
    circuit = qibo.models.Circuit(nqubits)
    circuit.add(gates)
    if nshots is not None:
        # add measurement gates
        circuit.add(qibo.gates.M(*range(nqubits)))
    logs.log(creation_time=time.time() - start_time)

    # Dry run: first execution, timed separately from the repetitions.
    start_time = time.time()
    result = circuit(nshots=nshots)
    logs.log(dry_run_time=time.time() - start_time)
    start_time = time.time()
    if transfer:
        result = result.numpy()
    logs.log(dry_run_transfer_time=time.time() - start_time)
    dtype = str(result.dtype)
    del result

    simulation_times, transfer_times = [], []
    for _ in range(nreps):
        start_time = time.time()
        result = circuit(nshots=nshots)
        simulation_times.append(time.time() - start_time)
        start_time = time.time()
        if transfer:
            result = result.numpy()
        transfer_times.append(time.time() - start_time)
        del result

    logs.log(
        dtype=dtype, simulation_times=simulation_times, transfer_times=transfer_times
    )
    logs.average("simulation_times")
    logs.average("transfer_times")

    if nshots is not None:
        result = circuit(nshots=nshots)
        start_time = time.time()
        # Only the duration of the call is of interest; its value is discarded.
        result.frequencies()
        logs.log(measurement_time=time.time() - start_time)
        del result
    else:
        logs.log(measurement_time=0)

    logs.dump()
    return logs


def library_benchmark(
    nqubits,
    library,
    circuit_name,
    circuit_options=None,
    library_options=None,
    precision=None,
    nreps=1,
    filename=None,
):
    """Runs benchmark for different quantum simulation libraries.

    See ``benchmarks/compare.py`` for documentation of each argument.

    The circuit is built from the QASM produced by ``circuits.get`` and
    executed once as a dry run plus ``nreps`` timed repetitions. The populated
    ``JsonLogger`` is dumped and returned.
    """
    logs = JsonLogger(filename)
    logs.log(nqubits=nqubits, nreps=nreps)

    # The library backend is loaded lazily so that its import time is measured.
    start_time = time.time()
    from benchmarks import libraries

    backend = libraries.get(library, library_options)
    logs.log(import_time=time.time() - start_time)
    logs.log(library_options=library_options)
    if precision is not None:
        backend.set_precision(precision)

    logs.log(
        library=backend.name,
        precision=backend.get_precision(),
        device=backend.get_device(),
        version=backend.__version__,
    )

    from benchmarks import circuits

    gates = circuits.get(circuit_name, nqubits, circuit_options)
    logs.log(circuit=circuit_name, circuit_options=str(gates))

    start_time = time.time()
    circuit = backend.from_qasm(gates.to_qasm())
    logs.log(creation_time=time.time() - start_time)

    start_time = time.time()
    result = backend(circuit)
    logs.log(dry_run_time=time.time() - start_time)
    dtype = str(result.dtype)
    del result

    simulation_times = []
    for _ in range(nreps):
        start_time = time.time()
        result = backend(circuit)
        simulation_times.append(time.time() - start_time)
        del result

    logs.log(dtype=dtype, simulation_times=simulation_times)
    logs.average("simulation_times")
    logs.dump()
    return logs


def qibotn_benchmark(
    nqubits,
    library,
    circuit_name,
    circuit_options=None,
    library_options=None,
    precision=None,
    nreps=1,
    filename=None,
):
    """Runs a library benchmark over a range of qubit counts, split across MPI ranks.

    See ``benchmarks/compare.py`` for documentation of each argument.

    ``nqubits`` may be a single integer or a ``(min, max)`` pair. The valid
    qubit counts in that range are distributed round-robin over the MPI ranks;
    each rank runs ``nreps`` repetitions per assigned ``n`` (with a distinct
    seed per repetition for circuits that accept one). Per-``n`` results are
    gathered on rank 0, which writes them to the ``JsonLogger``.

    Returns:
        The dumped ``JsonLogger`` on rank 0, ``None`` on every other rank.
    """
    from mpi4py import MPI  # this line initializes MPI

    # Wall clock from MPI initialization until rank 0 finishes gathering results.
    t_mpi_wall_start = time.time()

    import numpy as np

    try:
        import cupy as cp
    except ImportError:
        cp = np  # fallback to numpy for CPU-only users

    try:
        comm = MPI.COMM_WORLD
        rank = comm.Get_rank()
        size = comm.Get_size()
    except MPI.Exception:
        # Fall back to a consistent single-process configuration; leaving
        # ``comm`` set here would make the gather/barrier below use a
        # communicator that has just failed.
        comm, rank, size = None, 0, 1

    n_min, n_max = _normalize_nqubits_range(nqubits)

    # Only rank 0 owns the logger; ``logs`` and ``start_time`` do not exist elsewhere.
    if rank == 0:
        logs = JsonLogger(filename)
        logs.log(nqubits_min=n_min, nqubits_max=n_max, nreps=nreps)
        start_time = time.time()

    from benchmarks import libraries

    backend = libraries.get(library, library_options)
    if rank == 0:
        logs.log(import_time=time.time() - start_time)
        logs.log(library_options=library_options)

    if precision is not None:
        backend.set_precision(precision)

    # NOTE(review): bare attribute access kept from the original; presumably a
    # fail-fast check that the backend exposes ``expectation_flag`` — confirm.
    backend.expectation_flag

    if rank == 0:
        logs.log(
            library=backend.name,
            precision=backend.get_precision(),
            device=backend.get_device(),
            version=backend.__version__,
        )

        from benchmarks.libraries.qibo import (
            generate_pauli_pattern_for_nqubits,
            runcard_uses_auto_pauli_pattern,
        )

        _rc = getattr(backend, "runcard", None)
        if _rc and runcard_uses_auto_pauli_pattern(_rc):
            _style = _rc.get("pauli_pattern_style") or "mixed"
            log.info(
                "Automatic pauli_pattern (style=%s) for nqubits %s..%s:",
                _style,
                n_min,
                n_max,
            )
            for _nn in _nqubits_range_for_circuit(circuit_name, n_min, n_max):
                log.info(
                    " n=%s -> %s",
                    _nn,
                    generate_pauli_pattern_for_nqubits(_nn, _style),
                )

    from benchmarks import circuits

    _kw = circuits.parse(circuit_options) if circuit_options else {}
    base_seed = int(_kw.get("seed", 123))

    def circuit_options_at_n(n_run):
        """For ``supremacy``, ``depth`` from CLI is the maximum (and cycle length); it varies per ``n``."""
        if circuit_name.lower() != "supremacy":
            return circuit_options
        depth_max = int(_kw.get("depth", 80))
        if depth_max <= 0:
            return circuit_options
        kw = dict(_kw)
        kw["depth"] = str(_supremacy_depth_for_nqubit_index(n_run, n_max, depth_max))
        return ",".join(f"{k}={v}" for k, v in kw.items())

    dtype = None
    expectation_by_nqubits = {}

    if rank == 0:
        if n_min == n_max:
            if circuit_name.lower() == "qaoa" and (n_min % 2):
                # Odd n is skipped for QAOA, so there is no circuit to describe.
                logs.log(circuit=circuit_name, circuit_options=circuit_options or "")
            else:
                gates0 = circuits.get(circuit_name, n_min, circuit_options_at_n(n_min))
                logs.log(circuit=circuit_name, circuit_options=str(gates0))
        else:
            logs.log(circuit=circuit_name, circuit_options=circuit_options or "")

    # MPI: partition qubit counts across ranks (round-robin), not nreps.
    # The round-robin index is the position within the *valid* n sequence, not
    # ``n - n_min``: for QAOA only even n are valid, so an offset-based split
    # would leave every second rank without work when ``size`` is even.
    my_ns = [
        n
        for idx, n in enumerate(_nqubits_range_for_circuit(circuit_name, n_min, n_max))
        if idx % size == rank
    ]

    local_expectation_rows = []  # (n, mean over nreps on this rank)
    # Was (n, creation_time, dry_run_time, dtype_str) when dry run was enabled.
    local_meta_rows = []  # (n, creation_time, dtype_str)

    for n in my_ns:
        # Options for this n do not depend on the repetition; compute them once.
        options_n = circuit_options_at_n(n)

        gates = circuits.get(circuit_name, n, options_n)
        start_time = time.time()
        # Built only to time circuit creation; each repetition below builds its own circuit.
        circuit = backend.from_qasm(gates.to_qasm())
        creation_time = time.time() - start_time

        # Optional dry run (warm-up); disabled — uncomment to enable.
        # start_time = time.time()
        # result = backend(circuit)
        # dry_run_time = time.time() - start_time
        # if hasattr(result, "dtype"):
        #     dtype_n = str(result.dtype)
        # else:
        #     dtype_n = str(np.array([result]).dtype)
        # del result

        rep_magnitudes = []
        dtype_n = None
        for g in range(nreps):
            np.random.seed(base_seed + g)
            rep_opts = _qibotn_circuit_options_for_rep(
                circuit_name, options_n, base_seed, g
            )
            gates_rep = circuits.get(circuit_name, n, rep_opts)
            circuit_rep = backend.from_qasm(gates_rep.to_qasm())
            result = backend(circuit_rep)

            # Use cp.asnumpy if available (cupy), otherwise fallback for numpy
            if hasattr(cp, "asnumpy") and isinstance(result, cp.ndarray):
                result = cp.asnumpy(result)
            else:
                result = np.array([result])

            # ``result`` is a NumPy array in both branches above, so it always has a dtype.
            if dtype_n is None:
                dtype_n = str(result.dtype)

            if backend.expectation_flag is not None:
                # ``.item()`` extracts the single element explicitly; calling
                # ``float`` on an array with ndim > 0 is deprecated in NumPy.
                rep_magnitudes.append(float(abs(result).item()))
            del result

        if backend.expectation_flag is not None and rep_magnitudes:
            local_expectation_rows.append((n, float(np.mean(rep_magnitudes))))
        local_meta_rows.append((n, creation_time, dtype_n))

    if comm is not None:
        gathered_expectation = comm.gather(local_expectation_rows, root=0)
        gathered_meta = comm.gather(local_meta_rows, root=0)
    else:
        gathered_expectation = [local_expectation_rows]
        gathered_meta = [local_meta_rows]

    # End-to-end wall time after all ranks finished local work and gather.
    if comm is not None:
        comm.Barrier()
    t_sim_end = time.time()

    if rank != 0:
        # Only rank 0 holds the logger and the gathered results.
        return None

    for chunk in gathered_expectation:
        for n, ev in chunk:
            expectation_by_nqubits[n] = ev

    depth_by_nqubits_log = None
    if circuit_name.lower() == "supremacy":
        _depth_max = int(_kw.get("depth", 80))
        if _depth_max > 0:
            depth_by_nqubits_log = {
                str(n): _supremacy_depth_for_nqubit_index(n, n_max, _depth_max)
                for n in _nqubits_range_for_circuit(circuit_name, n_min, n_max)
            }

    for n in sorted(expectation_by_nqubits):
        if depth_by_nqubits_log is not None:
            log.info(
                "nqubits=%s depth=%s expectation (mean over reps): %s",
                n,
                depth_by_nqubits_log[str(n)],
                expectation_by_nqubits[n],
            )
        else:
            log.info(
                "nqubits=%s expectation (mean over reps): %s",
                n,
                expectation_by_nqubits[n],
            )

    creation_by_n = {}
    # If dry run is re-enabled: dry_by_n = {} and unpack (n, ct, dry_run_time, dtp).
    dtype_by_n = {}
    for chunk in gathered_meta:
        for n, ct, dtp in chunk:
            creation_by_n[str(n)] = ct
            # dry_by_n[str(n)] = dry_run_time
            if dtp is not None:
                dtype_by_n[n] = dtp

    # Report the dtype observed for the largest qubit count that produced one.
    if dtype_by_n:
        dtype = dtype_by_n[max(dtype_by_n)]

    # A single end-to-end wall time (MPI init to post-gather barrier), not per-rep times.
    simulation_times = [t_sim_end - t_mpi_wall_start]
    _summary = {
        "dtype": dtype,
        "simulation_times": simulation_times,
        "creation_time_by_nqubits": creation_by_n,
        # dry_run_time_by_nqubits=dry_by_n,
    }
    if depth_by_nqubits_log is not None:
        _summary["depth_by_nqubits"] = depth_by_nqubits_log
    logs.log(**_summary)
    logs.average("simulation_times")

    if backend.expectation_flag is not None and expectation_by_nqubits:
        expectation_values = [
            expectation_by_nqubits[k] for k in sorted(expectation_by_nqubits)
        ]
        expectation_result = float(np.mean(expectation_values))
        logs.log(
            expectation_by_nqubits={str(k): v for k, v in expectation_by_nqubits.items()},
            expectation_result=expectation_result,
        )

    logs.dump()
    return logs


def evolution_benchmark(
    nqubits,
    dt,
    solver,
    backend,
    platform=None,
    nreps=1,
    precision="double",
    dense=False,
    filename=None,
):
    """Performs adiabatic evolution with critical TFIM as the hard Hamiltonian.

    The evolution goes from ``hamiltonians.X`` to ``hamiltonians.TFIM`` with
    ``h=1.0`` using a linear schedule and ``final_time=1.0``. Import,
    Hamiltonian/evolution creation, dry run and ``nreps`` simulation times are
    logged to a ``JsonLogger`` which is dumped and returned.
    """
    logs = JsonLogger(filename)
    logs.log(nqubits=nqubits, nreps=nreps, dt=dt, solver=solver, dense=dense)

    # qibo is imported lazily so that its import time can be measured.
    start_time = time.time()
    import qibo

    logs.log(import_time=time.time() - start_time)

    qibo.set_backend(backend=backend, platform=platform)
    qibo.set_precision(precision)
    logs.log(
        backend=qibo.get_backend(),
        platform=qibo.K.get_platform(),
        precision=qibo.get_precision(),
        device=qibo.get_device(),
        threads=qibo.get_threads(),
        version=qibo.__version__,
    )

    from qibo import hamiltonians, models

    start_time = time.time()
    h0 = hamiltonians.X(nqubits, dense=dense)
    h1 = hamiltonians.TFIM(nqubits, h=1.0, dense=dense)
    logs.log(hamiltonian_creation_time=time.time() - start_time)

    start_time = time.time()
    evolution = models.AdiabaticEvolution(h0, h1, lambda t: t, dt=dt, solver=solver)
    logs.log(evolution_creation_time=time.time() - start_time)

    start_time = time.time()
    result = evolution(final_time=1.0)
    logs.log(dry_run_time=time.time() - start_time)
    dtype = str(result.dtype)
    del result

    simulation_times = []
    for _ in range(nreps):
        start_time = time.time()
        result = evolution(final_time=1.0)
        simulation_times.append(time.time() - start_time)
        # Release the state outside the timed region, as the other benchmarks do,
        # so the previous result is not kept alive during the next run.
        del result

    logs.log(dtype=dtype, simulation_times=simulation_times)
    logs.average("simulation_times")
    logs.dump()
    return logs