From a918dc103ec64ab28d82fcdc6b18505ec12d5ce4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 08:00:15 +0000 Subject: [PATCH] Add SyncBegin/SyncEnd to Parallel for MPI communication-computation overlap Split the blocking Parallel::Sync into async SyncBegin (initiates local copy + MPI_Isend/Irecv) and SyncEnd (MPI_Waitall + unpack). This allows overlapping MPI ghost zone exchange with error checking and Shell patch computation. Modified Step() in bssn_class.C for both PSTR==0 and PSTR==1/2/3 versions to start Sync before error checks, overlapping the MPI_Allreduce with the ongoing ghost zone transfers. Co-authored-by: copilot-swe-agent[bot] <198982749+copilot@users.noreply.github.com> --- AMSS_NCKU_source/Parallel.C | 225 ++++++++++++++++++++++++++++++++++ AMSS_NCKU_source/Parallel.h | 26 ++++ AMSS_NCKU_source/bssn_class.C | 39 ++++-- 3 files changed, 282 insertions(+), 8 deletions(-) diff --git a/AMSS_NCKU_source/Parallel.C b/AMSS_NCKU_source/Parallel.C index 713a6a7..943b293 100644 --- a/AMSS_NCKU_source/Parallel.C +++ b/AMSS_NCKU_source/Parallel.C @@ -3756,6 +3756,231 @@ void Parallel::Sync(MyList *PatL, MyList *VarList, int Symmetry) delete[] transfer_src; delete[] transfer_dst; } +// +// Async Sync: split into SyncBegin (initiate MPI) and SyncEnd (wait + unpack) +// This allows overlapping MPI communication with computation. +// +static void transfer_begin(Parallel::TransferState *ts) +{ + int myrank; + MPI_Comm_rank(MPI_COMM_WORLD, &myrank); + int cpusize = ts->cpusize; + + ts->reqs = new MPI_Request[2 * cpusize]; + ts->stats = new MPI_Status[2 * cpusize]; + ts->req_no = 0; + ts->send_data = new double *[cpusize]; + ts->rec_data = new double *[cpusize]; + int length; + + for (int node = 0; node < cpusize; node++) + { + ts->send_data[node] = ts->rec_data[node] = 0; + if (node == myrank) + { + // Local copy: pack then immediately unpack (no MPI needed) + if ((length = Parallel::data_packer(0, ts->transfer_src[myrank], ts->transfer_dst[myrank], + node, PACK, ts->VarList1, ts->VarList2, ts->Symmetry))) + { + double *local_data = new double[length]; + if (!local_data) + { + cout << "out of memory in transfer_begin, local copy" << endl; + MPI_Abort(MPI_COMM_WORLD, 1); + } + Parallel::data_packer(local_data, ts->transfer_src[myrank], ts->transfer_dst[myrank], + node, PACK, ts->VarList1, ts->VarList2, ts->Symmetry); + Parallel::data_packer(local_data, ts->transfer_src[node], ts->transfer_dst[node], + node, UNPACK, ts->VarList1, ts->VarList2, ts->Symmetry); + delete[] local_data; + } + } + else + { + // send from this cpu to cpu#node + if ((length = Parallel::data_packer(0, ts->transfer_src[myrank], ts->transfer_dst[myrank], + node, PACK, ts->VarList1, ts->VarList2, ts->Symmetry))) + { + ts->send_data[node] = new double[length]; + if (!ts->send_data[node]) + { + cout << "out of memory in transfer_begin, send" << endl; + MPI_Abort(MPI_COMM_WORLD, 1); + } + Parallel::data_packer(ts->send_data[node], ts->transfer_src[myrank], ts->transfer_dst[myrank], + node, PACK, ts->VarList1, ts->VarList2, ts->Symmetry); + MPI_Isend((void *)ts->send_data[node], length, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, + ts->reqs + ts->req_no++); + } + // receive from cpu#node to this cpu + if ((length = Parallel::data_packer(0, ts->transfer_src[node], ts->transfer_dst[node], + node, UNPACK, ts->VarList1, ts->VarList2, ts->Symmetry))) + { + ts->rec_data[node] = new double[length]; + if (!ts->rec_data[node]) + { + cout << "out of memory in transfer_begin, recv" << endl; + MPI_Abort(MPI_COMM_WORLD, 1); + } + MPI_Irecv((void *)ts->rec_data[node], length, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, + ts->reqs + ts->req_no++); + } + } + } + // NOTE: MPI_Waitall is NOT called here - that happens in transfer_end +} +// +static void transfer_end(Parallel::TransferState *ts) +{ + // Wait for all pending MPI operations + MPI_Waitall(ts->req_no, ts->reqs, ts->stats); + + // Unpack received data from remote ranks + for (int node = 0; node < ts->cpusize; node++) + if (ts->rec_data[node]) + Parallel::data_packer(ts->rec_data[node], ts->transfer_src[node], ts->transfer_dst[node], + node, UNPACK, ts->VarList1, ts->VarList2, ts->Symmetry); + + // Cleanup MPI buffers + for (int node = 0; node < ts->cpusize; node++) + { + if (ts->send_data[node]) + delete[] ts->send_data[node]; + if (ts->rec_data[node]) + delete[] ts->rec_data[node]; + } + delete[] ts->reqs; + delete[] ts->stats; + delete[] ts->send_data; + delete[] ts->rec_data; +} +// +Parallel::SyncHandle *Parallel::SyncBegin(Patch *Pat, MyList *VarList, int Symmetry) +{ + int cpusize; + MPI_Comm_size(MPI_COMM_WORLD, &cpusize); + + SyncHandle *handle = new SyncHandle; + handle->num_states = 1; + handle->states = new TransferState[1]; + + TransferState *ts = &handle->states[0]; + ts->cpusize = cpusize; + ts->VarList1 = VarList; + ts->VarList2 = VarList; + ts->Symmetry = Symmetry; + + ts->dst = build_ghost_gsl(Pat); + ts->src = new MyList *[cpusize]; + ts->transfer_src = new MyList *[cpusize]; + ts->transfer_dst = new MyList *[cpusize]; + for (int node = 0; node < cpusize; node++) + { + ts->src[node] = build_owned_gsl0(Pat, node); + build_gstl(ts->src[node], ts->dst, &ts->transfer_src[node], &ts->transfer_dst[node]); + } + + transfer_begin(ts); + + return handle; +} +// +Parallel::SyncHandle *Parallel::SyncBegin(MyList *PatL, MyList *VarList, int Symmetry) +{ + int cpusize; + MPI_Comm_size(MPI_COMM_WORLD, &cpusize); + + // Count patches + int num_patches = 0; + MyList *Pp = PatL; + while (Pp) { num_patches++; Pp = Pp->next; } + + SyncHandle *handle = new SyncHandle; + handle->num_states = num_patches + 1; // intra-patch transfers + 1 inter-patch transfer + handle->states = new TransferState[handle->num_states]; + + // Intra-patch sync: for each patch, build ghost lists and initiate transfer + int idx = 0; + Pp = PatL; + while (Pp) + { + TransferState *ts = &handle->states[idx]; + ts->cpusize = cpusize; + ts->VarList1 = VarList; + ts->VarList2 = VarList; + ts->Symmetry = Symmetry; + + ts->dst = build_ghost_gsl(Pp->data); + ts->src = new MyList *[cpusize]; + ts->transfer_src = new MyList *[cpusize]; + ts->transfer_dst = new MyList *[cpusize]; + for (int node = 0; node < cpusize; node++) + { + ts->src[node] = build_owned_gsl0(Pp->data, node); + build_gstl(ts->src[node], ts->dst, &ts->transfer_src[node], &ts->transfer_dst[node]); + } + + transfer_begin(ts); + + idx++; + Pp = Pp->next; + } + + // Inter-patch sync: buffer zone exchange between patches + { + TransferState *ts = &handle->states[idx]; + ts->cpusize = cpusize; + ts->VarList1 = VarList; + ts->VarList2 = VarList; + ts->Symmetry = Symmetry; + + ts->dst = build_buffer_gsl(PatL); + ts->src = new MyList *[cpusize]; + ts->transfer_src = new MyList *[cpusize]; + ts->transfer_dst = new MyList *[cpusize]; + for (int node = 0; node < cpusize; node++) + { + ts->src[node] = build_owned_gsl(PatL, node, 5, Symmetry); + build_gstl(ts->src[node], ts->dst, &ts->transfer_src[node], &ts->transfer_dst[node]); + } + + transfer_begin(ts); + } + + return handle; +} +// +void Parallel::SyncEnd(SyncHandle *handle) +{ + if (!handle) + return; + + // Wait for all pending transfers and unpack + for (int i = 0; i < handle->num_states; i++) + { + TransferState *ts = &handle->states[i]; + transfer_end(ts); + + // Cleanup grid segment lists + if (ts->dst) + ts->dst->destroyList(); + for (int node = 0; node < ts->cpusize; node++) + { + if (ts->src[node]) + ts->src[node]->destroyList(); + if (ts->transfer_src[node]) + ts->transfer_src[node]->destroyList(); + if (ts->transfer_dst[node]) + ts->transfer_dst[node]->destroyList(); + } + delete[] ts->src; + delete[] ts->transfer_src; + delete[] ts->transfer_dst; + } + + delete[] handle->states; + delete handle; +} // collect buffer grid segments or blocks for the periodic boundary condition of given patch // --------------------------------------------------- // |con | |con | diff --git a/AMSS_NCKU_source/Parallel.h b/AMSS_NCKU_source/Parallel.h index 12fc356..e407aba 100644 --- a/AMSS_NCKU_source/Parallel.h +++ b/AMSS_NCKU_source/Parallel.h @@ -81,6 +81,32 @@ namespace Parallel int Symmetry); void Sync(Patch *Pat, MyList *VarList, int Symmetry); void Sync(MyList *PatL, MyList *VarList, int Symmetry); + + // Async Sync: overlap MPI communication with computation + struct TransferState + { + MPI_Request *reqs; + MPI_Status *stats; + int req_no; + double **send_data; + double **rec_data; + int cpusize; + MyList **transfer_src; + MyList **transfer_dst; + MyList **src; + MyList *dst; + MyList *VarList1; + MyList *VarList2; + int Symmetry; + }; + struct SyncHandle + { + TransferState *states; + int num_states; + }; + SyncHandle *SyncBegin(Patch *Pat, MyList *VarList, int Symmetry); + SyncHandle *SyncBegin(MyList *PatL, MyList *VarList, int Symmetry); + void SyncEnd(SyncHandle *handle); void OutBdLow2Hi(Patch *Patc, Patch *Patf, MyList *VarList1 /* source */, MyList *VarList2 /* target */, int Symmetry); diff --git a/AMSS_NCKU_source/bssn_class.C b/AMSS_NCKU_source/bssn_class.C index fc6c88e..107f970 100644 --- a/AMSS_NCKU_source/bssn_class.C +++ b/AMSS_NCKU_source/bssn_class.C @@ -3158,13 +3158,18 @@ void bssn_class::Step(int lev, int YN) } Pp = Pp->next; } - // check error information + + // Start async ghost zone exchange - overlaps with error check and Shell computation + Parallel::SyncHandle *sync_pre = Parallel::SyncBegin(GH->PatL[lev], SynchList_pre, Symmetry); + + // check error information (overlaps with MPI transfer) { int erh = ERROR; MPI_Allreduce(&erh, &ERROR, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); } if (ERROR) { + Parallel::SyncEnd(sync_pre); sync_pre = 0; Parallel::Dump_Data(GH->PatL[lev], StateList, 0, PhysTime, dT_lev); if (myrank == 0) { @@ -3324,6 +3329,7 @@ void bssn_class::Step(int lev, int YN) if (ERROR) { + Parallel::SyncEnd(sync_pre); sync_pre = 0; SH->Dump_Data(StateList, 0, PhysTime, dT_lev); if (myrank == 0) { @@ -3334,7 +3340,8 @@ void bssn_class::Step(int lev, int YN) } #endif - Parallel::Sync(GH->PatL[lev], SynchList_pre, Symmetry); + // Complete async ghost zone exchange + if (sync_pre) Parallel::SyncEnd(sync_pre); #ifdef WithShell if (lev == 0) @@ -3528,7 +3535,10 @@ void bssn_class::Step(int lev, int YN) Pp = Pp->next; } - // check error information + // Start async ghost zone exchange - overlaps with error check and Shell computation + Parallel::SyncHandle *sync_cor = Parallel::SyncBegin(GH->PatL[lev], SynchList_cor, Symmetry); + + // check error information (overlaps with MPI transfer) { int erh = ERROR; MPI_Allreduce(&erh, &ERROR, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); @@ -3536,6 +3546,7 @@ void bssn_class::Step(int lev, int YN) if (ERROR) { + Parallel::SyncEnd(sync_cor); sync_cor = 0; Parallel::Dump_Data(GH->PatL[lev], SynchList_pre, 0, PhysTime, dT_lev); if (myrank == 0) { @@ -3692,6 +3703,7 @@ void bssn_class::Step(int lev, int YN) } if (ERROR) { + Parallel::SyncEnd(sync_cor); sync_cor = 0; SH->Dump_Data(SynchList_pre, 0, PhysTime, dT_lev); if (myrank == 0) { @@ -3704,7 +3716,8 @@ void bssn_class::Step(int lev, int YN) } #endif - Parallel::Sync(GH->PatL[lev], SynchList_cor, Symmetry); + // Complete async ghost zone exchange + if (sync_cor) Parallel::SyncEnd(sync_cor); #ifdef WithShell if (lev == 0) @@ -4943,13 +4956,17 @@ void bssn_class::Step(int lev, int YN) // misc::tillherecheck(GH->Commlev[lev],GH->start_rank[lev],"after Predictor rhs calculation"); - // check error information + // Start async ghost zone exchange - overlaps with error check and BH position + Parallel::SyncHandle *sync_pre = Parallel::SyncBegin(GH->PatL[lev], SynchList_pre, Symmetry); + + // check error information (overlaps with MPI transfer) { int erh = ERROR; MPI_Allreduce(&erh, &ERROR, 1, MPI_INT, MPI_SUM, GH->Commlev[lev]); } if (ERROR) { + Parallel::SyncEnd(sync_pre); sync_pre = 0; Parallel::Dump_Data(GH->PatL[lev], StateList, 0, PhysTime, dT_lev); if (myrank == 0) { @@ -4961,7 +4978,8 @@ void bssn_class::Step(int lev, int YN) // misc::tillherecheck(GH->Commlev[lev],GH->start_rank[lev],"before Predictor sync"); - Parallel::Sync(GH->PatL[lev], SynchList_pre, Symmetry); + // Complete async ghost zone exchange + if (sync_pre) Parallel::SyncEnd(sync_pre); #if (MAPBH == 0) // for black hole position @@ -5140,13 +5158,17 @@ void bssn_class::Step(int lev, int YN) // misc::tillherecheck(GH->Commlev[lev],GH->start_rank[lev],"before Corrector error check"); - // check error information + // Start async ghost zone exchange - overlaps with error check and BH position + Parallel::SyncHandle *sync_cor = Parallel::SyncBegin(GH->PatL[lev], SynchList_cor, Symmetry); + + // check error information (overlaps with MPI transfer) { int erh = ERROR; MPI_Allreduce(&erh, &ERROR, 1, MPI_INT, MPI_SUM, GH->Commlev[lev]); } if (ERROR) { + Parallel::SyncEnd(sync_cor); sync_cor = 0; Parallel::Dump_Data(GH->PatL[lev], SynchList_pre, 0, PhysTime, dT_lev); if (myrank == 0) { @@ -5160,7 +5182,8 @@ void bssn_class::Step(int lev, int YN) // misc::tillherecheck(GH->Commlev[lev],GH->start_rank[lev],"before Corrector sync"); - Parallel::Sync(GH->PatL[lev], SynchList_cor, Symmetry); + // Complete async ghost zone exchange + if (sync_cor) Parallel::SyncEnd(sync_cor); // misc::tillherecheck(GH->Commlev[lev],GH->start_rank[lev],"after Corrector sync");