From 01410de05a4a7544b13cac76bd97438a5781caf6 Mon Sep 17 00:00:00 2001 From: ianchb Date: Sun, 1 Mar 2026 16:20:51 +0800 Subject: [PATCH] refactor(Parallel): streamline MPI communication by consolidating request handling and memory management --- AMSS_NCKU_source/Parallel.C | 741 ++++++++++++++++++++++-------------- 1 file changed, 453 insertions(+), 288 deletions(-) diff --git a/AMSS_NCKU_source/Parallel.C b/AMSS_NCKU_source/Parallel.C index bd591c4..e3bb4a3 100644 --- a/AMSS_NCKU_source/Parallel.C +++ b/AMSS_NCKU_source/Parallel.C @@ -3883,175 +3883,263 @@ int Parallel::data_packermix(double *data, MyList *src, MyLis return size_out; } // -void Parallel::transfer(MyList **src, MyList **dst, - MyList *VarList1 /* source */, MyList *VarList2 /*target */, - int Symmetry) -{ - int myrank, cpusize; - MPI_Comm_size(MPI_COMM_WORLD, &cpusize); - MPI_Comm_rank(MPI_COMM_WORLD, &myrank); - - int node; - - MPI_Request *reqs; - MPI_Status *stats; - reqs = new MPI_Request[2 * cpusize]; - stats = new MPI_Status[2 * cpusize]; - int req_no = 0; - - double **send_data, **rec_data; - send_data = new double *[cpusize]; - rec_data = new double *[cpusize]; - int length; - - for (node = 0; node < cpusize; node++) - { - send_data[node] = rec_data[node] = 0; - if (node == myrank) - { - if (length = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry)) - { - rec_data[node] = new double[length]; - if (!rec_data[node]) - { - cout << "out of memory when new in short transfer, place 1" << endl; - MPI_Abort(MPI_COMM_WORLD, 1); - } - data_packer(rec_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); - } - } - else - { - // send from this cpu to cpu#node - if (length = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry)) - { - send_data[node] = new double[length]; - if (!send_data[node]) - { - cout << "out of memory when new in short transfer, place 2" << endl; - MPI_Abort(MPI_COMM_WORLD, 1); - } - data_packer(send_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); - MPI_Isend((void *)send_data[node], length, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no++); - } - // receive from cpu#node to this cpu - if (length = data_packer(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry)) - { - rec_data[node] = new double[length]; - if (!rec_data[node]) - { - cout << "out of memory when new in short transfer, place 3" << endl; - MPI_Abort(MPI_COMM_WORLD, 1); - } - MPI_Irecv((void *)rec_data[node], length, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no++); - } - } - } - // wait for all requests to complete - MPI_Waitall(req_no, reqs, stats); - - for (node = 0; node < cpusize; node++) - if (rec_data[node]) - data_packer(rec_data[node], src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry); - - for (node = 0; node < cpusize; node++) - { - if (send_data[node]) - delete[] send_data[node]; - if (rec_data[node]) - delete[] rec_data[node]; - } - - delete[] reqs; - delete[] stats; - delete[] send_data; - delete[] rec_data; -} +void Parallel::transfer(MyList **src, MyList **dst, + MyList *VarList1 /* source */, MyList *VarList2 /*target */, + int Symmetry) +{ + int myrank, cpusize; + MPI_Comm_size(MPI_COMM_WORLD, &cpusize); + MPI_Comm_rank(MPI_COMM_WORLD, &myrank); + + int node; + + MPI_Request *reqs = new MPI_Request[2 * cpusize]; + MPI_Status *stats = new MPI_Status[2 * cpusize]; + int *req_node = new int[2 * cpusize]; + int *req_is_recv = new int[2 * cpusize]; + int *completed = 
new int[2 * cpusize]; + int req_no = 0; + int pending_recv = 0; + + double **send_data = new double *[cpusize]; + double **rec_data = new double *[cpusize]; + int *send_lengths = new int[cpusize]; + int *recv_lengths = new int[cpusize]; + + for (node = 0; node < cpusize; node++) + { + send_data[node] = rec_data[node] = 0; + send_lengths[node] = recv_lengths[node] = 0; + } + + // Post receives first so peers can progress rendezvous early. + for (node = 0; node < cpusize; node++) + { + if (node == myrank) continue; + + recv_lengths[node] = data_packer(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry); + if (recv_lengths[node] > 0) + { + rec_data[node] = new double[recv_lengths[node]]; + if (!rec_data[node]) + { + cout << "out of memory when new in short transfer, place 1" << endl; + MPI_Abort(MPI_COMM_WORLD, 1); + } + MPI_Irecv((void *)rec_data[node], recv_lengths[node], MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no); + req_node[req_no] = node; + req_is_recv[req_no] = 1; + req_no++; + pending_recv++; + } + } + + // Local transfer on this rank. + recv_lengths[myrank] = data_packer(0, src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry); + if (recv_lengths[myrank] > 0) + { + rec_data[myrank] = new double[recv_lengths[myrank]]; + if (!rec_data[myrank]) + { + cout << "out of memory when new in short transfer, place 2" << endl; + MPI_Abort(MPI_COMM_WORLD, 1); + } + data_packer(rec_data[myrank], src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry); + } + + // Pack and post sends. + for (node = 0; node < cpusize; node++) + { + if (node == myrank) continue; + + send_lengths[node] = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); + if (send_lengths[node] > 0) + { + send_data[node] = new double[send_lengths[node]]; + if (!send_data[node]) + { + cout << "out of memory when new in short transfer, place 3" << endl; + MPI_Abort(MPI_COMM_WORLD, 1); + } + data_packer(send_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); + MPI_Isend((void *)send_data[node], send_lengths[node], MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no); + req_node[req_no] = node; + req_is_recv[req_no] = 0; + req_no++; + } + } + + // Unpack as soon as receive completes to reduce pure wait time. 
+ while (pending_recv > 0) + { + int outcount = 0; + MPI_Waitsome(req_no, reqs, &outcount, completed, stats); + if (outcount == MPI_UNDEFINED) break; + + for (int i = 0; i < outcount; i++) + { + int idx = completed[i]; + if (idx >= 0 && req_is_recv[idx]) + { + int recv_node = req_node[idx]; + data_packer(rec_data[recv_node], src[recv_node], dst[recv_node], recv_node, UNPACK, VarList1, VarList2, Symmetry); + pending_recv--; + } + } + } + + if (req_no > 0) MPI_Waitall(req_no, reqs, stats); + + if (rec_data[myrank]) + data_packer(rec_data[myrank], src[myrank], dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry); + + for (node = 0; node < cpusize; node++) + { + if (send_data[node]) + delete[] send_data[node]; + if (rec_data[node]) + delete[] rec_data[node]; + } + + delete[] reqs; + delete[] stats; + delete[] req_node; + delete[] req_is_recv; + delete[] completed; + delete[] send_data; + delete[] rec_data; + delete[] send_lengths; + delete[] recv_lengths; +} // -void Parallel::transfermix(MyList **src, MyList **dst, - MyList *VarList1 /* source */, MyList *VarList2 /*target */, - int Symmetry) -{ - int myrank, cpusize; - MPI_Comm_size(MPI_COMM_WORLD, &cpusize); - MPI_Comm_rank(MPI_COMM_WORLD, &myrank); - - int node; - - MPI_Request *reqs; - MPI_Status *stats; - reqs = new MPI_Request[2 * cpusize]; - stats = new MPI_Status[2 * cpusize]; - int req_no = 0; - - double **send_data, **rec_data; - send_data = new double *[cpusize]; - rec_data = new double *[cpusize]; - int length; - - for (node = 0; node < cpusize; node++) - { - send_data[node] = rec_data[node] = 0; - if (node == myrank) - { - if (length = data_packermix(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry)) - { - rec_data[node] = new double[length]; - if (!rec_data[node]) - { - cout << "out of memory when new in short transfer, place 1" << endl; - MPI_Abort(MPI_COMM_WORLD, 1); - } - data_packermix(rec_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); - } - } - else - { - // send from this cpu to cpu#node - if (length = data_packermix(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry)) - { - send_data[node] = new double[length]; - if (!send_data[node]) - { - cout << "out of memory when new in short transfer, place 2" << endl; - MPI_Abort(MPI_COMM_WORLD, 1); - } - data_packermix(send_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); - MPI_Isend((void *)send_data[node], length, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no++); - } - // receive from cpu#node to this cpu - if (length = data_packermix(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry)) - { - rec_data[node] = new double[length]; - if (!rec_data[node]) - { - cout << "out of memory when new in short transfer, place 3" << endl; - MPI_Abort(MPI_COMM_WORLD, 1); - } - MPI_Irecv((void *)rec_data[node], length, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no++); - } - } - } - // wait for all requests to complete - MPI_Waitall(req_no, reqs, stats); - - for (node = 0; node < cpusize; node++) - if (rec_data[node]) - data_packermix(rec_data[node], src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry); - - for (node = 0; node < cpusize; node++) - { - if (send_data[node]) - delete[] send_data[node]; - if (rec_data[node]) - delete[] rec_data[node]; - } - - delete[] reqs; - delete[] stats; - delete[] send_data; - delete[] rec_data; -} +void Parallel::transfermix(MyList **src, MyList **dst, + MyList *VarList1 /* source */, MyList *VarList2 /*target 
*/, + int Symmetry) +{ + int myrank, cpusize; + MPI_Comm_size(MPI_COMM_WORLD, &cpusize); + MPI_Comm_rank(MPI_COMM_WORLD, &myrank); + + int node; + + MPI_Request *reqs = new MPI_Request[2 * cpusize]; + MPI_Status *stats = new MPI_Status[2 * cpusize]; + int *req_node = new int[2 * cpusize]; + int *req_is_recv = new int[2 * cpusize]; + int *completed = new int[2 * cpusize]; + int req_no = 0; + int pending_recv = 0; + + double **send_data = new double *[cpusize]; + double **rec_data = new double *[cpusize]; + int *send_lengths = new int[cpusize]; + int *recv_lengths = new int[cpusize]; + + for (node = 0; node < cpusize; node++) + { + send_data[node] = rec_data[node] = 0; + send_lengths[node] = recv_lengths[node] = 0; + } + + // Post receives first so peers can progress rendezvous early. + for (node = 0; node < cpusize; node++) + { + if (node == myrank) continue; + + recv_lengths[node] = data_packermix(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry); + if (recv_lengths[node] > 0) + { + rec_data[node] = new double[recv_lengths[node]]; + if (!rec_data[node]) + { + cout << "out of memory when new in short transfer, place 1" << endl; + MPI_Abort(MPI_COMM_WORLD, 1); + } + MPI_Irecv((void *)rec_data[node], recv_lengths[node], MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no); + req_node[req_no] = node; + req_is_recv[req_no] = 1; + req_no++; + pending_recv++; + } + } + + // Local transfer on this rank. + recv_lengths[myrank] = data_packermix(0, src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry); + if (recv_lengths[myrank] > 0) + { + rec_data[myrank] = new double[recv_lengths[myrank]]; + if (!rec_data[myrank]) + { + cout << "out of memory when new in short transfer, place 2" << endl; + MPI_Abort(MPI_COMM_WORLD, 1); + } + data_packermix(rec_data[myrank], src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry); + } + + // Pack and post sends. + for (node = 0; node < cpusize; node++) + { + if (node == myrank) continue; + + send_lengths[node] = data_packermix(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); + if (send_lengths[node] > 0) + { + send_data[node] = new double[send_lengths[node]]; + if (!send_data[node]) + { + cout << "out of memory when new in short transfer, place 3" << endl; + MPI_Abort(MPI_COMM_WORLD, 1); + } + data_packermix(send_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); + MPI_Isend((void *)send_data[node], send_lengths[node], MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no); + req_node[req_no] = node; + req_is_recv[req_no] = 0; + req_no++; + } + } + + // Unpack as soon as receive completes to reduce pure wait time. 
+ while (pending_recv > 0) + { + int outcount = 0; + MPI_Waitsome(req_no, reqs, &outcount, completed, stats); + if (outcount == MPI_UNDEFINED) break; + + for (int i = 0; i < outcount; i++) + { + int idx = completed[i]; + if (idx >= 0 && req_is_recv[idx]) + { + int recv_node = req_node[idx]; + data_packermix(rec_data[recv_node], src[recv_node], dst[recv_node], recv_node, UNPACK, VarList1, VarList2, Symmetry); + pending_recv--; + } + } + } + + if (req_no > 0) MPI_Waitall(req_no, reqs, stats); + + if (rec_data[myrank]) + data_packermix(rec_data[myrank], src[myrank], dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry); + + for (node = 0; node < cpusize; node++) + { + if (send_data[node]) + delete[] send_data[node]; + if (rec_data[node]) + delete[] rec_data[node]; + } + + delete[] reqs; + delete[] stats; + delete[] req_node; + delete[] req_is_recv; + delete[] completed; + delete[] send_data; + delete[] rec_data; + delete[] send_lengths; + delete[] recv_lengths; +} void Parallel::Sync(Patch *Pat, MyList *VarList, int Symmetry) { int cpusize; @@ -4279,73 +4367,110 @@ void Parallel::SyncCache::destroy() cpusize = 0; max_reqs = 0; } // transfer_cached: reuse pre-allocated buffers from SyncCache -void Parallel::transfer_cached(MyList **src, MyList **dst, - MyList *VarList1, MyList *VarList2, - int Symmetry, SyncCache &cache) -{ - int myrank; +void Parallel::transfer_cached(MyList **src, MyList **dst, + MyList *VarList1, MyList *VarList2, + int Symmetry, SyncCache &cache) +{ + int myrank; MPI_Comm_size(MPI_COMM_WORLD, &cache.cpusize); MPI_Comm_rank(MPI_COMM_WORLD, &myrank); int cpusize = cache.cpusize; - int req_no = 0; - int node; - - for (node = 0; node < cpusize; node++) - { - if (node == myrank) - { - int length = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); - cache.recv_lengths[node] = length; - if (length > 0) - { - if (length > cache.recv_buf_caps[node]) - { - if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node]; - cache.recv_bufs[node] = new double[length]; - cache.recv_buf_caps[node] = length; - } - data_packer(cache.recv_bufs[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); - } - } - else - { - // send - int slength = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); - cache.send_lengths[node] = slength; - if (slength > 0) - { - if (slength > cache.send_buf_caps[node]) - { - if (cache.send_bufs[node]) delete[] cache.send_bufs[node]; - cache.send_bufs[node] = new double[slength]; - cache.send_buf_caps[node] = slength; - } - data_packer(cache.send_bufs[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); - MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no++); - } - // recv - int rlength = data_packer(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry); - cache.recv_lengths[node] = rlength; - if (rlength > 0) - { - if (rlength > cache.recv_buf_caps[node]) - { - if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node]; - cache.recv_bufs[node] = new double[rlength]; - cache.recv_buf_caps[node] = rlength; - } - MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no++); - } - } - } - - MPI_Waitall(req_no, cache.reqs, cache.stats); - - for (node = 0; node < cpusize; node++) - if (cache.recv_bufs[node] && cache.recv_lengths[node] > 0) - data_packer(cache.recv_bufs[node], src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry); -} + 
int req_no = 0; + int pending_recv = 0; + int node; + int *req_node = new int[cache.max_reqs]; + int *req_is_recv = new int[cache.max_reqs]; + int *completed = new int[cache.max_reqs]; + + // Post receives first so peers can progress rendezvous early. + for (node = 0; node < cpusize; node++) + { + if (node == myrank) continue; + + int rlength = data_packer(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry); + cache.recv_lengths[node] = rlength; + if (rlength > 0) + { + if (rlength > cache.recv_buf_caps[node]) + { + if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node]; + cache.recv_bufs[node] = new double[rlength]; + cache.recv_buf_caps[node] = rlength; + } + MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no); + req_node[req_no] = node; + req_is_recv[req_no] = 1; + req_no++; + pending_recv++; + } + } + + // Local transfer on this rank. + int self_len = data_packer(0, src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry); + cache.recv_lengths[myrank] = self_len; + if (self_len > 0) + { + if (self_len > cache.recv_buf_caps[myrank]) + { + if (cache.recv_bufs[myrank]) delete[] cache.recv_bufs[myrank]; + cache.recv_bufs[myrank] = new double[self_len]; + cache.recv_buf_caps[myrank] = self_len; + } + data_packer(cache.recv_bufs[myrank], src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry); + } + + // Pack and post sends. + for (node = 0; node < cpusize; node++) + { + if (node == myrank) continue; + + int slength = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); + cache.send_lengths[node] = slength; + if (slength > 0) + { + if (slength > cache.send_buf_caps[node]) + { + if (cache.send_bufs[node]) delete[] cache.send_bufs[node]; + cache.send_bufs[node] = new double[slength]; + cache.send_buf_caps[node] = slength; + } + data_packer(cache.send_bufs[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); + MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no); + req_node[req_no] = node; + req_is_recv[req_no] = 0; + req_no++; + } + } + + // Unpack as soon as receive completes to reduce pure wait time. 
+ while (pending_recv > 0) + { + int outcount = 0; + MPI_Waitsome(req_no, cache.reqs, &outcount, completed, cache.stats); + if (outcount == MPI_UNDEFINED) break; + + for (int i = 0; i < outcount; i++) + { + int idx = completed[i]; + if (idx >= 0 && req_is_recv[idx]) + { + int recv_node_i = req_node[idx]; + data_packer(cache.recv_bufs[recv_node_i], src[recv_node_i], dst[recv_node_i], recv_node_i, UNPACK, VarList1, VarList2, Symmetry); + pending_recv--; + } + } + } + + if (req_no > 0) MPI_Waitall(req_no, cache.reqs, cache.stats); + + if (self_len > 0) + data_packer(cache.recv_bufs[myrank], src[myrank], dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry); + + delete[] req_node; + delete[] req_is_recv; + delete[] completed; +} // Sync_cached: build grid segment lists on first call, reuse on subsequent calls void Parallel::Sync_cached(MyList *PatL, MyList *VarList, int Symmetry, SyncCache &cache) { @@ -5758,9 +5883,9 @@ void Parallel::OutBdLow2Hi_cached(MyList *PatcL, MyList *PatfL, } // OutBdLow2Himix_cached: same as OutBdLow2Hi_cached but uses transfermix for unpacking -void Parallel::OutBdLow2Himix_cached(MyList *PatcL, MyList *PatfL, - MyList *VarList1, MyList *VarList2, - int Symmetry, SyncCache &cache) +void Parallel::OutBdLow2Himix_cached(MyList *PatcL, MyList *PatfL, + MyList *VarList1, MyList *VarList2, + int Symmetry, SyncCache &cache) { if (!cache.valid) { @@ -5806,60 +5931,100 @@ void Parallel::OutBdLow2Himix_cached(MyList *PatcL, MyList *PatfL, MPI_Comm_rank(MPI_COMM_WORLD, &myrank); int cpusize = cache.cpusize; - int req_no = 0; - for (int node = 0; node < cpusize; node++) - { - if (node == myrank) - { - int length = data_packermix(0, cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry); - cache.recv_lengths[node] = length; - if (length > 0) - { - if (length > cache.recv_buf_caps[node]) - { - if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node]; - cache.recv_bufs[node] = new double[length]; - cache.recv_buf_caps[node] = length; - } - data_packermix(cache.recv_bufs[node], cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry); - } - } - else - { - int slength = data_packermix(0, cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry); - cache.send_lengths[node] = slength; - if (slength > 0) - { - if (slength > cache.send_buf_caps[node]) - { - if (cache.send_bufs[node]) delete[] cache.send_bufs[node]; - cache.send_bufs[node] = new double[slength]; - cache.send_buf_caps[node] = slength; - } - data_packermix(cache.send_bufs[node], cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry); - MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no++); - } - int rlength = data_packermix(0, cache.combined_src[node], cache.combined_dst[node], node, UNPACK, VarList1, VarList2, Symmetry); - cache.recv_lengths[node] = rlength; - if (rlength > 0) - { - if (rlength > cache.recv_buf_caps[node]) - { - if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node]; - cache.recv_bufs[node] = new double[rlength]; - cache.recv_buf_caps[node] = rlength; - } - MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no++); - } - } - } - - MPI_Waitall(req_no, cache.reqs, cache.stats); - - for (int node = 0; node < cpusize; node++) - if (cache.recv_bufs[node] && cache.recv_lengths[node] > 0) - data_packermix(cache.recv_bufs[node], 
cache.combined_src[node], cache.combined_dst[node], node, UNPACK, VarList1, VarList2, Symmetry); -} + int req_no = 0; + int pending_recv = 0; + int *req_node = new int[cache.max_reqs]; + int *req_is_recv = new int[cache.max_reqs]; + int *completed = new int[cache.max_reqs]; + + // Post receives first so peers can progress rendezvous early. + for (int node = 0; node < cpusize; node++) + { + if (node == myrank) continue; + + int rlength = data_packermix(0, cache.combined_src[node], cache.combined_dst[node], node, UNPACK, VarList1, VarList2, Symmetry); + cache.recv_lengths[node] = rlength; + if (rlength > 0) + { + if (rlength > cache.recv_buf_caps[node]) + { + if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node]; + cache.recv_bufs[node] = new double[rlength]; + cache.recv_buf_caps[node] = rlength; + } + MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no); + req_node[req_no] = node; + req_is_recv[req_no] = 1; + req_no++; + pending_recv++; + } + } + + // Local transfer on this rank. + int self_len = data_packermix(0, cache.combined_src[myrank], cache.combined_dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry); + cache.recv_lengths[myrank] = self_len; + if (self_len > 0) + { + if (self_len > cache.recv_buf_caps[myrank]) + { + if (cache.recv_bufs[myrank]) delete[] cache.recv_bufs[myrank]; + cache.recv_bufs[myrank] = new double[self_len]; + cache.recv_buf_caps[myrank] = self_len; + } + data_packermix(cache.recv_bufs[myrank], cache.combined_src[myrank], cache.combined_dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry); + } + + // Pack and post sends. + for (int node = 0; node < cpusize; node++) + { + if (node == myrank) continue; + + int slength = data_packermix(0, cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry); + cache.send_lengths[node] = slength; + if (slength > 0) + { + if (slength > cache.send_buf_caps[node]) + { + if (cache.send_bufs[node]) delete[] cache.send_bufs[node]; + cache.send_bufs[node] = new double[slength]; + cache.send_buf_caps[node] = slength; + } + data_packermix(cache.send_bufs[node], cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry); + MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no); + req_node[req_no] = node; + req_is_recv[req_no] = 0; + req_no++; + } + } + + // Unpack as soon as receive completes to reduce pure wait time. + while (pending_recv > 0) + { + int outcount = 0; + MPI_Waitsome(req_no, cache.reqs, &outcount, completed, cache.stats); + if (outcount == MPI_UNDEFINED) break; + + for (int i = 0; i < outcount; i++) + { + int idx = completed[i]; + if (idx >= 0 && req_is_recv[idx]) + { + int recv_node_i = req_node[idx]; + data_packermix(cache.recv_bufs[recv_node_i], cache.combined_src[recv_node_i], cache.combined_dst[recv_node_i], recv_node_i, UNPACK, VarList1, VarList2, Symmetry); + pending_recv--; + } + } + } + + if (req_no > 0) MPI_Waitall(req_no, cache.reqs, cache.stats); + + if (self_len > 0) + data_packermix(cache.recv_bufs[myrank], cache.combined_src[myrank], cache.combined_dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry); + + delete[] req_node; + delete[] req_is_recv; + delete[] completed; +} // collect all buffer grid segments or blocks for given patch MyList *Parallel::build_buffer_gsl(Patch *Pat)
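
Note (illustrative sketch, not part of the patch): every routine touched above follows the same consolidated pattern: post every MPI_Irecv before packing, then pack and MPI_Isend, drive unpacking with MPI_Waitsome as individual receives complete, and finish with MPI_Waitall for the sends still in flight. The self-contained sketch below shows that pattern in isolation. pack_for() and unpack_from() are hypothetical stand-ins for data_packer()/data_packermix(), and the fixed 4-double payload is invented purely for illustration; none of these names exist in Parallel.C.

// exchange_sketch.cpp : minimal sketch of the receive-first / Waitsome-driven exchange
// assumed setup: one message per peer, tag 1, double payloads (not the project's real packers)
#include <mpi.h>
#include <vector>

// Hypothetical stand-in for data_packer(..., PACK, ...): size query when buf == 0, else fill buf.
static int pack_for(int peer, double *buf)
{
    const int n = 4;                                   // invented payload size
    if (buf) for (int i = 0; i < n; i++) buf[i] = 100.0 * peer + i;
    return n;
}

// Hypothetical stand-in for data_packer(..., UNPACK, ...): size query when buf == 0, else consume.
static int unpack_from(int peer, const double *buf)
{
    const int n = 4;
    if (buf) { /* copy buf[0..n-1] into this rank's ghost zones for 'peer' */ }
    return n;
}

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    std::vector<MPI_Request> reqs;
    std::vector<int> req_peer, req_is_recv;            // bookkeeping parallel to reqs
    std::vector<std::vector<double> > sbuf(size), rbuf(size);
    int pending_recv = 0;

    // 1) Post every receive before any send so peers can complete rendezvous sends early.
    for (int peer = 0; peer < size; peer++)
    {
        if (peer == rank) continue;
        int n = unpack_from(peer, 0);                  // size query only
        if (n <= 0) continue;
        rbuf[peer].resize(n);
        MPI_Request r;
        MPI_Irecv(&rbuf[peer][0], n, MPI_DOUBLE, peer, 1, MPI_COMM_WORLD, &r);
        reqs.push_back(r); req_peer.push_back(peer); req_is_recv.push_back(1);
        pending_recv++;
    }

    // 2) Pack and post the sends.
    for (int peer = 0; peer < size; peer++)
    {
        if (peer == rank) continue;
        int n = pack_for(peer, 0);
        if (n <= 0) continue;
        sbuf[peer].resize(n);
        pack_for(peer, &sbuf[peer][0]);
        MPI_Request r;
        MPI_Isend(&sbuf[peer][0], n, MPI_DOUBLE, peer, 1, MPI_COMM_WORLD, &r);
        reqs.push_back(r); req_peer.push_back(peer); req_is_recv.push_back(0);
    }

    // 3) Unpack each message as it arrives instead of waiting for the whole batch.
    std::vector<int> done(reqs.size());
    std::vector<MPI_Status> stats(reqs.size());
    while (pending_recv > 0)
    {
        int outcount = 0;
        MPI_Waitsome((int)reqs.size(), &reqs[0], &outcount, &done[0], &stats[0]);
        if (outcount == MPI_UNDEFINED) break;          // no active requests remain
        for (int i = 0; i < outcount; i++)
            if (req_is_recv[done[i]])
            {
                unpack_from(req_peer[done[i]], &rbuf[req_peer[done[i]]][0]);
                pending_recv--;
            }
    }

    // 4) Requests finished by Waitsome are already MPI_REQUEST_NULL,
    //    so this only blocks on the sends that are still in flight.
    if (!reqs.empty())
        MPI_Waitall((int)reqs.size(), &reqs[0], &stats[0]);

    MPI_Finalize();
    return 0;
}

Posting the receives before the sends gives the MPI library a matching receive for every rendezvous-protocol send, avoiding unexpected-message buffering, and the MPI_Waitsome loop overlaps unpacking with transfers that are still completing; the final MPI_Waitall is cheap because completed requests have already been set to MPI_REQUEST_NULL.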