Stabilize GPU output path and MPI sync

This commit is contained in:
2026-04-09 10:57:49 +08:00
parent 4e3946a4f0
commit 49409645c0
8 changed files with 748 additions and 334 deletions

View File

@@ -1,9 +1,115 @@
#include "Parallel.h"
#include "fmisc.h"
#include "prolongrestrict.h"
#include "misc.h"
#include "parameters.h"
#include "fmisc.h"
#include "prolongrestrict.h"
#include "misc.h"
#include "parameters.h"
#include <cstring>
namespace {
const char *g_parallel_transfer_context = "Parallel::transfer";
int parallel_next_transfer_tag()
{
const char *ctx = g_parallel_transfer_context ? g_parallel_transfer_context : "Parallel::transfer";
unsigned int hash = 5381u;
while (*ctx)
{
hash = ((hash << 5) + hash) + static_cast<unsigned char>(*ctx);
ctx++;
}
return 32 + static_cast<int>(hash % 32000u);
}
struct ParallelTransferContextGuard
{
const char *prev;
explicit ParallelTransferContextGuard(const char *context) : prev(g_parallel_transfer_context)
{
g_parallel_transfer_context = context;
}
~ParallelTransferContextGuard()
{
g_parallel_transfer_context = prev;
}
};
void parallel_report_mpi_error(const char *context, int errcode, int req_no)
{
char errstr[MPI_MAX_ERROR_STRING];
int len = 0;
MPI_Error_string(errcode, errstr, &len);
int myrank = -1;
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
fprintf(stderr, "MPI failure on rank %d in %s (transfer_ctx=%s, req_no=%d): %.*s\n",
myrank, context, g_parallel_transfer_context, req_no, len, errstr);
fflush(stderr);
}
void parallel_report_mpi_status_errors(const char *context, int req_no,
int status_count, MPI_Status *stats)
{
if (!stats)
return;
int myrank = -1;
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
const int count = (status_count >= 0 && status_count <= req_no) ? status_count : req_no;
for (int i = 0; i < count; i++)
{
if (stats[i].MPI_ERROR != MPI_SUCCESS)
{
char errstr[MPI_MAX_ERROR_STRING];
int len = 0;
MPI_Error_string(stats[i].MPI_ERROR, errstr, &len);
fprintf(stderr,
"MPI request failure on rank %d in %s (transfer_ctx=%s, status_idx=%d, source=%d, tag=%d): %.*s\n",
myrank, context, g_parallel_transfer_context, i,
stats[i].MPI_SOURCE, stats[i].MPI_TAG, len, errstr);
}
}
fflush(stderr);
}
int parallel_waitsome_checked(int req_no, MPI_Request *reqs, int *outcount,
int *completed, MPI_Status *stats,
const char *context)
{
MPI_Errhandler old_handler;
MPI_Comm_get_errhandler(MPI_COMM_WORLD, &old_handler);
MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
int rc = MPI_Waitsome(req_no, reqs, outcount, completed, stats);
MPI_Comm_set_errhandler(MPI_COMM_WORLD, old_handler);
MPI_Errhandler_free(&old_handler);
if (rc != MPI_SUCCESS)
{
parallel_report_mpi_status_errors(context, req_no, outcount ? *outcount : req_no, stats);
parallel_report_mpi_error(context, rc, req_no);
MPI_Abort(MPI_COMM_WORLD, 1);
}
return rc;
}
int parallel_waitall_checked(int req_no, MPI_Request *reqs, MPI_Status *stats,
const char *context)
{
MPI_Errhandler old_handler;
MPI_Comm_get_errhandler(MPI_COMM_WORLD, &old_handler);
MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
int rc = MPI_Waitall(req_no, reqs, stats);
MPI_Comm_set_errhandler(MPI_COMM_WORLD, old_handler);
MPI_Errhandler_free(&old_handler);
if (rc != MPI_SUCCESS)
{
parallel_report_mpi_status_errors(context, req_no, req_no, stats);
parallel_report_mpi_error(context, rc, req_no);
MPI_Abort(MPI_COMM_WORLD, 1);
}
return rc;
}
} // namespace
int Parallel::partition1(int &nx, int split_size, int min_width, int cpusize, int shape) // special for 1 diemnsion
{
@@ -3896,10 +4002,11 @@ void Parallel::transfer(MyList<Parallel::gridseg> **src, MyList<Parallel::gridse
MPI_Request *reqs = new MPI_Request[2 * cpusize];
MPI_Status *stats = new MPI_Status[2 * cpusize];
int *req_node = new int[2 * cpusize];
int *req_is_recv = new int[2 * cpusize];
int *completed = new int[2 * cpusize];
int req_no = 0;
int pending_recv = 0;
int *req_is_recv = new int[2 * cpusize];
int *completed = new int[2 * cpusize];
int req_no = 0;
int pending_recv = 0;
const int mpi_tag = parallel_next_transfer_tag();
double **send_data = new double *[cpusize];
double **rec_data = new double *[cpusize];
@@ -3913,31 +4020,31 @@ void Parallel::transfer(MyList<Parallel::gridseg> **src, MyList<Parallel::gridse
}
// Post receives first so peers can progress rendezvous early.
for (node = 0; node < cpusize; node++)
{
if (node == myrank) continue;
recv_lengths[node] = data_packer(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry);
if (recv_lengths[node] > 0)
{
for (node = 0; node < cpusize; node++)
{
if (node == myrank) continue;
recv_lengths[node] = data_packer(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry);
if (recv_lengths[node] > 0)
{
rec_data[node] = new double[recv_lengths[node]];
if (!rec_data[node])
{
cout << "out of memory when new in short transfer, place 1" << endl;
MPI_Abort(MPI_COMM_WORLD, 1);
}
MPI_Irecv((void *)rec_data[node], recv_lengths[node], MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no);
MPI_Irecv((void *)rec_data[node], recv_lengths[node], MPI_DOUBLE, node, mpi_tag, MPI_COMM_WORLD, reqs + req_no);
req_node[req_no] = node;
req_is_recv[req_no] = 1;
req_no++;
pending_recv++;
}
}
// Local transfer on this rank.
recv_lengths[myrank] = data_packer(0, src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry);
if (recv_lengths[myrank] > 0)
{
}
// Local transfer on this rank.
recv_lengths[myrank] = data_packer(0, src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry);
if (recv_lengths[myrank] > 0)
{
rec_data[myrank] = new double[recv_lengths[myrank]];
if (!rec_data[myrank])
{
@@ -3947,33 +4054,33 @@ void Parallel::transfer(MyList<Parallel::gridseg> **src, MyList<Parallel::gridse
data_packer(rec_data[myrank], src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry);
}
// Pack and post sends.
for (node = 0; node < cpusize; node++)
{
if (node == myrank) continue;
send_lengths[node] = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
if (send_lengths[node] > 0)
{
send_data[node] = new double[send_lengths[node]];
// Pack and post sends.
for (node = 0; node < cpusize; node++)
{
if (node == myrank) continue;
send_lengths[node] = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
if (send_lengths[node] > 0)
{
send_data[node] = new double[send_lengths[node]];
if (!send_data[node])
{
cout << "out of memory when new in short transfer, place 3" << endl;
MPI_Abort(MPI_COMM_WORLD, 1);
}
data_packer(send_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
MPI_Isend((void *)send_data[node], send_lengths[node], MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no);
MPI_Isend((void *)send_data[node], send_lengths[node], MPI_DOUBLE, node, mpi_tag, MPI_COMM_WORLD, reqs + req_no);
req_node[req_no] = node;
req_is_recv[req_no] = 0;
req_no++;
}
}
// Unpack as soon as receive completes to reduce pure wait time.
while (pending_recv > 0)
req_no++;
}
}
// Unpack as soon as receive completes to reduce pure wait time.
while (pending_recv > 0)
{
int outcount = 0;
MPI_Waitsome(req_no, reqs, &outcount, completed, stats);
parallel_waitsome_checked(req_no, reqs, &outcount, completed, stats,
"Parallel::transfer");
if (outcount == MPI_UNDEFINED) break;
for (int i = 0; i < outcount; i++)
@@ -3988,7 +4095,7 @@ void Parallel::transfer(MyList<Parallel::gridseg> **src, MyList<Parallel::gridse
}
}
if (req_no > 0) MPI_Waitall(req_no, reqs, stats);
if (req_no > 0) parallel_waitall_checked(req_no, reqs, stats, "Parallel::transfer");
if (rec_data[myrank])
data_packer(rec_data[myrank], src[myrank], dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry);
@@ -4005,12 +4112,12 @@ void Parallel::transfer(MyList<Parallel::gridseg> **src, MyList<Parallel::gridse
delete[] stats;
delete[] req_node;
delete[] req_is_recv;
delete[] completed;
delete[] send_data;
delete[] rec_data;
delete[] send_lengths;
delete[] recv_lengths;
}
delete[] completed;
delete[] send_data;
delete[] rec_data;
delete[] send_lengths;
delete[] recv_lengths;
}
//
void Parallel::transfermix(MyList<Parallel::gridseg> **src, MyList<Parallel::gridseg> **dst,
MyList<var> *VarList1 /* source */, MyList<var> *VarList2 /*target */,
@@ -4027,8 +4134,9 @@ void Parallel::transfermix(MyList<Parallel::gridseg> **src, MyList<Parallel::gri
int *req_node = new int[2 * cpusize];
int *req_is_recv = new int[2 * cpusize];
int *completed = new int[2 * cpusize];
int req_no = 0;
int pending_recv = 0;
int req_no = 0;
int pending_recv = 0;
const int mpi_tag = parallel_next_transfer_tag();
double **send_data = new double *[cpusize];
double **rec_data = new double *[cpusize];
@@ -4055,7 +4163,7 @@ void Parallel::transfermix(MyList<Parallel::gridseg> **src, MyList<Parallel::gri
cout << "out of memory when new in short transfer, place 1" << endl;
MPI_Abort(MPI_COMM_WORLD, 1);
}
MPI_Irecv((void *)rec_data[node], recv_lengths[node], MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no);
MPI_Irecv((void *)rec_data[node], recv_lengths[node], MPI_DOUBLE, node, mpi_tag, MPI_COMM_WORLD, reqs + req_no);
req_node[req_no] = node;
req_is_recv[req_no] = 1;
req_no++;
@@ -4091,7 +4199,7 @@ void Parallel::transfermix(MyList<Parallel::gridseg> **src, MyList<Parallel::gri
MPI_Abort(MPI_COMM_WORLD, 1);
}
data_packermix(send_data[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
MPI_Isend((void *)send_data[node], send_lengths[node], MPI_DOUBLE, node, 1, MPI_COMM_WORLD, reqs + req_no);
MPI_Isend((void *)send_data[node], send_lengths[node], MPI_DOUBLE, node, mpi_tag, MPI_COMM_WORLD, reqs + req_no);
req_node[req_no] = node;
req_is_recv[req_no] = 0;
req_no++;
@@ -4102,7 +4210,8 @@ void Parallel::transfermix(MyList<Parallel::gridseg> **src, MyList<Parallel::gri
while (pending_recv > 0)
{
int outcount = 0;
MPI_Waitsome(req_no, reqs, &outcount, completed, stats);
parallel_waitsome_checked(req_no, reqs, &outcount, completed, stats,
"Parallel::transfermix");
if (outcount == MPI_UNDEFINED) break;
for (int i = 0; i < outcount; i++)
@@ -4117,7 +4226,7 @@ void Parallel::transfermix(MyList<Parallel::gridseg> **src, MyList<Parallel::gri
}
}
if (req_no > 0) MPI_Waitall(req_no, reqs, stats);
if (req_no > 0) parallel_waitall_checked(req_no, reqs, stats, "Parallel::transfermix");
if (rec_data[myrank])
data_packermix(rec_data[myrank], src[myrank], dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry);
@@ -4134,16 +4243,21 @@ void Parallel::transfermix(MyList<Parallel::gridseg> **src, MyList<Parallel::gri
delete[] stats;
delete[] req_node;
delete[] req_is_recv;
delete[] completed;
delete[] send_data;
delete[] rec_data;
delete[] send_lengths;
delete[] recv_lengths;
}
void Parallel::Sync(Patch *Pat, MyList<var> *VarList, int Symmetry)
{
int cpusize;
MPI_Comm_size(MPI_COMM_WORLD, &cpusize);
delete[] completed;
delete[] send_data;
delete[] rec_data;
delete[] send_lengths;
delete[] recv_lengths;
}
void Parallel::Sync(Patch *Pat, MyList<var> *VarList, int Symmetry)
{
Sync(Pat, VarList, Symmetry, "Parallel::Sync(Patch)");
}
void Parallel::Sync(Patch *Pat, MyList<var> *VarList, int Symmetry, const char *context)
{
int cpusize;
MPI_Comm_size(MPI_COMM_WORLD, &cpusize);
MyList<Parallel::gridseg> *dst;
MyList<Parallel::gridseg> **src, **transfer_src, **transfer_dst;
@@ -4159,7 +4273,10 @@ void Parallel::Sync(Patch *Pat, MyList<var> *VarList, int Symmetry)
// but for transfer_dst[node] the data may locate on any node
}
transfer(transfer_src, transfer_dst, VarList, VarList, Symmetry);
{
ParallelTransferContextGuard transfer_ctx(context ? context : "Parallel::Sync(Patch)");
transfer(transfer_src, transfer_dst, VarList, VarList, Symmetry);
}
if (dst)
dst->destroyList();
@@ -4177,15 +4294,26 @@ void Parallel::Sync(Patch *Pat, MyList<var> *VarList, int Symmetry)
delete[] transfer_src;
delete[] transfer_dst;
}
void Parallel::Sync(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry)
{
// Patch inner Synch
MyList<Patch> *Pp = PatL;
while (Pp)
{
Sync(Pp->data, VarList, Symmetry);
Pp = Pp->next;
}
void Parallel::Sync(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry)
{
Sync(PatL, VarList, Symmetry, "Parallel::Sync(PatchList)");
}
void Parallel::Sync(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry, const char *context)
{
std::string patchlist_context = context ? std::string(context) + " [patchlist]" : "Parallel::Sync(PatchList)";
// Patch inner Synch
MyList<Patch> *Pp = PatL;
int patch_index = 0;
while (Pp)
{
std::string patch_context = context ? std::string(context) + " [patch " + std::to_string(patch_index) + "]"
: "Parallel::Sync(Patch)";
Sync(Pp->data, VarList, Symmetry, patch_context.c_str());
Pp = Pp->next;
patch_index++;
}
// Patch inter Synch
int cpusize;
@@ -4204,7 +4332,10 @@ void Parallel::Sync(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry)
build_gstl(src[node], dst, &transfer_src[node], &transfer_dst[node]); // for transfer[node], data locate on cpu#node
}
transfer(transfer_src, transfer_dst, VarList, VarList, Symmetry);
{
ParallelTransferContextGuard transfer_ctx(patchlist_context.c_str());
transfer(transfer_src, transfer_dst, VarList, VarList, Symmetry);
}
if (dst)
dst->destroyList();
@@ -4224,10 +4355,15 @@ void Parallel::Sync(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry)
}
// Merged Sync: collect all intra-patch and inter-patch grid segment lists,
// then issue a single transfer() call instead of N+1 separate ones.
void Parallel::Sync_merged(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry)
{
int cpusize;
MPI_Comm_size(MPI_COMM_WORLD, &cpusize);
void Parallel::Sync_merged(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry)
{
Sync_merged(PatL, VarList, Symmetry, "Parallel::Sync_merged");
}
void Parallel::Sync_merged(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry, const char *context)
{
int cpusize;
MPI_Comm_size(MPI_COMM_WORLD, &cpusize);
MyList<Parallel::gridseg> **combined_src = new MyList<Parallel::gridseg> *[cpusize];
MyList<Parallel::gridseg> **combined_dst = new MyList<Parallel::gridseg> *[cpusize];
@@ -4301,8 +4437,11 @@ void Parallel::Sync_merged(MyList<Patch> *PatL, MyList<var> *VarList, int Symmet
if (dst_buffer)
dst_buffer->destroyList();
// Phase C: Single transfer
transfer(combined_src, combined_dst, VarList, VarList, Symmetry);
// Phase C: Single transfer
{
ParallelTransferContextGuard transfer_ctx(context ? context : "Parallel::Sync_merged");
transfer(combined_src, combined_dst, VarList, VarList, Symmetry);
}
// Phase D: Cleanup
for (int node = 0; node < cpusize; node++)
@@ -4380,8 +4519,9 @@ void Parallel::transfer_cached(MyList<Parallel::gridseg> **src, MyList<Parallel:
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
int cpusize = cache.cpusize;
int req_no = 0;
int pending_recv = 0;
int req_no = 0;
int pending_recv = 0;
const int mpi_tag = parallel_next_transfer_tag();
int node;
int *req_node = cache.tc_req_node;
int *req_is_recv = cache.tc_req_is_recv;
@@ -4402,7 +4542,7 @@ void Parallel::transfer_cached(MyList<Parallel::gridseg> **src, MyList<Parallel:
cache.recv_bufs[node] = new double[rlength];
cache.recv_buf_caps[node] = rlength;
}
MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no);
MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, mpi_tag, MPI_COMM_WORLD, cache.reqs + req_no);
req_node[req_no] = node;
req_is_recv[req_no] = 1;
req_no++;
@@ -4440,7 +4580,7 @@ void Parallel::transfer_cached(MyList<Parallel::gridseg> **src, MyList<Parallel:
cache.send_buf_caps[node] = slength;
}
data_packer(cache.send_bufs[node], src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no);
MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, mpi_tag, MPI_COMM_WORLD, cache.reqs + req_no);
req_node[req_no] = node;
req_is_recv[req_no] = 0;
req_no++;
@@ -4451,7 +4591,8 @@ void Parallel::transfer_cached(MyList<Parallel::gridseg> **src, MyList<Parallel:
while (pending_recv > 0)
{
int outcount = 0;
MPI_Waitsome(req_no, cache.reqs, &outcount, completed, cache.stats);
parallel_waitsome_checked(req_no, cache.reqs, &outcount, completed, cache.stats,
"Parallel::transfer_cached");
if (outcount == MPI_UNDEFINED) break;
for (int i = 0; i < outcount; i++)
@@ -4466,11 +4607,11 @@ void Parallel::transfer_cached(MyList<Parallel::gridseg> **src, MyList<Parallel:
}
}
if (req_no > 0) MPI_Waitall(req_no, cache.reqs, cache.stats);
if (req_no > 0) parallel_waitall_checked(req_no, cache.reqs, cache.stats, "Parallel::transfer_cached");
if (self_len > 0)
data_packer(cache.recv_bufs[myrank], src[myrank], dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry);
}
if (self_len > 0)
data_packer(cache.recv_bufs[myrank], src[myrank], dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry);
}
void Parallel::Sync_cached(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetry, SyncCache &cache)
{
if (!cache.valid)
@@ -4669,12 +4810,13 @@ void Parallel::Sync_start(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetr
}
// Now pack and post async MPI operations
int myrank;
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
int cpusize = cache.cpusize;
state.req_no = 0;
state.active = true;
state.pending_recv = 0;
int myrank;
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
int cpusize = cache.cpusize;
state.req_no = 0;
state.active = true;
state.mpi_tag = parallel_next_transfer_tag();
state.pending_recv = 0;
// Allocate tracking arrays
delete[] state.req_node; delete[] state.req_is_recv;
state.req_node = new int[cache.max_reqs];
@@ -4725,7 +4867,7 @@ void Parallel::Sync_start(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetr
data_packer(cache.send_bufs[node], src[myrank], dst[myrank], node, PACK, VarList, VarList, Symmetry);
state.req_node[state.req_no] = node;
state.req_is_recv[state.req_no] = 0;
MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, 2, MPI_COMM_WORLD, cache.reqs + state.req_no++);
MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, state.mpi_tag, MPI_COMM_WORLD, cache.reqs + state.req_no++);
}
int rlength;
if (!cache.lengths_valid) {
@@ -4745,7 +4887,7 @@ void Parallel::Sync_start(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetr
state.req_node[state.req_no] = node;
state.req_is_recv[state.req_no] = 1;
state.pending_recv++;
MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, 2, MPI_COMM_WORLD, cache.reqs + state.req_no++);
MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, state.mpi_tag, MPI_COMM_WORLD, cache.reqs + state.req_no++);
}
}
}
@@ -4775,7 +4917,8 @@ void Parallel::Sync_finish(SyncCache &cache, AsyncSyncState &state,
while (pending > 0)
{
int outcount = 0;
MPI_Waitsome(state.req_no, cache.reqs, &outcount, completed, cache.stats);
parallel_waitsome_checked(state.req_no, cache.reqs, &outcount, completed, cache.stats,
"Parallel::Sync_finish");
if (outcount == MPI_UNDEFINED) break;
for (int i = 0; i < outcount; i++)
{
@@ -4791,8 +4934,8 @@ void Parallel::Sync_finish(SyncCache &cache, AsyncSyncState &state,
delete[] completed;
}
// Wait for remaining sends
if (state.req_no > 0) MPI_Waitall(state.req_no, cache.reqs, cache.stats);
// Wait for remaining sends
if (state.req_no > 0) parallel_waitall_checked(state.req_no, cache.reqs, cache.stats, "Parallel::Sync_finish");
delete[] state.req_node; state.req_node = 0;
delete[] state.req_is_recv; state.req_is_recv = 0;
@@ -5235,7 +5378,10 @@ void Parallel::PeriodicBD(Patch *Pat, MyList<var> *VarList, int Symmetry)
build_PhysBD_gstl(Pat, src[node], dst, &transfer_src[node], &transfer_dst[node]); // for transfer[node], data locate on cpu#node
}
transfer(transfer_src, transfer_dst, VarList, VarList, Symmetry);
{
ParallelTransferContextGuard transfer_ctx("Parallel::PeriodicBD");
transfer(transfer_src, transfer_dst, VarList, VarList, Symmetry);
}
if (dst)
dst->destroyList();
@@ -5506,7 +5652,10 @@ void Parallel::Prolong(Patch *Patc, Patch *Patf,
build_gstl(src[node], dst, &transfer_src[node], &transfer_dst[node]); // for transfer[node], data locate on cpu#node
}
transfer(transfer_src, transfer_dst, VarList1, VarList2, Symmetry);
{
ParallelTransferContextGuard transfer_ctx("Parallel::Prolong");
transfer(transfer_src, transfer_dst, VarList1, VarList2, Symmetry);
}
if (dst)
dst->destroyList();
@@ -5565,7 +5714,10 @@ void Parallel::Restrict(MyList<Patch> *PatcL, MyList<Patch> *PatfL,
build_gstl(src[node], dst, &transfer_src[node], &transfer_dst[node]); // for transfer[node], data locate on cpu#node
}
transfer(transfer_src, transfer_dst, VarList1, VarList2, Symmetry);
{
ParallelTransferContextGuard transfer_ctx("Parallel::Restrict");
transfer(transfer_src, transfer_dst, VarList1, VarList2, Symmetry);
}
if (dst)
dst->destroyList();
@@ -5609,7 +5761,10 @@ void Parallel::Restrict_after(MyList<Patch> *PatcL, MyList<Patch> *PatfL,
build_gstl(src[node], dst, &transfer_src[node], &transfer_dst[node]); // for transfer[node], data locate on cpu#node
}
transfer(transfer_src, transfer_dst, VarList1, VarList2, Symmetry);
{
ParallelTransferContextGuard transfer_ctx("Parallel::Restrict_after");
transfer(transfer_src, transfer_dst, VarList1, VarList2, Symmetry);
}
if (dst)
dst->destroyList();
@@ -5654,7 +5809,10 @@ void Parallel::OutBdLow2Hi(Patch *Patc, Patch *Patf,
build_gstl(src[node], dst, &transfer_src[node], &transfer_dst[node]); // for transfer[node], data locate on cpu#node
}
transfer(transfer_src, transfer_dst, VarList1, VarList2, Symmetry);
{
ParallelTransferContextGuard transfer_ctx("Parallel::OutBdLow2Hi(Patch)");
transfer(transfer_src, transfer_dst, VarList1, VarList2, Symmetry);
}
if (dst)
dst->destroyList();
@@ -5710,7 +5868,10 @@ void Parallel::OutBdLow2Hi(MyList<Patch> *PatcL, MyList<Patch> *PatfL,
build_gstl(src[node], dst, &transfer_src[node], &transfer_dst[node]); // for transfer[node], data locate on cpu#node
}
transfer(transfer_src, transfer_dst, VarList1, VarList2, Symmetry);
{
ParallelTransferContextGuard transfer_ctx("Parallel::OutBdLow2Hi(PatchList)");
transfer(transfer_src, transfer_dst, VarList1, VarList2, Symmetry);
}
if (dst)
dst->destroyList();
@@ -5981,13 +6142,14 @@ void Parallel::OutBdLow2Himix_cached(MyList<Patch> *PatcL, MyList<Patch> *PatfL,
int myrank;
MPI_Comm_size(MPI_COMM_WORLD, &cache.cpusize);
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
int cpusize = cache.cpusize;
int req_no = 0;
int pending_recv = 0;
int *req_node = new int[cache.max_reqs];
int *req_is_recv = new int[cache.max_reqs];
int *completed = new int[cache.max_reqs];
int cpusize = cache.cpusize;
int req_no = 0;
int pending_recv = 0;
const int mpi_tag = parallel_next_transfer_tag();
int *req_node = new int[cache.max_reqs];
int *req_is_recv = new int[cache.max_reqs];
int *completed = new int[cache.max_reqs];
// Post receives first so peers can progress rendezvous early.
for (int node = 0; node < cpusize; node++)
@@ -6004,7 +6166,7 @@ void Parallel::OutBdLow2Himix_cached(MyList<Patch> *PatcL, MyList<Patch> *PatfL,
cache.recv_bufs[node] = new double[rlength];
cache.recv_buf_caps[node] = rlength;
}
MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no);
MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, mpi_tag, MPI_COMM_WORLD, cache.reqs + req_no);
req_node[req_no] = node;
req_is_recv[req_no] = 1;
req_no++;
@@ -6042,7 +6204,7 @@ void Parallel::OutBdLow2Himix_cached(MyList<Patch> *PatcL, MyList<Patch> *PatfL,
cache.send_buf_caps[node] = slength;
}
data_packermix(cache.send_bufs[node], cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry);
MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, 1, MPI_COMM_WORLD, cache.reqs + req_no);
MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, mpi_tag, MPI_COMM_WORLD, cache.reqs + req_no);
req_node[req_no] = node;
req_is_recv[req_no] = 0;
req_no++;
@@ -6053,7 +6215,8 @@ void Parallel::OutBdLow2Himix_cached(MyList<Patch> *PatcL, MyList<Patch> *PatfL,
while (pending_recv > 0)
{
int outcount = 0;
MPI_Waitsome(req_no, cache.reqs, &outcount, completed, cache.stats);
parallel_waitsome_checked(req_no, cache.reqs, &outcount, completed, cache.stats,
"Parallel::transfermix_cached");
if (outcount == MPI_UNDEFINED) break;
for (int i = 0; i < outcount; i++)
@@ -6068,10 +6231,10 @@ void Parallel::OutBdLow2Himix_cached(MyList<Patch> *PatcL, MyList<Patch> *PatfL,
}
}
if (req_no > 0) MPI_Waitall(req_no, cache.reqs, cache.stats);
if (req_no > 0) parallel_waitall_checked(req_no, cache.reqs, cache.stats, "Parallel::transfermix_cached");
if (self_len > 0)
data_packermix(cache.recv_bufs[myrank], cache.combined_src[myrank], cache.combined_dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry);
if (self_len > 0)
data_packermix(cache.recv_bufs[myrank], cache.combined_src[myrank], cache.combined_dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry);
delete[] req_node;
delete[] req_is_recv;
@@ -6787,7 +6950,10 @@ void Parallel::fill_level_data(MyList<Patch> *PatLd, MyList<Patch> *PatLs, MyLis
build_gstl(src[node], dst, &transfer_src[node], &transfer_dst[node]); // for transfer[node], data locate on cpu#node
}
transfer(transfer_src, transfer_dst, VarList, VarList, Symmetry);
{
ParallelTransferContextGuard transfer_ctx("Parallel::Interp copy");
transfer(transfer_src, transfer_dst, VarList, VarList, Symmetry);
}
for (int node = 0; node < cpusize; node++)
{
@@ -6826,7 +6992,10 @@ void Parallel::fill_level_data(MyList<Patch> *PatLd, MyList<Patch> *PatLs, MyLis
Sync(PatcL, FutureList, Symmetry);
}
//<~~~prolong then
transfer(transfer_src, transfer_dst, FutureList, FutureList, Symmetry);
{
ParallelTransferContextGuard transfer_ctx("Parallel::Interp prolong FutureList");
transfer(transfer_src, transfer_dst, FutureList, FutureList, Symmetry);
}
// for StateList
// time interpolation part
@@ -6842,7 +7011,10 @@ void Parallel::fill_level_data(MyList<Patch> *PatLd, MyList<Patch> *PatLs, MyLis
Sync(PatcL, tmList, Symmetry);
}
//<~~~prolong then
transfer(transfer_src, transfer_dst, tmList, StateList, Symmetry);
{
ParallelTransferContextGuard transfer_ctx("Parallel::Interp prolong StateList");
transfer(transfer_src, transfer_dst, tmList, StateList, Symmetry);
}
}
else
{
@@ -6853,7 +7025,10 @@ void Parallel::fill_level_data(MyList<Patch> *PatLd, MyList<Patch> *PatLs, MyLis
Sync(PatcL, VarList, Symmetry);
}
//<~~~prolong then
transfer(transfer_src, transfer_dst, VarList, VarList, Symmetry);
{
ParallelTransferContextGuard transfer_ctx("Parallel::Interp prolong VarList");
transfer(transfer_src, transfer_dst, VarList, VarList, Symmetry);
}
}
for (int node = 0; node < cpusize; node++)