diff --git a/src/common/hist_util.cc b/src/common/hist_util.cc index 1ac058645..78a5c950b 100644 --- a/src/common/hist_util.cc +++ b/src/common/hist_util.cc @@ -178,8 +178,7 @@ uint32_t HistCutMatrix::GetBinIdx(const Entry& e) { void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) { cut.Init(p_fmat, max_num_bins); - const int32_t nthread = omp_get_max_threads(); - // const int nthread = 1; + const size_t nthread = omp_get_max_threads(); const uint32_t nbins = cut.row_ptr.back(); hit_count.resize(nbins, 0); hit_count_tloc_.resize(nthread * nbins, 0); @@ -197,17 +196,21 @@ void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) { size_t prev_sum = 0; for (const auto &batch : p_fmat->GetRowBatches()) { - MemStackAllocator partial_sums(nthread); + // The number of threads is pegged to the batch size. If the OMP + // block is parallelized on anything other than the batch/block size, + // it should be reassigned + const size_t batch_threads = std::min(batch.Size(), static_cast(omp_get_max_threads())); + MemStackAllocator partial_sums(batch_threads); size_t* p_part = partial_sums.Get(); - size_t block_size = batch.Size() / nthread; + size_t block_size = batch.Size() / batch_threads; - #pragma omp parallel num_threads(nthread) + #pragma omp parallel num_threads(batch_threads) { #pragma omp for - for (int32_t tid = 0; tid < nthread; ++tid) { + for (int32_t tid = 0; tid < batch_threads; ++tid) { size_t ibegin = block_size * tid; - size_t iend = (tid == (nthread-1) ? batch.Size() : (block_size * (tid+1))); + size_t iend = (tid == (batch_threads-1) ? batch.Size() : (block_size * (tid+1))); size_t sum = 0; for (size_t i = ibegin; i < iend; ++i) { @@ -219,15 +222,15 @@ void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) { #pragma omp single { p_part[0] = prev_sum; - for (int32_t i = 1; i < nthread; ++i) { + for (int32_t i = 1; i < batch_threads; ++i) { p_part[i] = p_part[i - 1] + row_ptr[rbegin + i*block_size]; } } #pragma omp for - for (int32_t tid = 0; tid < nthread; ++tid) { + for (int32_t tid = 0; tid < batch_threads; ++tid) { size_t ibegin = block_size * tid; - size_t iend = (tid == (nthread-1) ? batch.Size() : (block_size * (tid+1))); + size_t iend = (tid == (batch_threads-1) ? batch.Size() : (block_size * (tid+1))); for (size_t i = ibegin; i < iend; ++i) { row_ptr[rbegin + 1 + i] += p_part[tid]; @@ -235,13 +238,12 @@ void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) { } } - index.resize(row_ptr.back()); + index.resize(row_ptr[rbegin + batch.Size()]); CHECK_GT(cut.cut.size(), 0U); - auto bsize = static_cast(batch.Size()); - #pragma omp parallel for num_threads(nthread) schedule(static) - for (omp_ulong i = 0; i < bsize; ++i) { // NOLINT(*) + #pragma omp parallel for num_threads(batch_threads) schedule(static) + for (omp_ulong i = 0; i < batch.Size(); ++i) { // NOLINT(*) const int tid = omp_get_thread_num(); size_t ibegin = row_ptr[rbegin + i]; size_t iend = row_ptr[rbegin + i + 1]; diff --git a/tests/cpp/common/test_column_matrix.cc b/tests/cpp/common/test_column_matrix.cc index 9afd10c2a..83cbefe57 100644 --- a/tests/cpp/common/test_column_matrix.cc +++ b/tests/cpp/common/test_column_matrix.cc @@ -50,5 +50,23 @@ TEST(DenseColumnWithMissing, Test) { } delete dmat; } + +void +TestGHistIndexMatrixCreation(size_t nthreads) { + /* This should create multiple sparse pages */ + std::unique_ptr dmat = CreateSparsePageDMatrix(1024, 1024); + omp_set_num_threads(nthreads); + GHistIndexMatrix gmat; + gmat.Init(dmat.get(), 256); +} + +TEST(HistIndexCreationWithExternalMemory, Test) { + // Vary the number of threads to make sure that the last batch + // is distributed properly to the available number of threads + // in the thread pool + TestGHistIndexMatrixCreation(20); + TestGHistIndexMatrixCreation(30); + TestGHistIndexMatrixCreation(40); +} } // namespace common } // namespace xgboost