refactor(Parallel): streamline MPI communication by consolidating request handling and memory management

This commit is contained in:
2026-03-01 16:20:51 +08:00
committed by CGH0S7
parent 95220a05c8
commit 42c69fab24

View File

@@ -3883,175 +3883,263 @@ int Parallel::data_packermix(double *data, MyList<Parallel::gridseg> *src, MyLis
return size_out; return size_out;
} }
// //
void Parallel::transfer(MyList<Parallel::gridseg> **src, MyList<Parallel::gridseg> **dst, void Parallel::transfer(MyList<Parallel::gridseg> **src, MyList<Parallel::gridseg> **dst,
MyList<var> *VarList1 /* source */, MyList<var> *VarList2 /*target */, MyList<var> *VarList1 /* source */, MyList<var> *VarList2 /*target */,
int Symmetry) int Symmetry)
{ {
int myrank, cpusize; int myrank, cpusize;
MPI_Comm_size(MPI_COMM_WORLD, &cpusize); MPI_Comm_size(MPI_COMM_WORLD, &cpusize);
MPI_Comm_rank(MPI_COMM_WORLD, &myrank); MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
int node; int node;
MPI_Request *reqs; MPI_Request *reqs = new MPI_Request[2 * cpusize];
MPI_Status *stats; MPI_Status *stats = new MPI_Status[2 * cpusize];
reqs = new MPI_Request[2 * cpusize]; int *req_node = new int[2 * cpusize];
stats = new MPI_Status[2 * cpusize]; int *req_is_recv = new int[2 * cpusize];
int req_no = 0; int *completed = new int[2 * cpusize];
int req_no = 0;
double **send_data, **rec_data; int pending_recv = 0;
send_data = new double *[cpusize];
rec_data = new double *[cpusize]; double **send_data = new double *[cpusize];
int length; double **rec_data = new double *[cpusize];
int *send_lengths = new int[cpusize];
for (node = 0; node < cpusize; node++) int *recv_lengths = new int[cpusize];
{
send_data[node] = rec_data[node] = 0; for (node = 0; node < cpusize; node++)
if (node == myrank) {
{ send_data[node] = rec_data[node] = 0;
if (length = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry)) send_lengths[node] = recv_lengths[node] = 0;
{ }
rec_data[node] = new double[length];
if (!rec_data[node]) // Post receives first so peers can progress rendezvous early.
{ for (node = 0; node < cpusize; node++)
cout << "out of memory when new in short transfer, place 1" << endl; {
MPI_Abort(MPI_COMM_WORLD, 1); if (node == myrank) continue;
}
data_packer(rec_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); recv_lengths[node] = data_packer(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry);
} if (recv_lengths[node] > 0)
} {
else rec_data[node] = new double[recv_lengths[node]];
{ if (!rec_data[node])
// send from this cpu to cpu#node {
if (length = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry)) cout << "out of memory when new in short transfer, place 1" << endl;
{ MPI_Abort(MPI_COMM_WORLD, 1);
send_data[node] = new double[length]; }
if (!send_data[node]) MPI_Irecv((void *)rec_data[node], recv_lengths[node], MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no);
{ req_node[req_no] = node;
cout << "out of memory when new in short transfer, place 2" << endl; req_is_recv[req_no] = 1;
MPI_Abort(MPI_COMM_WORLD, 1); req_no++;
} pending_recv++;
data_packer(send_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); }
MPI_Isend((void *)send_data[node], length, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no++); }
}
// receive from cpu#node to this cpu // Local transfer on this rank.
if (length = data_packer(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry)) recv_lengths[myrank] = data_packer(0, src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry);
{ if (recv_lengths[myrank] > 0)
rec_data[node] = new double[length]; {
if (!rec_data[node]) rec_data[myrank] = new double[recv_lengths[myrank]];
{ if (!rec_data[myrank])
cout << "out of memory when new in short transfer, place 3" << endl; {
MPI_Abort(MPI_COMM_WORLD, 1); cout << "out of memory when new in short transfer, place 2" << endl;
} MPI_Abort(MPI_COMM_WORLD, 1);
MPI_Irecv((void *)rec_data[node], length, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no++); }
} data_packer(rec_data[myrank], src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry);
} }
}
// wait for all requests to complete // Pack and post sends.
MPI_Waitall(req_no, reqs, stats); for (node = 0; node < cpusize; node++)
{
for (node = 0; node < cpusize; node++) if (node == myrank) continue;
if (rec_data[node])
data_packer(rec_data[node], src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry); send_lengths[node] = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
if (send_lengths[node] > 0)
for (node = 0; node < cpusize; node++) {
{ send_data[node] = new double[send_lengths[node]];
if (send_data[node]) if (!send_data[node])
delete[] send_data[node]; {
if (rec_data[node]) cout << "out of memory when new in short transfer, place 3" << endl;
delete[] rec_data[node]; MPI_Abort(MPI_COMM_WORLD, 1);
} }
data_packer(send_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
delete[] reqs; MPI_Isend((void *)send_data[node], send_lengths[node], MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no);
delete[] stats; req_node[req_no] = node;
delete[] send_data; req_is_recv[req_no] = 0;
delete[] rec_data; req_no++;
} }
}
// Unpack as soon as receive completes to reduce pure wait time.
while (pending_recv > 0)
{
int outcount = 0;
MPI_Waitsome(req_no, reqs, &outcount, completed, stats);
if (outcount == MPI_UNDEFINED) break;
for (int i = 0; i < outcount; i++)
{
int idx = completed[i];
if (idx >= 0 && req_is_recv[idx])
{
int recv_node = req_node[idx];
data_packer(rec_data[recv_node], src[recv_node], dst[recv_node], recv_node, UNPACK, VarList1, VarList2, Symmetry);
pending_recv--;
}
}
}
if (req_no > 0) MPI_Waitall(req_no, reqs, stats);
if (rec_data[myrank])
data_packer(rec_data[myrank], src[myrank], dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry);
for (node = 0; node < cpusize; node++)
{
if (send_data[node])
delete[] send_data[node];
if (rec_data[node])
delete[] rec_data[node];
}
delete[] reqs;
delete[] stats;
delete[] req_node;
delete[] req_is_recv;
delete[] completed;
delete[] send_data;
delete[] rec_data;
delete[] send_lengths;
delete[] recv_lengths;
}
// //
void Parallel::transfermix(MyList<Parallel::gridseg> **src, MyList<Parallel::gridseg> **dst, void Parallel::transfermix(MyList<Parallel::gridseg> **src, MyList<Parallel::gridseg> **dst,
MyList<var> *VarList1 /* source */, MyList<var> *VarList2 /*target */, MyList<var> *VarList1 /* source */, MyList<var> *VarList2 /*target */,
int Symmetry) int Symmetry)
{ {
int myrank, cpusize; int myrank, cpusize;
MPI_Comm_size(MPI_COMM_WORLD, &cpusize); MPI_Comm_size(MPI_COMM_WORLD, &cpusize);
MPI_Comm_rank(MPI_COMM_WORLD, &myrank); MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
int node; int node;
MPI_Request *reqs; MPI_Request *reqs = new MPI_Request[2 * cpusize];
MPI_Status *stats; MPI_Status *stats = new MPI_Status[2 * cpusize];
reqs = new MPI_Request[2 * cpusize]; int *req_node = new int[2 * cpusize];
stats = new MPI_Status[2 * cpusize]; int *req_is_recv = new int[2 * cpusize];
int req_no = 0; int *completed = new int[2 * cpusize];
int req_no = 0;
double **send_data, **rec_data; int pending_recv = 0;
send_data = new double *[cpusize];
rec_data = new double *[cpusize]; double **send_data = new double *[cpusize];
int length; double **rec_data = new double *[cpusize];
int *send_lengths = new int[cpusize];
for (node = 0; node < cpusize; node++) int *recv_lengths = new int[cpusize];
{
send_data[node] = rec_data[node] = 0; for (node = 0; node < cpusize; node++)
if (node == myrank) {
{ send_data[node] = rec_data[node] = 0;
if (length = data_packermix(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry)) send_lengths[node] = recv_lengths[node] = 0;
{ }
rec_data[node] = new double[length];
if (!rec_data[node]) // Post receives first so peers can progress rendezvous early.
{ for (node = 0; node < cpusize; node++)
cout << "out of memory when new in short transfer, place 1" << endl; {
MPI_Abort(MPI_COMM_WORLD, 1); if (node == myrank) continue;
}
data_packermix(rec_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); recv_lengths[node] = data_packermix(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry);
} if (recv_lengths[node] > 0)
} {
else rec_data[node] = new double[recv_lengths[node]];
{ if (!rec_data[node])
// send from this cpu to cpu#node {
if (length = data_packermix(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry)) cout << "out of memory when new in short transfer, place 1" << endl;
{ MPI_Abort(MPI_COMM_WORLD, 1);
send_data[node] = new double[length]; }
if (!send_data[node]) MPI_Irecv((void *)rec_data[node], recv_lengths[node], MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no);
{ req_node[req_no] = node;
cout << "out of memory when new in short transfer, place 2" << endl; req_is_recv[req_no] = 1;
MPI_Abort(MPI_COMM_WORLD, 1); req_no++;
} pending_recv++;
data_packermix(send_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); }
MPI_Isend((void *)send_data[node], length, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no++); }
}
// receive from cpu#node to this cpu // Local transfer on this rank.
if (length = data_packermix(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry)) recv_lengths[myrank] = data_packermix(0, src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry);
{ if (recv_lengths[myrank] > 0)
rec_data[node] = new double[length]; {
if (!rec_data[node]) rec_data[myrank] = new double[recv_lengths[myrank]];
{ if (!rec_data[myrank])
cout << "out of memory when new in short transfer, place 3" << endl; {
MPI_Abort(MPI_COMM_WORLD, 1); cout << "out of memory when new in short transfer, place 2" << endl;
} MPI_Abort(MPI_COMM_WORLD, 1);
MPI_Irecv((void *)rec_data[node], length, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no++); }
} data_packermix(rec_data[myrank], src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry);
} }
}
// wait for all requests to complete // Pack and post sends.
MPI_Waitall(req_no, reqs, stats); for (node = 0; node < cpusize; node++)
{
for (node = 0; node < cpusize; node++) if (node == myrank) continue;
if (rec_data[node])
data_packermix(rec_data[node], src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry); send_lengths[node] = data_packermix(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
if (send_lengths[node] > 0)
for (node = 0; node < cpusize; node++) {
{ send_data[node] = new double[send_lengths[node]];
if (send_data[node]) if (!send_data[node])
delete[] send_data[node]; {
if (rec_data[node]) cout << "out of memory when new in short transfer, place 3" << endl;
delete[] rec_data[node]; MPI_Abort(MPI_COMM_WORLD, 1);
} }
data_packermix(send_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
delete[] reqs; MPI_Isend((void *)send_data[node], send_lengths[node], MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no);
delete[] stats; req_node[req_no] = node;
delete[] send_data; req_is_recv[req_no] = 0;
delete[] rec_data; req_no++;
} }
}
// Unpack as soon as receive completes to reduce pure wait time.
while (pending_recv > 0)
{
int outcount = 0;
MPI_Waitsome(req_no, reqs, &outcount, completed, stats);
if (outcount == MPI_UNDEFINED) break;
for (int i = 0; i < outcount; i++)
{
int idx = completed[i];
if (idx >= 0 && req_is_recv[idx])
{
int recv_node = req_node[idx];
data_packermix(rec_data[recv_node], src[recv_node], dst[recv_node], recv_node, UNPACK, VarList1, VarList2, Symmetry);
pending_recv--;
}
}
}
if (req_no > 0) MPI_Waitall(req_no, reqs, stats);
if (rec_data[myrank])
data_packermix(rec_data[myrank], src[myrank], dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry);
for (node = 0; node < cpusize; node++)
{
if (send_data[node])
delete[] send_data[node];
if (rec_data[node])
delete[] rec_data[node];
}
delete[] reqs;
delete[] stats;
delete[] req_node;
delete[] req_is_recv;
delete[] completed;
delete[] send_data;
delete[] rec_data;
delete[] send_lengths;
delete[] recv_lengths;
}
void Parallel::Sync(Patch *Pat, MyList<var> *VarList, int Symmetry) void Parallel::Sync(Patch *Pat, MyList<var> *VarList, int Symmetry)
{ {
int cpusize; int cpusize;
@@ -4279,73 +4367,110 @@ void Parallel::SyncCache::destroy()
cpusize = 0; max_reqs = 0; cpusize = 0; max_reqs = 0;
} }
// transfer_cached: reuse pre-allocated buffers from SyncCache // transfer_cached: reuse pre-allocated buffers from SyncCache
void Parallel::transfer_cached(MyList<Parallel::gridseg> **src, MyList<Parallel::gridseg> **dst, void Parallel::transfer_cached(MyList<Parallel::gridseg> **src, MyList<Parallel::gridseg> **dst,
MyList<var> *VarList1, MyList<var> *VarList2, MyList<var> *VarList1, MyList<var> *VarList2,
int Symmetry, SyncCache &cache) int Symmetry, SyncCache &cache)
{ {
int myrank; int myrank;
MPI_Comm_size(MPI_COMM_WORLD, &cache.cpusize); MPI_Comm_size(MPI_COMM_WORLD, &cache.cpusize);
MPI_Comm_rank(MPI_COMM_WORLD, &myrank); MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
int cpusize = cache.cpusize; int cpusize = cache.cpusize;
int req_no = 0; int req_no = 0;
int node; int pending_recv = 0;
int node;
for (node = 0; node < cpusize; node++) int *req_node = new int[cache.max_reqs];
{ int *req_is_recv = new int[cache.max_reqs];
if (node == myrank) int *completed = new int[cache.max_reqs];
{
int length = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); // Post receives first so peers can progress rendezvous early.
cache.recv_lengths[node] = length; for (node = 0; node < cpusize; node++)
if (length > 0) {
{ if (node == myrank) continue;
if (length > cache.recv_buf_caps[node])
{ int rlength = data_packer(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry);
if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node]; cache.recv_lengths[node] = rlength;
cache.recv_bufs[node] = new double[length]; if (rlength > 0)
cache.recv_buf_caps[node] = length; {
} if (rlength > cache.recv_buf_caps[node])
data_packer(cache.recv_bufs[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); {
} if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node];
} cache.recv_bufs[node] = new double[rlength];
else cache.recv_buf_caps[node] = rlength;
{ }
// send MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no);
int slength = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); req_node[req_no] = node;
cache.send_lengths[node] = slength; req_is_recv[req_no] = 1;
if (slength > 0) req_no++;
{ pending_recv++;
if (slength > cache.send_buf_caps[node]) }
{ }
if (cache.send_bufs[node]) delete[] cache.send_bufs[node];
cache.send_bufs[node] = new double[slength]; // Local transfer on this rank.
cache.send_buf_caps[node] = slength; int self_len = data_packer(0, src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry);
} cache.recv_lengths[myrank] = self_len;
data_packer(cache.send_bufs[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); if (self_len > 0)
MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no++); {
} if (self_len > cache.recv_buf_caps[myrank])
// recv {
int rlength = data_packer(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry); if (cache.recv_bufs[myrank]) delete[] cache.recv_bufs[myrank];
cache.recv_lengths[node] = rlength; cache.recv_bufs[myrank] = new double[self_len];
if (rlength > 0) cache.recv_buf_caps[myrank] = self_len;
{ }
if (rlength > cache.recv_buf_caps[node]) data_packer(cache.recv_bufs[myrank], src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry);
{ }
if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node];
cache.recv_bufs[node] = new double[rlength]; // Pack and post sends.
cache.recv_buf_caps[node] = rlength; for (node = 0; node < cpusize; node++)
} {
MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no++); if (node == myrank) continue;
}
} int slength = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
} cache.send_lengths[node] = slength;
if (slength > 0)
MPI_Waitall(req_no, cache.reqs, cache.stats); {
if (slength > cache.send_buf_caps[node])
for (node = 0; node < cpusize; node++) {
if (cache.recv_bufs[node] && cache.recv_lengths[node] > 0) if (cache.send_bufs[node]) delete[] cache.send_bufs[node];
data_packer(cache.recv_bufs[node], src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry); cache.send_bufs[node] = new double[slength];
} cache.send_buf_caps[node] = slength;
}
data_packer(cache.send_bufs[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no);
req_node[req_no] = node;
req_is_recv[req_no] = 0;
req_no++;
}
}
// Unpack as soon as receive completes to reduce pure wait time.
while (pending_recv > 0)
{
int outcount = 0;
MPI_Waitsome(req_no, cache.reqs, &outcount, completed, cache.stats);
if (outcount == MPI_UNDEFINED) break;
for (int i = 0; i < outcount; i++)
{
int idx = completed[i];
if (idx >= 0 && req_is_recv[idx])
{
int recv_node_i = req_node[idx];
data_packer(cache.recv_bufs[recv_node_i], src[recv_node_i], dst[recv_node_i], recv_node_i, UNPACK, VarList1, VarList2, Symmetry);
pending_recv--;
}
}
}
if (req_no > 0) MPI_Waitall(req_no, cache.reqs, cache.stats);
if (self_len > 0)
data_packer(cache.recv_bufs[myrank], src[myrank], dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry);
delete[] req_node;
delete[] req_is_recv;
delete[] completed;
}
// Sync_cached: build grid segment lists on first call, reuse on subsequent calls // Sync_cached: build grid segment lists on first call, reuse on subsequent calls
void Parallel::Sync_cached(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry, SyncCache &cache) void Parallel::Sync_cached(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry, SyncCache &cache)
{ {
@@ -5758,9 +5883,9 @@ void Parallel::OutBdLow2Hi_cached(MyList<Patch> *PatcL, MyList<Patch> *PatfL,
} }
// OutBdLow2Himix_cached: same as OutBdLow2Hi_cached but uses transfermix for unpacking // OutBdLow2Himix_cached: same as OutBdLow2Hi_cached but uses transfermix for unpacking
void Parallel::OutBdLow2Himix_cached(MyList<Patch> *PatcL, MyList<Patch> *PatfL, void Parallel::OutBdLow2Himix_cached(MyList<Patch> *PatcL, MyList<Patch> *PatfL,
MyList<var> *VarList1, MyList<var> *VarList2, MyList<var> *VarList1, MyList<var> *VarList2,
int Symmetry, SyncCache &cache) int Symmetry, SyncCache &cache)
{ {
if (!cache.valid) if (!cache.valid)
{ {
@@ -5806,60 +5931,100 @@ void Parallel::OutBdLow2Himix_cached(MyList<Patch> *PatcL, MyList<Patch> *PatfL,
MPI_Comm_rank(MPI_COMM_WORLD, &myrank); MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
int cpusize = cache.cpusize; int cpusize = cache.cpusize;
int req_no = 0; int req_no = 0;
for (int node = 0; node < cpusize; node++) int pending_recv = 0;
{ int *req_node = new int[cache.max_reqs];
if (node == myrank) int *req_is_recv = new int[cache.max_reqs];
{ int *completed = new int[cache.max_reqs];
int length = data_packermix(0, cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
cache.recv_lengths[node] = length; // Post receives first so peers can progress rendezvous early.
if (length > 0) for (int node = 0; node < cpusize; node++)
{ {
if (length > cache.recv_buf_caps[node]) if (node == myrank) continue;
{
if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node]; int rlength = data_packermix(0, cache.combined_src[node], cache.combined_dst[node], node, UNPACK, VarList1, VarList2, Symmetry);
cache.recv_bufs[node] = new double[length]; cache.recv_lengths[node] = rlength;
cache.recv_buf_caps[node] = length; if (rlength > 0)
} {
data_packermix(cache.recv_bufs[node], cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry); if (rlength > cache.recv_buf_caps[node])
} {
} if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node];
else cache.recv_bufs[node] = new double[rlength];
{ cache.recv_buf_caps[node] = rlength;
int slength = data_packermix(0, cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry); }
cache.send_lengths[node] = slength; MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no);
if (slength > 0) req_node[req_no] = node;
{ req_is_recv[req_no] = 1;
if (slength > cache.send_buf_caps[node]) req_no++;
{ pending_recv++;
if (cache.send_bufs[node]) delete[] cache.send_bufs[node]; }
cache.send_bufs[node] = new double[slength]; }
cache.send_buf_caps[node] = slength;
} // Local transfer on this rank.
data_packermix(cache.send_bufs[node], cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry); int self_len = data_packermix(0, cache.combined_src[myrank], cache.combined_dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry);
MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no++); cache.recv_lengths[myrank] = self_len;
} if (self_len > 0)
int rlength = data_packermix(0, cache.combined_src[node], cache.combined_dst[node], node, UNPACK, VarList1, VarList2, Symmetry); {
cache.recv_lengths[node] = rlength; if (self_len > cache.recv_buf_caps[myrank])
if (rlength > 0) {
{ if (cache.recv_bufs[myrank]) delete[] cache.recv_bufs[myrank];
if (rlength > cache.recv_buf_caps[node]) cache.recv_bufs[myrank] = new double[self_len];
{ cache.recv_buf_caps[myrank] = self_len;
if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node]; }
cache.recv_bufs[node] = new double[rlength]; data_packermix(cache.recv_bufs[myrank], cache.combined_src[myrank], cache.combined_dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry);
cache.recv_buf_caps[node] = rlength; }
}
MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no++); // Pack and post sends.
} for (int node = 0; node < cpusize; node++)
} {
} if (node == myrank) continue;
MPI_Waitall(req_no, cache.reqs, cache.stats); int slength = data_packermix(0, cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
cache.send_lengths[node] = slength;
for (int node = 0; node < cpusize; node++) if (slength > 0)
if (cache.recv_bufs[node] && cache.recv_lengths[node] > 0) {
data_packermix(cache.recv_bufs[node], cache.combined_src[node], cache.combined_dst[node], node, UNPACK, VarList1, VarList2, Symmetry); if (slength > cache.send_buf_caps[node])
} {
if (cache.send_bufs[node]) delete[] cache.send_bufs[node];
cache.send_bufs[node] = new double[slength];
cache.send_buf_caps[node] = slength;
}
data_packermix(cache.send_bufs[node], cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no);
req_node[req_no] = node;
req_is_recv[req_no] = 0;
req_no++;
}
}
// Unpack as soon as receive completes to reduce pure wait time.
while (pending_recv > 0)
{
int outcount = 0;
MPI_Waitsome(req_no, cache.reqs, &outcount, completed, cache.stats);
if (outcount == MPI_UNDEFINED) break;
for (int i = 0; i < outcount; i++)
{
int idx = completed[i];
if (idx >= 0 && req_is_recv[idx])
{
int recv_node_i = req_node[idx];
data_packermix(cache.recv_bufs[recv_node_i], cache.combined_src[recv_node_i], cache.combined_dst[recv_node_i], recv_node_i, UNPACK, VarList1, VarList2, Symmetry);
pending_recv--;
}
}
}
if (req_no > 0) MPI_Waitall(req_no, cache.reqs, cache.stats);
if (self_len > 0)
data_packermix(cache.recv_bufs[myrank], cache.combined_src[myrank], cache.combined_dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry);
delete[] req_node;
delete[] req_is_recv;
delete[] completed;
}
// collect all buffer grid segments or blocks for given patch // collect all buffer grid segments or blocks for given patch
MyList<Parallel::gridseg> *Parallel::build_buffer_gsl(Patch *Pat) MyList<Parallel::gridseg> *Parallel::build_buffer_gsl(Patch *Pat)