并行化支持完善
Some checks failed
Build wheels / build (ubuntu-latest, 3.11) (push) Has been cancelled
Build wheels / build (ubuntu-latest, 3.12) (push) Has been cancelled
Build wheels / build (ubuntu-latest, 3.13) (push) Has been cancelled
Tests / check (push) Has been cancelled
Tests / build (ubuntu-latest, 3.11) (push) Has been cancelled
Tests / build (ubuntu-latest, 3.12) (push) Has been cancelled
Tests / build (ubuntu-latest, 3.13) (push) Has been cancelled

This commit is contained in:
2026-05-07 23:26:53 +08:00
parent fbae48eb3d
commit 2f5c863952
9 changed files with 195 additions and 93 deletions

2
.gitignore vendored
View File

@@ -8,7 +8,7 @@ data/
bak/
path/
profiles/
vtune_expval/
perf*
# Distribution / packaging
.Python

View File

@@ -0,0 +1,53 @@
"""MPI parallel sliced contraction using pre-sliced tree."""
import time, pickle, os
import numpy as np
from mpi4py import MPI
NQUBITS, NLAYERS, NCORES = 25, 10, 96
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
os.environ['OMP_NUM_THREADS'] = str(max(1, NCORES // size))
os.environ['MKL_NUM_THREADS'] = str(max(1, NCORES // size))
import torch
import qibo, quimb as qu
from qibotn.observables import build_random_circuit
torch.set_num_threads(max(1, NCORES // size))
circuit = build_random_circuit(NQUBITS, NLAYERS)
qibo.set_backend("qibotn", platform="quimb")
backend = qibo.get_backend()
backend.configure_tn_simulation(ansatz="tn")
qc = backend._qibo_circuit_to_quimb(circuit, backend.circuit_ansatz)
tn = qc.local_expectation(qu.pauli('x') & qu.pauli('z'), (0, 1), rehearse='tn')
with open(f"data/tree_q{NQUBITS}_l{NLAYERS}_sliced.pkl", 'rb') as f:
#with open(f"data/tree_q{NQUBITS}_l{NLAYERS}.pkl", 'rb') as f:
tree = pickle.load(f)
arrays = [torch.from_numpy(np.ascontiguousarray(t._data, dtype=np.complex128)) for t in tn.tensors]
n_slices = tree.multiplicity
if rank == 0:
print(f"Slices: {n_slices}, Ranks: {size}, "
f"Peak: {tree.max_size() * 16 / 1e9:.2f} GB, "
f"Threads/rank: {max(1, NCORES // size)}, Backend: torch")
t0 = time.time()
result = None
for i in range(rank, n_slices, size):
val = tree.contract_slice(arrays, i, backend='torch')
val_np = val.cpu().numpy().reshape(-1)
result = val_np if result is None else result + val_np
if result is None:
result = np.zeros(1, dtype=np.complex128)
total = np.zeros_like(result) if rank == 0 else None
comm.Reduce(result, total, root=0)
if rank == 0:
print(f"Contract: {time.time() - t0:.4f}s Expectation: {0.5 * total[0].real:.10f}")

View File

@@ -1,56 +0,0 @@
"""Expectation value benchmark with timing breakdown."""
import time
import numpy as np
import torch
import qibo
from qibo import Circuit, gates, hamiltonians
from qibo.symbols import X, Z
import quimb as qu
from qibotn.parallel import parallel_path_search
# Config
NQUBITS = 25
NLAYERS = 10
WORKERS = 96
# Build circuit
np.random.seed(42)
circuit = Circuit(NQUBITS)
for _ in range(NLAYERS):
for q in range(NQUBITS):
circuit.add(gates.RY(q, theta=np.random.uniform(0, 2*np.pi)))
circuit.add(gates.RZ(q, theta=np.random.uniform(0, 2*np.pi)))
for q in range(NQUBITS):
circuit.add(gates.CNOT(q % NQUBITS, (q + 1) % NQUBITS))
# Setup backend
qibo.set_backend("qibotn", platform="quimb")
backend = qibo.get_backend()
backend.configure_tn_simulation(ansatz="tn")
torch.set_num_threads(WORKERS)
# Prepare TN
t0 = time.time()
qc = backend._qibo_circuit_to_quimb(circuit, backend.circuit_ansatz)
op = qu.pauli('x') & qu.pauli('z')
tn = qc.local_expectation(op, (0, 1), rehearse='tn')
t_prepare = time.time() - t0
# Search path
t0 = time.time()
tree = parallel_path_search(tn, tn.outer_inds(), 'processpool', 1024, 300, WORKERS)
t_search = time.time() - t0
# Contract
t0 = time.time()
for tensor in tn.tensors:
tensor._data = torch.from_numpy(np.asarray(tensor._data)).to(torch.complex128)
val = tn.contract(all, output_inds=(), optimize=tree)
t_contract = time.time() - t0
# Results
print(f"Prepare: {t_prepare:.4f}s")
print(f"Search: {t_search:.4f}s")
print(f"Contract: {t_contract:.4f}s")
print(f"Total: {t_prepare+t_search+t_contract:.4f}s")
print(f"Expectation: {0.5 * complex(val).real:.10f}")

34
benchmark_search.py Normal file
View File

@@ -0,0 +1,34 @@
"""Search contraction path and save."""
import time, os, pickle
from qibotn.parallel import parallel_path_search
from qibotn.observables import build_random_circuit
import qibo, quimb as qu
from mpi4py import MPI
NQUBITS, NLAYERS, WORKERS = 30, 10, 96
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
method = 'mpi' if size > 1 else 'processpool'
circuit = build_random_circuit(NQUBITS, NLAYERS)
qibo.set_backend("qibotn", platform="quimb")
backend = qibo.get_backend()
backend.configure_tn_simulation(ansatz="tn")
qc = backend._qibo_circuit_to_quimb(circuit, backend.circuit_ansatz)
tn = qc.local_expectation(qu.pauli('x') & qu.pauli('z'), (0, 1), rehearse='tn')
if rank == 0:
print(f"Searching {NQUBITS}q {NLAYERS}l, method={method}, ranks={size}, workers/rank={WORKERS}...")
t0 = time.time()
tree = parallel_path_search(tn, tn.outer_inds(), method=method,
total_repeats=1024, max_time=300, n_workers=WORKERS)
t_search = time.time() - t0
if rank == 0:
os.makedirs('data', exist_ok=True)
path = f"data/tree_q{NQUBITS}_l{NLAYERS}.pkl"
with open(path, 'wb') as f:
pickle.dump(tree, f)
print(f"Search: {t_search:.2f}s Peak: {tree.max_size() * 16 / 1e9:.2f} GB Saved: {path}")

16
benchmark_slice.py Normal file
View File

@@ -0,0 +1,16 @@
"""Slice saved tree and save."""
import pickle
NQUBITS, NLAYERS = 25, 10
with open(f"data/tree_q{NQUBITS}_l{NLAYERS}.pkl", 'rb') as f:
tree = pickle.load(f)
print(f"Original peak: {tree.max_size() * 16 / 1e9:.2f} GB")
tree_sliced = tree.slice_and_reconfigure(target_size=2**30) # 2^29 = 8 GB
with open(f"data/tree_q{NQUBITS}_l{NLAYERS}_sliced.pkl", 'wb') as f:
pickle.dump(tree_sliced, f)
print(f"Sliced peak: {tree_sliced.max_size() * 16 / 1e9:.2f} GB Slices: {tree_sliced.multiplicity}")

13
check_tree.py Normal file
View File

@@ -0,0 +1,13 @@
"""Check contraction tree statistics."""
import pickle, sys
path = sys.argv[1] if len(sys.argv) > 1 else "data/tree_q25_l10.pkl"
with open(path, 'rb') as f:
tree = pickle.load(f)
print(f"File: {path}")
print(f"Peak memory elements: {tree.max_size():.2e}")
print(f"Peak memory (GB): {tree.max_size() * 16 / 1e9:.2f}") # complex128 = 16 bytes
print(f"Total FLOPs: {tree.total_flops():.2e}")
print(f"Contraction width: {tree.contraction_width()}")
print(f"Multiplicity (slices): {tree.multiplicity}")

View File

@@ -438,6 +438,7 @@ def _expectation_parallel(self, circuit, observable, method, opts):
search_workers = opts.get('search_workers', 48)
mpi_contract = opts.get('mpi_contract', False)
torch_threads = opts.get('torch_threads', None)
slicing_opts = opts.get('slicing_opts', None)
qc = self._qibo_circuit_to_quimb(
circuit,
@@ -469,7 +470,8 @@ def _expectation_parallel(self, circuit, observable, method, opts):
method=method,
total_repeats=max_repeats,
max_time=max_time,
n_workers=search_workers
n_workers=search_workers,
slicing_opts=slicing_opts,
)
if tree is None:

View File

@@ -51,6 +51,21 @@ def create_hamiltonian_from_dict(data, circuit_nqubit):
return hamiltonians.SymbolicHamiltonian(sum(terms))
def build_random_circuit(nqubits, nlayers, seed=42):
"""Build a random circuit with RY+RZ+CNOT layers for benchmarks."""
import numpy as np
from qibo import Circuit, gates
np.random.seed(seed)
c = Circuit(nqubits)
for _ in range(nlayers):
for q in range(nqubits):
c.add(gates.RY(q, theta=np.random.uniform(0, 2*np.pi)))
c.add(gates.RZ(q, theta=np.random.uniform(0, 2*np.pi)))
for q in range(nqubits):
c.add(gates.CNOT(q % nqubits, (q + 1) % nqubits))
return c
def extract_gates_and_qubits(hamiltonian):
"""Extract per-term Pauli factors from a Qibo SymbolicHamiltonian.

View File

@@ -13,7 +13,7 @@ except ImportError:
MPI = None
def _serial_search(tn_bytes, output_inds, repeats, seed, max_time):
def _serial_search(tn_bytes, output_inds, repeats, seed, max_time, slicing_opts=None):
"""Single-process path search with cotengra."""
import random
import cotengra as ctg
@@ -26,44 +26,20 @@ def _serial_search(tn_bytes, output_inds, repeats, seed, max_time):
minimize="combo-256",
max_time=max_time,
optlib="random",
slicing_opts=slicing_opts,
progbar=False,
)
tree = tn.contraction_tree(optimize=opt, output_inds=output_inds)
return tree.combo_cost(factor=256), tree
def parallel_path_search(tn, output_inds, method='processpool', total_repeats=1024,
max_time=300, n_workers=48):
"""Parallel contraction path search.
Args:
tn: Tensor network (quimb TensorNetwork)
output_inds: Output indices for contraction
method: 'processpool' or 'mpi'
total_repeats: Total optimization repeats
max_time: Timeout per worker (seconds)
n_workers: Number of workers (processpool only)
Returns:
Best contraction tree found
"""
if method == 'mpi':
if not _HAVE_MPI:
raise ImportError("mpi4py not available")
return _mpi_search(tn, output_inds, total_repeats, max_time)
elif method == 'processpool':
return _processpool_search(tn, output_inds, total_repeats, n_workers, max_time)
else:
raise ValueError(f"Unknown method: {method}")
def _processpool_search(tn, output_inds, total_repeats, n_workers, max_time):
def _processpool_search(tn, output_inds, total_repeats, n_workers, max_time, slicing_opts=None):
"""ProcessPool-based parallel search."""
tn_bytes = pickle.dumps(tn)
repeats_per = max(1, total_repeats // n_workers)
pool = ProcessPoolExecutor(max_workers=n_workers)
futures = [
pool.submit(_serial_search, tn_bytes, output_inds, repeats_per, seed, max_time)
pool.submit(_serial_search, tn_bytes, output_inds, repeats_per, seed, max_time, slicing_opts)
for seed in range(n_workers)
]
best_cost, best_tree = float("inf"), None
@@ -89,15 +65,30 @@ def _processpool_search(tn, output_inds, total_repeats, n_workers, max_time):
return best_tree
def _mpi_search(tn, output_inds, total_repeats, max_time):
"""MPI-based parallel search across ranks."""
def _mpi_search(tn, output_inds, total_repeats, max_time, n_workers=None, slicing_opts=None):
"""MPI+ProcessPool hybrid search.
Each MPI rank uses a local ProcessPool for parallel search,
then the best tree is gathered and broadcast.
"""
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
tn_bytes = pickle.dumps(tn)
repeats_per = max(1, total_repeats // size)
local_cost, local_tree = _serial_search(tn_bytes, output_inds, repeats_per, rank, max_time)
all_results = comm.gather((local_cost, local_tree), root=0)
if n_workers and n_workers > 1:
# Hybrid: each MPI rank uses ProcessPool
local_tree = _processpool_search(
tn, output_inds, repeats_per, n_workers, max_time, slicing_opts
)
else:
# Pure MPI: each rank runs serial
local_cost, local_tree = _serial_search(
tn_bytes, output_inds, repeats_per, rank, max_time, slicing_opts
)
all_results = comm.gather((local_tree.combo_cost(factor=256), local_tree), root=0)
best_tree = None
if rank == 0:
best_cost = float("inf")
@@ -108,6 +99,36 @@ def _mpi_search(tn, output_inds, total_repeats, max_time):
return best_tree
def parallel_path_search(tn, output_inds, method='processpool', total_repeats=1024,
max_time=300, n_workers=48, slicing_opts=None):
"""Parallel contraction path search.
Args:
tn: Tensor network (quimb TensorNetwork)
output_inds: Output indices
method: 'processpool' | 'mpi' | 'serial'
total_repeats: Total optimization repeats
max_time: Timeout per worker (seconds)
n_workers: Number of workers (processpool only, or per-MPI-rank if MPI)
slicing_opts: dict for cotengra slicing_opts (memory control)
Returns:
Best contraction tree
"""
if method == 'serial':
tn_bytes = pickle.dumps(tn)
_, tree = _serial_search(tn_bytes, output_inds, total_repeats, 0, max_time, slicing_opts)
return tree
elif method == 'mpi':
if not _HAVE_MPI:
raise ImportError("mpi4py not available")
return _mpi_search(tn, output_inds, total_repeats, max_time, n_workers, slicing_opts)
elif method == 'processpool':
return _processpool_search(tn, output_inds, total_repeats, n_workers, max_time, slicing_opts)
else:
raise ValueError(f"Unknown method: {method}")
def parallel_contract(tree, arrays, method='mpi', comm=None):
"""Parallel sliced contraction.
@@ -115,10 +136,10 @@ def parallel_contract(tree, arrays, method='mpi', comm=None):
tree: Contraction tree
arrays: List of tensor arrays
method: 'mpi' (only MPI supported for now)
comm: MPI communicator (required for MPI)
comm: MPI communicator
Returns:
Contracted result (on root rank for MPI)
Contracted result (on root rank for MPI, otherwise for all)
"""
if method == 'mpi':
if not _HAVE_MPI or comm is None:
@@ -129,7 +150,7 @@ def parallel_contract(tree, arrays, method='mpi', comm=None):
def _contract_mpi(tree, arrays, comm, root=0):
"""Distribute contraction slices across MPI ranks."""
"""Distribute contraction slices across MPI ranks with Reduce."""
rank = comm.Get_rank()
size = comm.Get_size()
is_torch = type(arrays[0]).__module__.startswith("torch")
@@ -145,4 +166,8 @@ def _contract_mpi(tree, arrays, comm, root=0):
result = np.zeros_like(result_np) if rank == root else None
comm.Reduce(result_np, result, root=root)
if rank == root and is_torch:
import torch
return torch.from_numpy(np.asarray(result))
return result