diff --git a/AMSS_NCKU_source/Parallel.C b/AMSS_NCKU_source/Parallel.C index 0550ac6..aa49d29 100644 --- a/AMSS_NCKU_source/Parallel.C +++ b/AMSS_NCKU_source/Parallel.C @@ -207,6 +207,17 @@ bool parallel_gpu_unpack_segments(const double *data, return true; } +int parallel_var_list_count(MyList *var_list) +{ + int count = 0; + while (var_list) + { + count++; + var_list = var_list->next; + } + return count; +} + void parallel_report_mpi_error(const char *context, int errcode, int req_no) { char errstr[MPI_MAX_ERROR_STRING]; @@ -4642,13 +4653,14 @@ void Parallel::Sync_merged(MyList *PatL, MyList *VarList, int Symmet delete[] combined_dst; } // SyncCache constructor -Parallel::SyncCache::SyncCache() - : valid(false), cpusize(0), combined_src(0), combined_dst(0), - send_lengths(0), recv_lengths(0), send_bufs(0), recv_bufs(0), - send_buf_caps(0), recv_buf_caps(0), reqs(0), stats(0), max_reqs(0), - lengths_valid(false), tc_req_node(0), tc_req_is_recv(0), tc_completed(0) -{ -} +Parallel::SyncCache::SyncCache() + : valid(false), cpusize(0), combined_src(0), combined_dst(0), + send_lengths(0), recv_lengths(0), send_bufs(0), recv_bufs(0), + send_buf_caps(0), recv_buf_caps(0), reqs(0), stats(0), max_reqs(0), + lengths_valid(false), lengths_var_count(-1), + tc_req_node(0), tc_req_is_recv(0), tc_completed(0) +{ +} // SyncCache invalidate: free grid segment lists but keep buffers void Parallel::SyncCache::invalidate() { @@ -4662,10 +4674,11 @@ void Parallel::SyncCache::invalidate() combined_dst[i]->destroyList(); combined_src[i] = combined_dst[i] = 0; send_lengths[i] = recv_lengths[i] = 0; - } - valid = false; - lengths_valid = false; -} + } + valid = false; + lengths_valid = false; + lengths_var_count = -1; +} // SyncCache destroy: free everything void Parallel::SyncCache::destroy() { @@ -4691,16 +4704,18 @@ void Parallel::SyncCache::destroy() combined_src = combined_dst = 0; send_lengths = recv_lengths = 0; send_buf_caps = recv_buf_caps = 0; - send_bufs = recv_bufs = 0; - reqs = 0; stats = 0; - tc_req_node = 0; tc_req_is_recv = 0; tc_completed = 0; - cpusize = 0; max_reqs = 0; -} + send_bufs = recv_bufs = 0; + reqs = 0; stats = 0; + tc_req_node = 0; tc_req_is_recv = 0; tc_completed = 0; + cpusize = 0; max_reqs = 0; + lengths_valid = false; + lengths_var_count = -1; +} // transfer_cached: reuse pre-allocated buffers from SyncCache -void Parallel::transfer_cached(MyList **src, MyList **dst, - MyList *VarList1, MyList *VarList2, - int Symmetry, SyncCache &cache) -{ +void Parallel::transfer_cached(MyList **src, MyList **dst, + MyList *VarList1, MyList *VarList2, + int Symmetry, SyncCache &cache) +{ int myrank; MPI_Comm_size(MPI_COMM_WORLD, &cache.cpusize); MPI_Comm_rank(MPI_COMM_WORLD, &myrank); @@ -4709,21 +4724,29 @@ void Parallel::transfer_cached(MyList **src, MyList 0) - { - if (rlength > cache.recv_buf_caps[node]) + for (node = 0; node < cpusize; node++) + { + if (node == myrank) continue; + + int rlength; + if (!lengths_match) + { + rlength = data_packer(0, src[node], dst[node], node, UNPACK, VarList1, VarList2, Symmetry); + cache.recv_lengths[node] = rlength; + } + else + rlength = cache.recv_lengths[node]; + if (rlength > 0) + { + if (rlength > cache.recv_buf_caps[node]) { if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node]; cache.recv_bufs[node] = new double[rlength]; @@ -4735,13 +4758,19 @@ void Parallel::transfer_cached(MyList **src, MyList 0) - { + } + + // Local transfer on this rank. + int self_len; + if (!lengths_match) + { + self_len = data_packer(0, src[myrank], dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry); + cache.recv_lengths[myrank] = self_len; + } + else + self_len = cache.recv_lengths[myrank]; + if (self_len > 0) + { if (self_len > cache.recv_buf_caps[myrank]) { if (cache.recv_bufs[myrank]) delete[] cache.recv_bufs[myrank]; @@ -4752,14 +4781,20 @@ void Parallel::transfer_cached(MyList **src, MyList 0) - { + for (node = 0; node < cpusize; node++) + { + if (node == myrank) continue; + + int slength; + if (!lengths_match) + { + slength = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList1, VarList2, Symmetry); + cache.send_lengths[node] = slength; + } + else + slength = cache.send_lengths[node]; + if (slength > 0) + { if (slength > cache.send_buf_caps[node]) { if (cache.send_bufs[node]) delete[] cache.send_bufs[node]; @@ -4772,11 +4807,14 @@ void Parallel::transfer_cached(MyList **src, MyList 0) - { + } + + cache.lengths_valid = true; + cache.lengths_var_count = current_var_count; + + // Unpack as soon as receive completes to reduce pure wait time. + while (pending_recv > 0) + { int outcount = 0; parallel_waitsome_checked(req_no, cache.reqs, &outcount, completed, cache.stats, "Parallel::transfer_cached"); @@ -4900,9 +4938,9 @@ void Parallel::Sync_cached(MyList *PatL, MyList *VarList, int Symmet transfer_cached(cache.combined_src, cache.combined_dst, VarList, VarList, Symmetry, cache); } // Sync_start: pack and post MPI_Isend/Irecv, return immediately -void Parallel::Sync_start(MyList *PatL, MyList *VarList, int Symmetry, - SyncCache &cache, AsyncSyncState &state) -{ +void Parallel::Sync_start(MyList *PatL, MyList *VarList, int Symmetry, + SyncCache &cache, AsyncSyncState &state) +{ // Ensure cache is built if (!cache.valid) { @@ -5000,6 +5038,8 @@ void Parallel::Sync_start(MyList *PatL, MyList *VarList, int Symmetr int myrank; MPI_Comm_rank(MPI_COMM_WORLD, &myrank); int cpusize = cache.cpusize; + const int current_var_count = parallel_var_list_count(VarList); + const bool lengths_match = cache.lengths_valid && cache.lengths_var_count == current_var_count; state.req_no = 0; state.active = true; state.mpi_tag = parallel_next_transfer_tag(); @@ -5017,10 +5057,10 @@ void Parallel::Sync_start(MyList *PatL, MyList *VarList, int Symmetr if (node == myrank) { int length; - if (!cache.lengths_valid) { - length = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList, VarList, Symmetry); - cache.recv_lengths[node] = length; - } else { + if (!lengths_match) { + length = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList, VarList, Symmetry); + cache.recv_lengths[node] = length; + } else { length = cache.recv_lengths[node]; } if (length > 0) @@ -5040,10 +5080,10 @@ void Parallel::Sync_start(MyList *PatL, MyList *VarList, int Symmetr else { int slength; - if (!cache.lengths_valid) { - slength = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList, VarList, Symmetry); - cache.send_lengths[node] = slength; - } else { + if (!lengths_match) { + slength = data_packer(0, src[myrank], dst[myrank], node, PACK, VarList, VarList, Symmetry); + cache.send_lengths[node] = slength; + } else { slength = cache.send_lengths[node]; } if (slength > 0) @@ -5063,10 +5103,10 @@ void Parallel::Sync_start(MyList *PatL, MyList *VarList, int Symmetr MPI_Isend((void *)cache.send_bufs[node], slength, MPI_DOUBLE, node, state.mpi_tag, MPI_COMM_WORLD, cache.reqs + state.req_no++); } int rlength; - if (!cache.lengths_valid) { - rlength = data_packer(0, src[node], dst[node], node, UNPACK, VarList, VarList, Symmetry); - cache.recv_lengths[node] = rlength; - } else { + if (!lengths_match) { + rlength = data_packer(0, src[node], dst[node], node, UNPACK, VarList, VarList, Symmetry); + cache.recv_lengths[node] = rlength; + } else { rlength = cache.recv_lengths[node]; } if (rlength > 0) @@ -5083,10 +5123,11 @@ void Parallel::Sync_start(MyList *PatL, MyList *VarList, int Symmetr MPI_Irecv((void *)cache.recv_bufs[node], rlength, MPI_DOUBLE, node, state.mpi_tag, MPI_COMM_WORLD, cache.reqs + state.req_no++); } } - } - cache.lengths_valid = true; -} -// Sync_finish: progressive unpack as receives complete, then wait for sends + } + cache.lengths_valid = true; + cache.lengths_var_count = current_var_count; +} +// Sync_finish: progressive unpack as receives complete, then wait for sends void Parallel::Sync_finish(SyncCache &cache, AsyncSyncState &state, MyList *VarList, int Symmetry, bool unpack_to_host) { @@ -6348,19 +6389,27 @@ void Parallel::OutBdLow2Himix_cached(MyList *PatcL, MyList *PatfL, int req_no = 0; int pending_recv = 0; const int mpi_tag = parallel_next_transfer_tag(); - int *req_node = new int[cache.max_reqs]; - int *req_is_recv = new int[cache.max_reqs]; - int *completed = new int[cache.max_reqs]; - - // Post receives first so peers can progress rendezvous early. - for (int node = 0; node < cpusize; node++) - { - if (node == myrank) continue; - - int rlength = data_packermix(0, cache.combined_src[node], cache.combined_dst[node], node, UNPACK, VarList1, VarList2, Symmetry); - cache.recv_lengths[node] = rlength; - if (rlength > 0) - { + const int current_var_count = parallel_var_list_count(VarList1); + const bool lengths_match = cache.lengths_valid && cache.lengths_var_count == current_var_count; + int *req_node = cache.tc_req_node; + int *req_is_recv = cache.tc_req_is_recv; + int *completed = cache.tc_completed; + + // Post receives first so peers can progress rendezvous early. + for (int node = 0; node < cpusize; node++) + { + if (node == myrank) continue; + + int rlength; + if (!lengths_match) + { + rlength = data_packermix(0, cache.combined_src[node], cache.combined_dst[node], node, UNPACK, VarList1, VarList2, Symmetry); + cache.recv_lengths[node] = rlength; + } + else + rlength = cache.recv_lengths[node]; + if (rlength > 0) + { if (rlength > cache.recv_buf_caps[node]) { if (cache.recv_bufs[node]) delete[] cache.recv_bufs[node]; @@ -6373,13 +6422,19 @@ void Parallel::OutBdLow2Himix_cached(MyList *PatcL, MyList *PatfL, req_no++; pending_recv++; } - } - - // Local transfer on this rank. - int self_len = data_packermix(0, cache.combined_src[myrank], cache.combined_dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry); - cache.recv_lengths[myrank] = self_len; - if (self_len > 0) - { + } + + // Local transfer on this rank. + int self_len; + if (!lengths_match) + { + self_len = data_packermix(0, cache.combined_src[myrank], cache.combined_dst[myrank], myrank, PACK, VarList1, VarList2, Symmetry); + cache.recv_lengths[myrank] = self_len; + } + else + self_len = cache.recv_lengths[myrank]; + if (self_len > 0) + { if (self_len > cache.recv_buf_caps[myrank]) { if (cache.recv_bufs[myrank]) delete[] cache.recv_bufs[myrank]; @@ -6390,14 +6445,20 @@ void Parallel::OutBdLow2Himix_cached(MyList *PatcL, MyList *PatfL, } // Pack and post sends. - for (int node = 0; node < cpusize; node++) - { - if (node == myrank) continue; - - int slength = data_packermix(0, cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry); - cache.send_lengths[node] = slength; - if (slength > 0) - { + for (int node = 0; node < cpusize; node++) + { + if (node == myrank) continue; + + int slength; + if (!lengths_match) + { + slength = data_packermix(0, cache.combined_src[myrank], cache.combined_dst[myrank], node, PACK, VarList1, VarList2, Symmetry); + cache.send_lengths[node] = slength; + } + else + slength = cache.send_lengths[node]; + if (slength > 0) + { if (slength > cache.send_buf_caps[node]) { if (cache.send_bufs[node]) delete[] cache.send_bufs[node]; @@ -6410,11 +6471,14 @@ void Parallel::OutBdLow2Himix_cached(MyList *PatcL, MyList *PatfL, req_is_recv[req_no] = 0; req_no++; } - } - - // Unpack as soon as receive completes to reduce pure wait time. - while (pending_recv > 0) - { + } + + cache.lengths_valid = true; + cache.lengths_var_count = current_var_count; + + // Unpack as soon as receive completes to reduce pure wait time. + while (pending_recv > 0) + { int outcount = 0; parallel_waitsome_checked(req_no, cache.reqs, &outcount, completed, cache.stats, "Parallel::transfermix_cached"); @@ -6433,14 +6497,10 @@ void Parallel::OutBdLow2Himix_cached(MyList *PatcL, MyList *PatfL, } if (req_no > 0) parallel_waitall_checked(req_no, cache.reqs, cache.stats, "Parallel::transfermix_cached"); - + if (self_len > 0) data_packermix(cache.recv_bufs[myrank], cache.combined_src[myrank], cache.combined_dst[myrank], myrank, UNPACK, VarList1, VarList2, Symmetry); - - delete[] req_node; - delete[] req_is_recv; - delete[] completed; -} +} // collect all buffer grid segments or blocks for given patch MyList *Parallel::build_buffer_gsl(Patch *Pat) diff --git a/AMSS_NCKU_source/Parallel.h b/AMSS_NCKU_source/Parallel.h index e1e3784..22d41f3 100644 --- a/AMSS_NCKU_source/Parallel.h +++ b/AMSS_NCKU_source/Parallel.h @@ -108,12 +108,13 @@ namespace Parallel int *send_buf_caps; int *recv_buf_caps; MPI_Request *reqs; - MPI_Status *stats; - int max_reqs; - bool lengths_valid; - int *tc_req_node; - int *tc_req_is_recv; - int *tc_completed; + MPI_Status *stats; + int max_reqs; + bool lengths_valid; + int lengths_var_count; + int *tc_req_node; + int *tc_req_is_recv; + int *tc_completed; SyncCache(); void invalidate(); void destroy(); diff --git a/AMSS_NCKU_source/bssn_cuda_ops.cu b/AMSS_NCKU_source/bssn_cuda_ops.cu index 59eefd2..0b08db6 100644 --- a/AMSS_NCKU_source/bssn_cuda_ops.cu +++ b/AMSS_NCKU_source/bssn_cuda_ops.cu @@ -2016,25 +2016,6 @@ int bssn_cuda_prolong3_pack(int wei, if (!launch_kernel(grid, block, (const void *)prolong3_cell_kernel, args)) return 1; - cudaError_t sync_err = cudaDeviceSynchronize(); - if (sync_err != cudaSuccess) - { - std::fprintf(stderr, - "prolong3 debug: symmetry=%d extc=(%d,%d,%d) extf=(%d,%d,%d) " - "imino=%d imaxo=%d jmino=%d jmaxo=%d kmino=%d kmaxo=%d " - "ic_min=%d ic_max=%d jc_min=%d jc_max=%d kc_min=%d kc_max=%d " - "lbc=(%d,%d,%d) lbf=(%d,%d,%d)\n", - symmetry, - extc[0], extc[1], extc[2], - extf[0], extf[1], extf[2], - imino, imaxo, jmino, jmaxo, kmino, kmaxo, - ic_min, ic_max, jc_min, jc_max, kc_min, kc_max, - lbc[0], lbc[1], lbc[2], - lbf[0], lbf[1], lbf[2]); - report_cuda_error("cudaDeviceSynchronize prolong3", sync_err); - return 1; - } - int host_error_flag = 0; err = cudaMemcpy(&host_error_flag, cache.error_flag.ptr, sizeof(int), cudaMemcpyDeviceToHost); if (err != cudaSuccess) @@ -2241,28 +2222,6 @@ int bssn_cuda_restrict3_pack(int wei, if (!launch_kernel(grid, block, (const void *)restrict3_cell_kernel, args)) return 1; - cudaError_t sync_err = cudaDeviceSynchronize(); - if (sync_err != cudaSuccess) - { - std::fprintf(stderr, - "restrict3 debug: symmetry=%d extc=(%d,%d,%d) extf=(%d,%d,%d) " - "imino=%d imaxo=%d jmino=%d jmaxo=%d kmino=%d kmaxo=%d " - "imini=%d imaxi=%d jmini=%d jmaxi=%d kmini=%d kmaxi=%d " - "lbc=(%d,%d,%d) lbf=(%d,%d,%d) " - "fi=[%d,%d] fj=[%d,%d] fk=[%d,%d] window=[%d:%d,%d:%d,%d:%d]\n", - symmetry, - extc[0], extc[1], extc[2], - extf[0], extf[1], extf[2], - imino, imaxo, jmino, jmaxo, kmino, kmaxo, - imini, imaxi, jmini, jmaxi, kmini, kmaxi, - lbc[0], lbc[1], lbc[2], - lbf[0], lbf[1], lbf[2], - fi_min, fi_max, fj_min, fj_max, fk_min, fk_max, - ii_lo, ii_hi, jj_lo, jj_hi, kk_lo, kk_hi); - report_cuda_error("cudaDeviceSynchronize restrict3", sync_err); - return 1; - } - int host_error_flag = 0; err = cudaMemcpy(&host_error_flag, cache.error_flag.ptr, sizeof(int), cudaMemcpyDeviceToHost); if (err != cudaSuccess)