Unpack intermediate sync stages directly to GPU
This commit is contained in:
@@ -123,6 +123,85 @@ bool parallel_gpu_pack_segments(double *data,
|
||||
return true;
|
||||
}
|
||||
|
||||
bool parallel_can_gpu_unpack_segments(MyList<Parallel::gridseg> *src, MyList<Parallel::gridseg> *dst,
|
||||
int rank_in, MyList<var> *VarLists, MyList<var> *VarListd)
|
||||
{
|
||||
int myrank;
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
|
||||
if (!src || !dst)
|
||||
return false;
|
||||
|
||||
if (src->data->Bg->lev != dst->data->Bg->lev)
|
||||
return false;
|
||||
|
||||
while (src && dst)
|
||||
{
|
||||
if ((src->data->Bg->rank == rank_in) && (dst->data->Bg->rank == myrank))
|
||||
{
|
||||
MyList<var> *varls = VarLists;
|
||||
MyList<var> *varld = VarListd;
|
||||
while (varls && varld)
|
||||
{
|
||||
(void)varls;
|
||||
if (!bssn_gpu_find_device_buffer(dst->data->Bg->fgfs[varld->data->sgfn]))
|
||||
return false;
|
||||
varls = varls->next;
|
||||
varld = varld->next;
|
||||
}
|
||||
if (varls || varld)
|
||||
return false;
|
||||
}
|
||||
src = src->next;
|
||||
dst = dst->next;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool parallel_gpu_unpack_segments(const double *data,
|
||||
MyList<Parallel::gridseg> *src, MyList<Parallel::gridseg> *dst,
|
||||
int rank_in, MyList<var> *VarLists, MyList<var> *VarListd)
|
||||
{
|
||||
if (!data)
|
||||
return false;
|
||||
|
||||
int myrank;
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
|
||||
if (!src || !dst)
|
||||
return false;
|
||||
if (src->data->Bg->lev != dst->data->Bg->lev)
|
||||
return false;
|
||||
|
||||
int size_out = 0;
|
||||
while (src && dst)
|
||||
{
|
||||
if ((src->data->Bg->rank == rank_in) && (dst->data->Bg->rank == myrank))
|
||||
{
|
||||
MyList<var> *varls = VarLists;
|
||||
MyList<var> *varld = VarListd;
|
||||
while (varls && varld)
|
||||
{
|
||||
(void)varls;
|
||||
if (bssn_gpu_stage_upload_buffer_to_region(data + size_out,
|
||||
dst->data->Bg->fgfs[varld->data->sgfn],
|
||||
dst->data->Bg->shape,
|
||||
dst->data->Bg->bbox,
|
||||
dst->data->Bg->bbox + dim,
|
||||
dst->data->shape,
|
||||
dst->data->llb))
|
||||
return false;
|
||||
size_out += dst->data->shape[0] * dst->data->shape[1] * dst->data->shape[2];
|
||||
varls = varls->next;
|
||||
varld = varld->next;
|
||||
}
|
||||
if (varls || varld)
|
||||
return false;
|
||||
}
|
||||
src = src->next;
|
||||
dst = dst->next;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void parallel_report_mpi_error(const char *context, int errcode, int req_no)
|
||||
{
|
||||
char errstr[MPI_MAX_ERROR_STRING];
|
||||
@@ -4996,23 +5075,28 @@ void Parallel::Sync_start(MyList<Patch> *PatL, MyList<var> *VarList, int Symmetr
|
||||
cache.lengths_valid = true;
|
||||
}
|
||||
// Sync_finish: progressive unpack as receives complete, then wait for sends
|
||||
void Parallel::Sync_finish(SyncCache &cache, AsyncSyncState &state,
|
||||
MyList<var> *VarList, int Symmetry)
|
||||
{
|
||||
if (!state.active)
|
||||
return;
|
||||
void Parallel::Sync_finish(SyncCache &cache, AsyncSyncState &state,
|
||||
MyList<var> *VarList, int Symmetry, bool unpack_to_host)
|
||||
{
|
||||
if (!state.active)
|
||||
return;
|
||||
|
||||
int myrank;
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
|
||||
MyList<Parallel::gridseg> **src = cache.combined_src;
|
||||
MyList<Parallel::gridseg> **dst = cache.combined_dst;
|
||||
|
||||
// Unpack local data first (no MPI needed)
|
||||
if (cache.recv_bufs[myrank] && cache.recv_lengths[myrank] > 0)
|
||||
data_packer(cache.recv_bufs[myrank], src[myrank], dst[myrank], myrank, UNPACK, VarList, VarList, Symmetry);
|
||||
|
||||
// Progressive unpack of remote receives
|
||||
if (state.pending_recv > 0 && state.req_no > 0)
|
||||
|
||||
// Unpack local data first (no MPI needed)
|
||||
if (cache.recv_bufs[myrank] && cache.recv_lengths[myrank] > 0)
|
||||
{
|
||||
if (unpack_to_host ||
|
||||
!parallel_can_gpu_unpack_segments(src[myrank], dst[myrank], myrank, VarList, VarList) ||
|
||||
!parallel_gpu_unpack_segments(cache.recv_bufs[myrank], src[myrank], dst[myrank], myrank, VarList, VarList))
|
||||
data_packer(cache.recv_bufs[myrank], src[myrank], dst[myrank], myrank, UNPACK, VarList, VarList, Symmetry);
|
||||
}
|
||||
|
||||
// Progressive unpack of remote receives
|
||||
if (state.pending_recv > 0 && state.req_no > 0)
|
||||
{
|
||||
int pending = state.pending_recv;
|
||||
int *completed = new int[cache.max_reqs];
|
||||
@@ -5025,13 +5109,16 @@ void Parallel::Sync_finish(SyncCache &cache, AsyncSyncState &state,
|
||||
for (int i = 0; i < outcount; i++)
|
||||
{
|
||||
int idx = completed[i];
|
||||
if (idx >= 0 && state.req_is_recv[idx])
|
||||
{
|
||||
int recv_node = state.req_node[idx];
|
||||
data_packer(cache.recv_bufs[recv_node], src[recv_node], dst[recv_node], recv_node, UNPACK, VarList, VarList, Symmetry);
|
||||
pending--;
|
||||
}
|
||||
}
|
||||
if (idx >= 0 && state.req_is_recv[idx])
|
||||
{
|
||||
int recv_node = state.req_node[idx];
|
||||
if (unpack_to_host ||
|
||||
!parallel_can_gpu_unpack_segments(src[recv_node], dst[recv_node], recv_node, VarList, VarList) ||
|
||||
!parallel_gpu_unpack_segments(cache.recv_bufs[recv_node], src[recv_node], dst[recv_node], recv_node, VarList, VarList))
|
||||
data_packer(cache.recv_bufs[recv_node], src[recv_node], dst[recv_node], recv_node, UNPACK, VarList, VarList, Symmetry);
|
||||
pending--;
|
||||
}
|
||||
}
|
||||
}
|
||||
delete[] completed;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user