Add SyncBegin/SyncEnd to Parallel for MPI communication-computation overlap
Split the blocking Parallel::Sync into async SyncBegin (initiates local copy + MPI_Isend/Irecv) and SyncEnd (MPI_Waitall + unpack). This allows overlapping MPI ghost zone exchange with error checking and Shell patch computation. Modified Step() in bssn_class.C for both PSTR==0 and PSTR==1/2/3 versions to start Sync before error checks, overlapping the MPI_Allreduce with the ongoing ghost zone transfers. Co-authored-by: copilot-swe-agent[bot] <198982749+copilot@users.noreply.github.com>
This commit is contained in:
@@ -3756,6 +3756,231 @@ void Parallel::Sync(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry)
|
||||
delete[] transfer_src;
|
||||
delete[] transfer_dst;
|
||||
}
|
||||
//
|
||||
// Async Sync: split into SyncBegin (initiate MPI) and SyncEnd (wait + unpack)
|
||||
// This allows overlapping MPI communication with computation.
|
||||
//
|
||||
static void transfer_begin(Parallel::TransferState *ts)
|
||||
{
|
||||
int myrank;
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
|
||||
int cpusize = ts->cpusize;
|
||||
|
||||
ts->reqs = new MPI_Request[2 * cpusize];
|
||||
ts->stats = new MPI_Status[2 * cpusize];
|
||||
ts->req_no = 0;
|
||||
ts->send_data = new double *[cpusize];
|
||||
ts->rec_data = new double *[cpusize];
|
||||
int length;
|
||||
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
ts->send_data[node] = ts->rec_data[node] = 0;
|
||||
if (node == myrank)
|
||||
{
|
||||
// Local copy: pack then immediately unpack (no MPI needed)
|
||||
if ((length = Parallel::data_packer(0, ts->transfer_src[myrank], ts->transfer_dst[myrank],
|
||||
node, PACK, ts->VarList1, ts->VarList2, ts->Symmetry)))
|
||||
{
|
||||
double *local_data = new double[length];
|
||||
if (!local_data)
|
||||
{
|
||||
cout << "out of memory in transfer_begin, local copy" << endl;
|
||||
MPI_Abort(MPI_COMM_WORLD, 1);
|
||||
}
|
||||
Parallel::data_packer(local_data, ts->transfer_src[myrank], ts->transfer_dst[myrank],
|
||||
node, PACK, ts->VarList1, ts->VarList2, ts->Symmetry);
|
||||
Parallel::data_packer(local_data, ts->transfer_src[node], ts->transfer_dst[node],
|
||||
node, UNPACK, ts->VarList1, ts->VarList2, ts->Symmetry);
|
||||
delete[] local_data;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// send from this cpu to cpu#node
|
||||
if ((length = Parallel::data_packer(0, ts->transfer_src[myrank], ts->transfer_dst[myrank],
|
||||
node, PACK, ts->VarList1, ts->VarList2, ts->Symmetry)))
|
||||
{
|
||||
ts->send_data[node] = new double[length];
|
||||
if (!ts->send_data[node])
|
||||
{
|
||||
cout << "out of memory in transfer_begin, send" << endl;
|
||||
MPI_Abort(MPI_COMM_WORLD, 1);
|
||||
}
|
||||
Parallel::data_packer(ts->send_data[node], ts->transfer_src[myrank], ts->transfer_dst[myrank],
|
||||
node, PACK, ts->VarList1, ts->VarList2, ts->Symmetry);
|
||||
MPI_Isend((void *)ts->send_data[node], length, MPI_DOUBLE, node, 1, MPI_COMM_WORLD,
|
||||
ts->reqs + ts->req_no++);
|
||||
}
|
||||
// receive from cpu#node to this cpu
|
||||
if ((length = Parallel::data_packer(0, ts->transfer_src[node], ts->transfer_dst[node],
|
||||
node, UNPACK, ts->VarList1, ts->VarList2, ts->Symmetry)))
|
||||
{
|
||||
ts->rec_data[node] = new double[length];
|
||||
if (!ts->rec_data[node])
|
||||
{
|
||||
cout << "out of memory in transfer_begin, recv" << endl;
|
||||
MPI_Abort(MPI_COMM_WORLD, 1);
|
||||
}
|
||||
MPI_Irecv((void *)ts->rec_data[node], length, MPI_DOUBLE, node, 1, MPI_COMM_WORLD,
|
||||
ts->reqs + ts->req_no++);
|
||||
}
|
||||
}
|
||||
}
|
||||
// NOTE: MPI_Waitall is NOT called here - that happens in transfer_end
|
||||
}
|
||||
//
|
||||
static void transfer_end(Parallel::TransferState *ts)
|
||||
{
|
||||
// Wait for all pending MPI operations
|
||||
MPI_Waitall(ts->req_no, ts->reqs, ts->stats);
|
||||
|
||||
// Unpack received data from remote ranks
|
||||
for (int node = 0; node < ts->cpusize; node++)
|
||||
if (ts->rec_data[node])
|
||||
Parallel::data_packer(ts->rec_data[node], ts->transfer_src[node], ts->transfer_dst[node],
|
||||
node, UNPACK, ts->VarList1, ts->VarList2, ts->Symmetry);
|
||||
|
||||
// Cleanup MPI buffers
|
||||
for (int node = 0; node < ts->cpusize; node++)
|
||||
{
|
||||
if (ts->send_data[node])
|
||||
delete[] ts->send_data[node];
|
||||
if (ts->rec_data[node])
|
||||
delete[] ts->rec_data[node];
|
||||
}
|
||||
delete[] ts->reqs;
|
||||
delete[] ts->stats;
|
||||
delete[] ts->send_data;
|
||||
delete[] ts->rec_data;
|
||||
}
|
||||
//
|
||||
Parallel::SyncHandle *Parallel::SyncBegin(Patch *Pat, MyList<var> *VarList, int Symmetry)
|
||||
{
|
||||
int cpusize;
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &cpusize);
|
||||
|
||||
SyncHandle *handle = new SyncHandle;
|
||||
handle->num_states = 1;
|
||||
handle->states = new TransferState[1];
|
||||
|
||||
TransferState *ts = &handle->states[0];
|
||||
ts->cpusize = cpusize;
|
||||
ts->VarList1 = VarList;
|
||||
ts->VarList2 = VarList;
|
||||
ts->Symmetry = Symmetry;
|
||||
|
||||
ts->dst = build_ghost_gsl(Pat);
|
||||
ts->src = new MyList<Parallel::gridseg> *[cpusize];
|
||||
ts->transfer_src = new MyList<Parallel::gridseg> *[cpusize];
|
||||
ts->transfer_dst = new MyList<Parallel::gridseg> *[cpusize];
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
ts->src[node] = build_owned_gsl0(Pat, node);
|
||||
build_gstl(ts->src[node], ts->dst, &ts->transfer_src[node], &ts->transfer_dst[node]);
|
||||
}
|
||||
|
||||
transfer_begin(ts);
|
||||
|
||||
return handle;
|
||||
}
|
||||
//
|
||||
Parallel::SyncHandle *Parallel::SyncBegin(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry)
|
||||
{
|
||||
int cpusize;
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &cpusize);
|
||||
|
||||
// Count patches
|
||||
int num_patches = 0;
|
||||
MyList<Patch> *Pp = PatL;
|
||||
while (Pp) { num_patches++; Pp = Pp->next; }
|
||||
|
||||
SyncHandle *handle = new SyncHandle;
|
||||
handle->num_states = num_patches + 1; // intra-patch transfers + 1 inter-patch transfer
|
||||
handle->states = new TransferState[handle->num_states];
|
||||
|
||||
// Intra-patch sync: for each patch, build ghost lists and initiate transfer
|
||||
int idx = 0;
|
||||
Pp = PatL;
|
||||
while (Pp)
|
||||
{
|
||||
TransferState *ts = &handle->states[idx];
|
||||
ts->cpusize = cpusize;
|
||||
ts->VarList1 = VarList;
|
||||
ts->VarList2 = VarList;
|
||||
ts->Symmetry = Symmetry;
|
||||
|
||||
ts->dst = build_ghost_gsl(Pp->data);
|
||||
ts->src = new MyList<Parallel::gridseg> *[cpusize];
|
||||
ts->transfer_src = new MyList<Parallel::gridseg> *[cpusize];
|
||||
ts->transfer_dst = new MyList<Parallel::gridseg> *[cpusize];
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
ts->src[node] = build_owned_gsl0(Pp->data, node);
|
||||
build_gstl(ts->src[node], ts->dst, &ts->transfer_src[node], &ts->transfer_dst[node]);
|
||||
}
|
||||
|
||||
transfer_begin(ts);
|
||||
|
||||
idx++;
|
||||
Pp = Pp->next;
|
||||
}
|
||||
|
||||
// Inter-patch sync: buffer zone exchange between patches
|
||||
{
|
||||
TransferState *ts = &handle->states[idx];
|
||||
ts->cpusize = cpusize;
|
||||
ts->VarList1 = VarList;
|
||||
ts->VarList2 = VarList;
|
||||
ts->Symmetry = Symmetry;
|
||||
|
||||
ts->dst = build_buffer_gsl(PatL);
|
||||
ts->src = new MyList<Parallel::gridseg> *[cpusize];
|
||||
ts->transfer_src = new MyList<Parallel::gridseg> *[cpusize];
|
||||
ts->transfer_dst = new MyList<Parallel::gridseg> *[cpusize];
|
||||
for (int node = 0; node < cpusize; node++)
|
||||
{
|
||||
ts->src[node] = build_owned_gsl(PatL, node, 5, Symmetry);
|
||||
build_gstl(ts->src[node], ts->dst, &ts->transfer_src[node], &ts->transfer_dst[node]);
|
||||
}
|
||||
|
||||
transfer_begin(ts);
|
||||
}
|
||||
|
||||
return handle;
|
||||
}
|
||||
//
|
||||
void Parallel::SyncEnd(SyncHandle *handle)
|
||||
{
|
||||
if (!handle)
|
||||
return;
|
||||
|
||||
// Wait for all pending transfers and unpack
|
||||
for (int i = 0; i < handle->num_states; i++)
|
||||
{
|
||||
TransferState *ts = &handle->states[i];
|
||||
transfer_end(ts);
|
||||
|
||||
// Cleanup grid segment lists
|
||||
if (ts->dst)
|
||||
ts->dst->destroyList();
|
||||
for (int node = 0; node < ts->cpusize; node++)
|
||||
{
|
||||
if (ts->src[node])
|
||||
ts->src[node]->destroyList();
|
||||
if (ts->transfer_src[node])
|
||||
ts->transfer_src[node]->destroyList();
|
||||
if (ts->transfer_dst[node])
|
||||
ts->transfer_dst[node]->destroyList();
|
||||
}
|
||||
delete[] ts->src;
|
||||
delete[] ts->transfer_src;
|
||||
delete[] ts->transfer_dst;
|
||||
}
|
||||
|
||||
delete[] handle->states;
|
||||
delete handle;
|
||||
}
|
||||
// collect buffer grid segments or blocks for the periodic boundary condition of given patch
|
||||
// ---------------------------------------------------
|
||||
// |con | |con |
|
||||
|
||||
@@ -81,6 +81,32 @@ namespace Parallel
|
||||
int Symmetry);
|
||||
void Sync(Patch *Pat, MyList<var> *VarList, int Symmetry);
|
||||
void Sync(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry);
|
||||
|
||||
// Async Sync: overlap MPI communication with computation
|
||||
// Bookkeeping for one in-flight ghost/buffer-zone exchange, created by
// SyncBegin() and consumed (waited on, unpacked, freed) by SyncEnd().
struct TransferState
|
||||
{
|
||||
MPI_Request *reqs; // pending Isend/Irecv requests (sized 2 per rank)
|
||||
MPI_Status *stats; // status array for the final MPI_Waitall
|
||||
int req_no; // number of requests actually posted
|
||||
double **send_data; // per-rank packed send buffers (0 if none)
|
||||
double **rec_data; // per-rank receive buffers (0 if none)
|
||||
int cpusize; // MPI communicator size
|
||||
MyList<gridseg> **transfer_src; // per-rank source segments to transfer
|
||||
MyList<gridseg> **transfer_dst; // per-rank destination segments
|
||||
MyList<gridseg> **src; // per-rank owned-segment lists
|
||||
MyList<gridseg> *dst; // ghost/buffer segments to be filled
|
||||
MyList<var> *VarList1; // source variables
|
||||
MyList<var> *VarList2; // target variables (same list as VarList1 in SyncBegin)
|
||||
int Symmetry; // symmetry flag forwarded to data_packer
|
||||
};
|
||||
// Opaque handle returned by SyncBegin(); owns its TransferState array.
struct SyncHandle
|
||||
{
|
||||
TransferState *states; // one entry per initiated transfer
|
||||
int num_states; // for a patch list: one per patch + 1 inter-patch
|
||||
};
|
||||
// Start a non-blocking sync of VarList on one patch / a whole patch list.
SyncHandle *SyncBegin(Patch *Pat, MyList<var> *VarList, int Symmetry);
|
||||
SyncHandle *SyncBegin(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry);
|
||||
// Wait for completion, unpack, and free the handle (null handle: no-op).
void SyncEnd(SyncHandle *handle);
|
||||
void OutBdLow2Hi(Patch *Patc, Patch *Patf,
|
||||
MyList<var> *VarList1 /* source */, MyList<var> *VarList2 /* target */,
|
||||
int Symmetry);
|
||||
|
||||
@@ -3158,13 +3158,18 @@ void bssn_class::Step(int lev, int YN)
|
||||
}
|
||||
Pp = Pp->next;
|
||||
}
|
||||
// check error information
|
||||
|
||||
// Start async ghost zone exchange - overlaps with error check and Shell computation
|
||||
Parallel::SyncHandle *sync_pre = Parallel::SyncBegin(GH->PatL[lev], SynchList_pre, Symmetry);
|
||||
|
||||
// check error information (overlaps with MPI transfer)
|
||||
{
|
||||
int erh = ERROR;
|
||||
MPI_Allreduce(&erh, &ERROR, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
|
||||
}
|
||||
if (ERROR)
|
||||
{
|
||||
Parallel::SyncEnd(sync_pre); sync_pre = 0;
|
||||
Parallel::Dump_Data(GH->PatL[lev], StateList, 0, PhysTime, dT_lev);
|
||||
if (myrank == 0)
|
||||
{
|
||||
@@ -3324,6 +3329,7 @@ void bssn_class::Step(int lev, int YN)
|
||||
|
||||
if (ERROR)
|
||||
{
|
||||
Parallel::SyncEnd(sync_pre); sync_pre = 0;
|
||||
SH->Dump_Data(StateList, 0, PhysTime, dT_lev);
|
||||
if (myrank == 0)
|
||||
{
|
||||
@@ -3334,7 +3340,8 @@ void bssn_class::Step(int lev, int YN)
|
||||
}
|
||||
#endif
|
||||
|
||||
Parallel::Sync(GH->PatL[lev], SynchList_pre, Symmetry);
|
||||
// Complete async ghost zone exchange
|
||||
if (sync_pre) Parallel::SyncEnd(sync_pre);
|
||||
|
||||
#ifdef WithShell
|
||||
if (lev == 0)
|
||||
@@ -3528,7 +3535,10 @@ void bssn_class::Step(int lev, int YN)
|
||||
Pp = Pp->next;
|
||||
}
|
||||
|
||||
// check error information
|
||||
// Start async ghost zone exchange - overlaps with error check and Shell computation
|
||||
Parallel::SyncHandle *sync_cor = Parallel::SyncBegin(GH->PatL[lev], SynchList_cor, Symmetry);
|
||||
|
||||
// check error information (overlaps with MPI transfer)
|
||||
{
|
||||
int erh = ERROR;
|
||||
MPI_Allreduce(&erh, &ERROR, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
|
||||
@@ -3536,6 +3546,7 @@ void bssn_class::Step(int lev, int YN)
|
||||
|
||||
if (ERROR)
|
||||
{
|
||||
Parallel::SyncEnd(sync_cor); sync_cor = 0;
|
||||
Parallel::Dump_Data(GH->PatL[lev], SynchList_pre, 0, PhysTime, dT_lev);
|
||||
if (myrank == 0)
|
||||
{
|
||||
@@ -3692,6 +3703,7 @@ void bssn_class::Step(int lev, int YN)
|
||||
}
|
||||
if (ERROR)
|
||||
{
|
||||
Parallel::SyncEnd(sync_cor); sync_cor = 0;
|
||||
SH->Dump_Data(SynchList_pre, 0, PhysTime, dT_lev);
|
||||
if (myrank == 0)
|
||||
{
|
||||
@@ -3704,7 +3716,8 @@ void bssn_class::Step(int lev, int YN)
|
||||
}
|
||||
#endif
|
||||
|
||||
Parallel::Sync(GH->PatL[lev], SynchList_cor, Symmetry);
|
||||
// Complete async ghost zone exchange
|
||||
if (sync_cor) Parallel::SyncEnd(sync_cor);
|
||||
|
||||
#ifdef WithShell
|
||||
if (lev == 0)
|
||||
@@ -4943,13 +4956,17 @@ void bssn_class::Step(int lev, int YN)
|
||||
|
||||
// misc::tillherecheck(GH->Commlev[lev],GH->start_rank[lev],"after Predictor rhs calculation");
|
||||
|
||||
// check error information
|
||||
// Start async ghost zone exchange - overlaps with error check and BH position
|
||||
Parallel::SyncHandle *sync_pre = Parallel::SyncBegin(GH->PatL[lev], SynchList_pre, Symmetry);
|
||||
|
||||
// check error information (overlaps with MPI transfer)
|
||||
{
|
||||
int erh = ERROR;
|
||||
MPI_Allreduce(&erh, &ERROR, 1, MPI_INT, MPI_SUM, GH->Commlev[lev]);
|
||||
}
|
||||
if (ERROR)
|
||||
{
|
||||
Parallel::SyncEnd(sync_pre); sync_pre = 0;
|
||||
Parallel::Dump_Data(GH->PatL[lev], StateList, 0, PhysTime, dT_lev);
|
||||
if (myrank == 0)
|
||||
{
|
||||
@@ -4961,7 +4978,8 @@ void bssn_class::Step(int lev, int YN)
|
||||
|
||||
// misc::tillherecheck(GH->Commlev[lev],GH->start_rank[lev],"before Predictor sync");
|
||||
|
||||
Parallel::Sync(GH->PatL[lev], SynchList_pre, Symmetry);
|
||||
// Complete async ghost zone exchange
|
||||
if (sync_pre) Parallel::SyncEnd(sync_pre);
|
||||
|
||||
#if (MAPBH == 0)
|
||||
// for black hole position
|
||||
@@ -5140,13 +5158,17 @@ void bssn_class::Step(int lev, int YN)
|
||||
|
||||
// misc::tillherecheck(GH->Commlev[lev],GH->start_rank[lev],"before Corrector error check");
|
||||
|
||||
// check error information
|
||||
// Start async ghost zone exchange - overlaps with error check and BH position
|
||||
Parallel::SyncHandle *sync_cor = Parallel::SyncBegin(GH->PatL[lev], SynchList_cor, Symmetry);
|
||||
|
||||
// check error information (overlaps with MPI transfer)
|
||||
{
|
||||
int erh = ERROR;
|
||||
MPI_Allreduce(&erh, &ERROR, 1, MPI_INT, MPI_SUM, GH->Commlev[lev]);
|
||||
}
|
||||
if (ERROR)
|
||||
{
|
||||
Parallel::SyncEnd(sync_cor); sync_cor = 0;
|
||||
Parallel::Dump_Data(GH->PatL[lev], SynchList_pre, 0, PhysTime, dT_lev);
|
||||
if (myrank == 0)
|
||||
{
|
||||
@@ -5160,7 +5182,8 @@ void bssn_class::Step(int lev, int YN)
|
||||
|
||||
// misc::tillherecheck(GH->Commlev[lev],GH->start_rank[lev],"before Corrector sync");
|
||||
|
||||
Parallel::Sync(GH->PatL[lev], SynchList_cor, Symmetry);
|
||||
// Complete async ghost zone exchange
|
||||
if (sync_cor) Parallel::SyncEnd(sync_cor);
|
||||
|
||||
// misc::tillherecheck(GH->Commlev[lev],GH->start_rank[lev],"after Corrector sync");
|
||||
|
||||
|
||||
Reference in New Issue
Block a user