Optimize MPI Sync with merged transfers, caching, and async overlap
Phase 1: Merge N+1 transfer() calls into a single transfer() per Sync(PatchList), reducing N+1 MPI_Waitall barriers to 1 via new Sync_merged() that collects all intra-patch and inter-patch grid segment lists into combined per-rank arrays. Phase 2: Cache grid segment lists and reuse grow-only communication buffers across RK4 substeps via SyncCache struct. Caches are per-level and per-variable-list (predictor/corrector), invalidated on regrid. Eliminates redundant build_ghost_gsl/build_owned_gsl0/build_gstl rebuilds and malloc/free cycles between regrids. Phase 3: Split Sync into async Sync_start/Sync_finish to overlap Cartesian ghost zone exchange (MPI_Isend/Irecv) with Shell patch synchronization. Uses MPI tag 2 to avoid conflicts with SH->Synch() which uses transfer() with tag 1. Also updates makefile.inc paths and flags for local build environment. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -3756,6 +3756,484 @@ void Parallel::Sync(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry)
|
||||
delete[] transfer_src;
|
||||
delete[] transfer_dst;
|
||||
}
|
||||
// Merged Sync: collect all intra-patch and inter-patch grid segment lists,
|
||||
// then issue a single transfer() call instead of N+1 separate ones.
|
||||
void Parallel::Sync_merged(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry)
|
||||
{
|
||||
int cpusize;
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &cpusize);
|
||||
|
||||
MyList<Parallel::gridseg> **combined_src = new MyList<Parallel::gridseg> *[cpusize];
|
||||
MyList<Parallel::gridseg> **combined_dst = new MyList<Parallel::gridseg> *[cpusize];
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
combined_src[node] = combined_dst[node] = 0;
|
||||
|
||||
// Phase A: Intra-patch ghost exchange segments
|
||||
MyList<Patch> *Pp = PatL;
|
||||
while (Pp)
|
||||
{
|
||||
Patch *Pat = Pp->data;
|
||||
MyList<Parallel::gridseg> *dst_ghost = build_ghost_gsl(Pat);
|
||||
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
MyList<Parallel::gridseg> *src_owned = build_owned_gsl0(Pat, node);
|
||||
MyList<Parallel::gridseg> *tsrc = 0, *tdst = 0;
|
||||
build_gstl(src_owned, dst_ghost, &tsrc, &tdst);
|
||||
|
||||
if (tsrc)
|
||||
{
|
||||
if (combined_src[node])
|
||||
combined_src[node]->catList(tsrc);
|
||||
else
|
||||
combined_src[node] = tsrc;
|
||||
}
|
||||
if (tdst)
|
||||
{
|
||||
if (combined_dst[node])
|
||||
combined_dst[node]->catList(tdst);
|
||||
else
|
||||
combined_dst[node] = tdst;
|
||||
}
|
||||
|
||||
if (src_owned)
|
||||
src_owned->destroyList();
|
||||
}
|
||||
|
||||
if (dst_ghost)
|
||||
dst_ghost->destroyList();
|
||||
|
||||
Pp = Pp->next;
|
||||
}
|
||||
|
||||
// Phase B: Inter-patch buffer exchange segments
|
||||
MyList<Parallel::gridseg> *dst_buffer = build_buffer_gsl(PatL);
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
MyList<Parallel::gridseg> *src_owned = build_owned_gsl(PatL, node, 5, Symmetry);
|
||||
MyList<Parallel::gridseg> *tsrc = 0, *tdst = 0;
|
||||
build_gstl(src_owned, dst_buffer, &tsrc, &tdst);
|
||||
|
||||
if (tsrc)
|
||||
{
|
||||
if (combined_src[node])
|
||||
combined_src[node]->catList(tsrc);
|
||||
else
|
||||
combined_src[node] = tsrc;
|
||||
}
|
||||
if (tdst)
|
||||
{
|
||||
if (combined_dst[node])
|
||||
combined_dst[node]->catList(tdst);
|
||||
else
|
||||
combined_dst[node] = tdst;
|
||||
}
|
||||
|
||||
if (src_owned)
|
||||
src_owned->destroyList();
|
||||
}
|
||||
if (dst_buffer)
|
||||
dst_buffer->destroyList();
|
||||
|
||||
// Phase C: Single transfer
|
||||
transfer(combined_src, combined_dst, VarList, VarList, Symmetry);
|
||||
|
||||
// Phase D: Cleanup
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
if (combined_src[node])
|
||||
combined_src[node]->destroyList();
|
||||
if (combined_dst[node])
|
||||
combined_dst[node]->destroyList();
|
||||
}
|
||||
delete[] combined_src;
|
||||
delete[] combined_dst;
|
||||
}
|
||||
// SyncCache constructor
|
||||
Parallel::SyncCache::SyncCache()
|
||||
: valid(false), cpusize(0), combined_src(0), combined_dst(0),
|
||||
send_lengths(0), recv_lengths(0), send_bufs(0), recv_bufs(0),
|
||||
send_buf_caps(0), recv_buf_caps(0), reqs(0), stats(0), max_reqs(0)
|
||||
{
|
||||
}
|
||||
// SyncCache invalidate: free grid segment lists but keep buffers
|
||||
void Parallel::SyncCache::invalidate()
|
||||
{
|
||||
if (!valid)
|
||||
return;
|
||||
for (int i = 0; i < cpusize; i++)
|
||||
{
|
||||
if (combined_src[i])
|
||||
combined_src[i]->destroyList();
|
||||
if (combined_dst[i])
|
||||
combined_dst[i]->destroyList();
|
||||
combined_src[i] = combined_dst[i] = 0;
|
||||
send_lengths[i] = recv_lengths[i] = 0;
|
||||
}
|
||||
valid = false;
|
||||
}
|
||||
// SyncCache destroy: free everything
|
||||
void Parallel::SyncCache::destroy()
|
||||
{
|
||||
invalidate();
|
||||
if (combined_src) delete[] combined_src;
|
||||
if (combined_dst) delete[] combined_dst;
|
||||
if (send_lengths) delete[] send_lengths;
|
||||
if (recv_lengths) delete[] recv_lengths;
|
||||
if (send_buf_caps) delete[] send_buf_caps;
|
||||
if (recv_buf_caps) delete[] recv_buf_caps;
|
||||
for (int i = 0; i < cpusize; i++)
|
||||
{
|
||||
if (send_bufs && send_bufs[i]) delete[] send_bufs[i];
|
||||
if (recv_bufs && recv_bufs[i]) delete[] recv_bufs[i];
|
||||
}
|
||||
if (send_bufs) delete[] send_bufs;
|
||||
if (recv_bufs) delete[] recv_bufs;
|
||||
if (reqs) delete[] reqs;
|
||||
if (stats) delete[] stats;
|
||||
combined_src = combined_dst = 0;
|
||||
send_lengths = recv_lengths = 0;
|
||||
send_buf_caps = recv_buf_caps = 0;
|
||||
send_bufs = recv_bufs = 0;
|
||||
reqs = 0; stats = 0;
|
||||
cpusize = 0; max_reqs = 0;
|
||||
}
|
||||
// transfer_cached: reuse pre-allocated buffers from SyncCache
|
||||
void Parallel::transfer_cached(MyList<Parallel::gridseg> **src, MyList<Parallel::gridseg> **dst,
|
||||
MyList<var> *VarList1, MyList<var> *VarList2,
|
||||
int Symmetry, SyncCache &cache)
|
||||
{
|
||||
int myrank;
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &cache.cpusize);
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
|
||||
int cpusize = cache.cpusize;
|
||||
|
||||
int req_no = 0;
|
||||
int node;
|
||||
|
||||
for (node = 0; node < cpusize; node++)
|
||||
{
|
||||
if (node == myrank)
|
||||
{
|
||||
int length = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
|
||||
cache.recv_lengths[node] = length;
|
||||
if (length > 0)
|
||||
{
|
||||
if (length > cache.recv_buf_caps[node])
|
||||
{
|
||||
if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node];
|
||||
cache.recv_bufs[node] = new double[length];
|
||||
cache.recv_buf_caps[node] = length;
|
||||
}
|
||||
data_packer(cache.recv_bufs[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// send
|
||||
int slength = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
|
||||
cache.send_lengths[node] = slength;
|
||||
if (slength > 0)
|
||||
{
|
||||
if (slength > cache.send_buf_caps[node])
|
||||
{
|
||||
if (cache.send_bufs[node]) delete[] cache.send_bufs[node];
|
||||
cache.send_bufs[node] = new double[slength];
|
||||
cache.send_buf_caps[node] = slength;
|
||||
}
|
||||
data_packer(cache.send_bufs[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
|
||||
MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no++);
|
||||
}
|
||||
// recv
|
||||
int rlength = data_packer(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry);
|
||||
cache.recv_lengths[node] = rlength;
|
||||
if (rlength > 0)
|
||||
{
|
||||
if (rlength > cache.recv_buf_caps[node])
|
||||
{
|
||||
if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node];
|
||||
cache.recv_bufs[node] = new double[rlength];
|
||||
cache.recv_buf_caps[node] = rlength;
|
||||
}
|
||||
MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no++);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
MPI_Waitall(req_no, cache.reqs, cache.stats);
|
||||
|
||||
for (node = 0; node < cpusize; node++)
|
||||
if (cache.recv_bufs[node] && cache.recv_lengths[node] > 0)
|
||||
data_packer(cache.recv_bufs[node], src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry);
|
||||
}
|
||||
// Sync_cached: build grid segment lists on first call, reuse on subsequent calls
|
||||
void Parallel::Sync_cached(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry, SyncCache &cache)
|
||||
{
|
||||
if (!cache.valid)
|
||||
{
|
||||
int cpusize;
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &cpusize);
|
||||
cache.cpusize = cpusize;
|
||||
|
||||
// Allocate cache arrays if needed
|
||||
if (!cache.combined_src)
|
||||
{
|
||||
cache.combined_src = new MyList<Parallel::gridseg> *[cpusize];
|
||||
cache.combined_dst = new MyList<Parallel::gridseg> *[cpusize];
|
||||
cache.send_lengths = new int[cpusize];
|
||||
cache.recv_lengths = new int[cpusize];
|
||||
cache.send_bufs = new double *[cpusize];
|
||||
cache.recv_bufs = new double *[cpusize];
|
||||
cache.send_buf_caps = new int[cpusize];
|
||||
cache.recv_buf_caps = new int[cpusize];
|
||||
for (int i = 0; i < cpusize; i++)
|
||||
{
|
||||
cache.send_bufs[i] = cache.recv_bufs[i] = 0;
|
||||
cache.send_buf_caps[i] = cache.recv_buf_caps[i] = 0;
|
||||
}
|
||||
cache.max_reqs = 2 * cpusize;
|
||||
cache.reqs = new MPI_Request[cache.max_reqs];
|
||||
cache.stats = new MPI_Status[cache.max_reqs];
|
||||
}
|
||||
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
cache.combined_src[node] = cache.combined_dst[node] = 0;
|
||||
cache.send_lengths[node] = cache.recv_lengths[node] = 0;
|
||||
}
|
||||
|
||||
// Build intra-patch segments (same as Sync_merged Phase A)
|
||||
MyList<Patch> *Pp = PatL;
|
||||
while (Pp)
|
||||
{
|
||||
Patch *Pat = Pp->data;
|
||||
MyList<Parallel::gridseg> *dst_ghost = build_ghost_gsl(Pat);
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
MyList<Parallel::gridseg> *src_owned = build_owned_gsl0(Pat, node);
|
||||
MyList<Parallel::gridseg> *tsrc = 0, *tdst = 0;
|
||||
build_gstl(src_owned, dst_ghost, &tsrc, &tdst);
|
||||
if (tsrc)
|
||||
{
|
||||
if (cache.combined_src[node])
|
||||
cache.combined_src[node]->catList(tsrc);
|
||||
else
|
||||
cache.combined_src[node] = tsrc;
|
||||
}
|
||||
if (tdst)
|
||||
{
|
||||
if (cache.combined_dst[node])
|
||||
cache.combined_dst[node]->catList(tdst);
|
||||
else
|
||||
cache.combined_dst[node] = tdst;
|
||||
}
|
||||
if (src_owned) src_owned->destroyList();
|
||||
}
|
||||
if (dst_ghost) dst_ghost->destroyList();
|
||||
Pp = Pp->next;
|
||||
}
|
||||
|
||||
// Build inter-patch segments (same as Sync_merged Phase B)
|
||||
MyList<Parallel::gridseg> *dst_buffer = build_buffer_gsl(PatL);
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
MyList<Parallel::gridseg> *src_owned = build_owned_gsl(PatL, node, 5, Symmetry);
|
||||
MyList<Parallel::gridseg> *tsrc = 0, *tdst = 0;
|
||||
build_gstl(src_owned, dst_buffer, &tsrc, &tdst);
|
||||
if (tsrc)
|
||||
{
|
||||
if (cache.combined_src[node])
|
||||
cache.combined_src[node]->catList(tsrc);
|
||||
else
|
||||
cache.combined_src[node] = tsrc;
|
||||
}
|
||||
if (tdst)
|
||||
{
|
||||
if (cache.combined_dst[node])
|
||||
cache.combined_dst[node]->catList(tdst);
|
||||
else
|
||||
cache.combined_dst[node] = tdst;
|
||||
}
|
||||
if (src_owned) src_owned->destroyList();
|
||||
}
|
||||
if (dst_buffer) dst_buffer->destroyList();
|
||||
|
||||
cache.valid = true;
|
||||
}
|
||||
|
||||
// Use cached lists with buffer-reusing transfer
|
||||
transfer_cached(cache.combined_src, cache.combined_dst, VarList, VarList, Symmetry, cache);
|
||||
}
|
||||
// Sync_start: pack and post MPI_Isend/Irecv, return immediately
|
||||
void Parallel::Sync_start(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry,
|
||||
SyncCache &cache, AsyncSyncState &state)
|
||||
{
|
||||
// Ensure cache is built
|
||||
if (!cache.valid)
|
||||
{
|
||||
// Build cache (same logic as Sync_cached)
|
||||
int cpusize;
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &cpusize);
|
||||
cache.cpusize = cpusize;
|
||||
|
||||
if (!cache.combined_src)
|
||||
{
|
||||
cache.combined_src = new MyList<Parallel::gridseg> *[cpusize];
|
||||
cache.combined_dst = new MyList<Parallel::gridseg> *[cpusize];
|
||||
cache.send_lengths = new int[cpusize];
|
||||
cache.recv_lengths = new int[cpusize];
|
||||
cache.send_bufs = new double *[cpusize];
|
||||
cache.recv_bufs = new double *[cpusize];
|
||||
cache.send_buf_caps = new int[cpusize];
|
||||
cache.recv_buf_caps = new int[cpusize];
|
||||
for (int i = 0; i < cpusize; i++)
|
||||
{
|
||||
cache.send_bufs[i] = cache.recv_bufs[i] = 0;
|
||||
cache.send_buf_caps[i] = cache.recv_buf_caps[i] = 0;
|
||||
}
|
||||
cache.max_reqs = 2 * cpusize;
|
||||
cache.reqs = new MPI_Request[cache.max_reqs];
|
||||
cache.stats = new MPI_Status[cache.max_reqs];
|
||||
}
|
||||
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
cache.combined_src[node] = cache.combined_dst[node] = 0;
|
||||
cache.send_lengths[node] = cache.recv_lengths[node] = 0;
|
||||
}
|
||||
|
||||
MyList<Patch> *Pp = PatL;
|
||||
while (Pp)
|
||||
{
|
||||
Patch *Pat = Pp->data;
|
||||
MyList<Parallel::gridseg> *dst_ghost = build_ghost_gsl(Pat);
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
MyList<Parallel::gridseg> *src_owned = build_owned_gsl0(Pat, node);
|
||||
MyList<Parallel::gridseg> *tsrc = 0, *tdst = 0;
|
||||
build_gstl(src_owned, dst_ghost, &tsrc, &tdst);
|
||||
if (tsrc)
|
||||
{
|
||||
if (cache.combined_src[node])
|
||||
cache.combined_src[node]->catList(tsrc);
|
||||
else
|
||||
cache.combined_src[node] = tsrc;
|
||||
}
|
||||
if (tdst)
|
||||
{
|
||||
if (cache.combined_dst[node])
|
||||
cache.combined_dst[node]->catList(tdst);
|
||||
else
|
||||
cache.combined_dst[node] = tdst;
|
||||
}
|
||||
if (src_owned) src_owned->destroyList();
|
||||
}
|
||||
if (dst_ghost) dst_ghost->destroyList();
|
||||
Pp = Pp->next;
|
||||
}
|
||||
|
||||
MyList<Parallel::gridseg> *dst_buffer = build_buffer_gsl(PatL);
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
MyList<Parallel::gridseg> *src_owned = build_owned_gsl(PatL, node, 5, Symmetry);
|
||||
MyList<Parallel::gridseg> *tsrc = 0, *tdst = 0;
|
||||
build_gstl(src_owned, dst_buffer, &tsrc, &tdst);
|
||||
if (tsrc)
|
||||
{
|
||||
if (cache.combined_src[node])
|
||||
cache.combined_src[node]->catList(tsrc);
|
||||
else
|
||||
cache.combined_src[node] = tsrc;
|
||||
}
|
||||
if (tdst)
|
||||
{
|
||||
if (cache.combined_dst[node])
|
||||
cache.combined_dst[node]->catList(tdst);
|
||||
else
|
||||
cache.combined_dst[node] = tdst;
|
||||
}
|
||||
if (src_owned) src_owned->destroyList();
|
||||
}
|
||||
if (dst_buffer) dst_buffer->destroyList();
|
||||
cache.valid = true;
|
||||
}
|
||||
|
||||
// Now pack and post async MPI operations
|
||||
int myrank;
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
|
||||
int cpusize = cache.cpusize;
|
||||
state.req_no = 0;
|
||||
state.active = true;
|
||||
|
||||
MyList<Parallel::gridseg> **src = cache.combined_src;
|
||||
MyList<Parallel::gridseg> **dst = cache.combined_dst;
|
||||
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
if (node == myrank)
|
||||
{
|
||||
int length = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList, VarList, Symmetry);
|
||||
cache.recv_lengths[node] = length;
|
||||
if (length > 0)
|
||||
{
|
||||
if (length > cache.recv_buf_caps[node])
|
||||
{
|
||||
if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node];
|
||||
cache.recv_bufs[node] = new double[length];
|
||||
cache.recv_buf_caps[node] = length;
|
||||
}
|
||||
data_packer(cache.recv_bufs[node], src[myrank], dst[myrank], node, PACK, VarList, VarList, Symmetry);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
int slength = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList, VarList, Symmetry);
|
||||
cache.send_lengths[node] = slength;
|
||||
if (slength > 0)
|
||||
{
|
||||
if (slength > cache.send_buf_caps[node])
|
||||
{
|
||||
if (cache.send_bufs[node]) delete[] cache.send_bufs[node];
|
||||
cache.send_bufs[node] = new double[slength];
|
||||
cache.send_buf_caps[node] = slength;
|
||||
}
|
||||
data_packer(cache.send_bufs[node], src[myrank], dst[myrank], node, PACK, VarList, VarList, Symmetry);
|
||||
MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, 2, MPI_COMM_WORLD, cache.reqs + state.req_no++);
|
||||
}
|
||||
int rlength = data_packer(0, src[node], dst[node], node, UNPACK, VarList, VarList, Symmetry);
|
||||
cache.recv_lengths[node] = rlength;
|
||||
if (rlength > 0)
|
||||
{
|
||||
if (rlength > cache.recv_buf_caps[node])
|
||||
{
|
||||
if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node];
|
||||
cache.recv_bufs[node] = new double[rlength];
|
||||
cache.recv_buf_caps[node] = rlength;
|
||||
}
|
||||
MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, 2, MPI_COMM_WORLD, cache.reqs + state.req_no++);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Sync_finish: wait for async MPI operations and unpack
|
||||
void Parallel::Sync_finish(SyncCache &cache, AsyncSyncState &state,
|
||||
MyList<var> *VarList, int Symmetry)
|
||||
{
|
||||
if (!state.active)
|
||||
return;
|
||||
|
||||
MPI_Waitall(state.req_no, cache.reqs, cache.stats);
|
||||
|
||||
int cpusize = cache.cpusize;
|
||||
MyList<Parallel::gridseg> **src = cache.combined_src;
|
||||
MyList<Parallel::gridseg> **dst = cache.combined_dst;
|
||||
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
if (cache.recv_bufs[node] && cache.recv_lengths[node] > 0)
|
||||
data_packer(cache.recv_bufs[node], src[node], dst[node], node, UNPACK, VarList, VarList, Symmetry);
|
||||
|
||||
state.active = false;
|
||||
}
|
||||
// collect buffer grid segments or blocks for the periodic boundary condition of given patch
|
||||
// ---------------------------------------------------
|
||||
// |con | |con |
|
||||
|
||||
Reference in New Issue
Block a user