From 9b530e5697545ae384297889bf86b5568f520f4f Mon Sep 17 00:00:00 2001 From: Louis Desreumaux Date: Thu, 25 Feb 2021 06:56:16 +0100 Subject: [PATCH] Improve OpenMP exception handling (#6680) --- R-package/src/xgboost_R.cc | 38 +++-- include/xgboost/data.h | 18 ++- plugin/lz4/sparse_page_lz4_format.cc | 30 ++-- src/common/column_matrix.h | 5 +- src/common/hist_util.cc | 108 +++++++------ src/common/hist_util.h | 5 +- src/common/quantile.cc | 12 +- src/common/threading_utils.h | 28 ++-- src/common/transform.h | 10 +- src/data/data.cc | 22 ++- src/gbm/gblinear.cc | 11 +- src/gbm/gbtree.cc | 6 +- src/linear/coordinate_common.h | 63 +++++--- src/linear/updater_shotgun.cc | 60 +++---- src/metric/elementwise_metric.cu | 10 +- src/metric/multiclass_metric.cu | 23 +-- src/metric/rank_metric.cc | 225 +++++++++++++++------------ src/metric/survival_metric.cu | 16 +- src/objective/rank_obj.cu | 124 ++++++++------- src/objective/regression_obj.cu | 6 +- src/predictor/cpu_predictor.cc | 21 ++- src/tree/updater_basemaker-inl.h | 35 +++-- src/tree/updater_colmaker.cc | 30 ++-- src/tree/updater_histmaker.cc | 82 ++++++---- src/tree/updater_quantile_hist.cc | 66 ++++---- src/tree/updater_refresh.cc | 31 ++-- 26 files changed, 610 insertions(+), 475 deletions(-) diff --git a/R-package/src/xgboost_R.cc b/R-package/src/xgboost_R.cc index 97f17f300..8bff4212a 100644 --- a/R-package/src/xgboost_R.cc +++ b/R-package/src/xgboost_R.cc @@ -1,6 +1,7 @@ // Copyright (c) 2014 by Contributors #include #include +#include #include #include #include @@ -92,12 +93,16 @@ SEXP XGDMatrixCreateFromMat_R(SEXP mat, din = REAL(mat); } std::vector data(nrow * ncol); + dmlc::OMPException exc; #pragma omp parallel for schedule(static) for (omp_ulong i = 0; i < nrow; ++i) { - for (size_t j = 0; j < ncol; ++j) { - data[i * ncol +j] = is_int ? static_cast(iin[i + nrow * j]) : din[i + nrow * j]; - } + exc.Run([&]() { + for (size_t j = 0; j < ncol; ++j) { + data[i * ncol +j] = is_int ? static_cast(iin[i + nrow * j]) : din[i + nrow * j]; + } + }); } + exc.Rethrow(); DMatrixHandle handle; CHECK_CALL(XGDMatrixCreateFromMat(BeginPtr(data), nrow, ncol, asReal(missing), &handle)); ret = PROTECT(R_MakeExternalPtr(handle, R_NilValue, R_NilValue)); @@ -126,11 +131,15 @@ SEXP XGDMatrixCreateFromCSC_R(SEXP indptr, for (size_t i = 0; i < nindptr; ++i) { col_ptr_[i] = static_cast(p_indptr[i]); } + dmlc::OMPException exc; #pragma omp parallel for schedule(static) for (int64_t i = 0; i < static_cast(ndata); ++i) { - indices_[i] = static_cast(p_indices[i]); - data_[i] = static_cast(p_data[i]); + exc.Run([&]() { + indices_[i] = static_cast(p_indices[i]); + data_[i] = static_cast(p_data[i]); + }); } + exc.Rethrow(); DMatrixHandle handle; CHECK_CALL(XGDMatrixCreateFromCSCEx(BeginPtr(col_ptr_), BeginPtr(indices_), BeginPtr(data_), nindptr, ndata, @@ -175,12 +184,16 @@ SEXP XGDMatrixSetInfo_R(SEXP handle, SEXP field, SEXP array) { R_API_BEGIN(); int len = length(array); const char *name = CHAR(asChar(field)); + dmlc::OMPException exc; if (!strcmp("group", name)) { std::vector vec(len); #pragma omp parallel for schedule(static) for (int i = 0; i < len; ++i) { - vec[i] = static_cast(INTEGER(array)[i]); + exc.Run([&]() { + vec[i] = static_cast(INTEGER(array)[i]); + }); } + exc.Rethrow(); CHECK_CALL(XGDMatrixSetUIntInfo(R_ExternalPtrAddr(handle), CHAR(asChar(field)), BeginPtr(vec), len)); @@ -188,8 +201,11 @@ SEXP XGDMatrixSetInfo_R(SEXP handle, SEXP field, SEXP array) { std::vector vec(len); #pragma omp parallel for schedule(static) for (int i = 0; i < len; ++i) { - vec[i] = REAL(array)[i]; + exc.Run([&]() { + vec[i] = REAL(array)[i]; + }); } + exc.Rethrow(); CHECK_CALL(XGDMatrixSetFloatInfo(R_ExternalPtrAddr(handle), CHAR(asChar(field)), BeginPtr(vec), len)); @@ -280,11 +296,15 @@ SEXP XGBoosterBoostOneIter_R(SEXP handle, SEXP dtrain, SEXP grad, SEXP hess) { << "gradient and hess must have same length"; int len = length(grad); std::vector tgrad(len), thess(len); + dmlc::OMPException exc; #pragma omp parallel for schedule(static) for (int j = 0; j < len; ++j) { - tgrad[j] = REAL(grad)[j]; - thess[j] = REAL(hess)[j]; + exc.Run([&]() { + tgrad[j] = REAL(grad)[j]; + thess[j] = REAL(hess)[j]; + }); } + exc.Rethrow(); CHECK_CALL(XGBoosterBoostOneIter(R_ExternalPtrAddr(handle), R_ExternalPtrAddr(dtrain), BeginPtr(tgrad), BeginPtr(thess), diff --git a/include/xgboost/data.h b/include/xgboost/data.h index 7a16b77e2..27aa81577 100644 --- a/include/xgboost/data.h +++ b/include/xgboost/data.h @@ -290,15 +290,19 @@ class SparsePage { void SortRows() { auto ncol = static_cast(this->Size()); -#pragma omp parallel for default(none) shared(ncol) schedule(dynamic, 1) + dmlc::OMPException exc; +#pragma omp parallel for schedule(dynamic, 1) for (bst_omp_uint i = 0; i < ncol; ++i) { - if (this->offset.HostVector()[i] < this->offset.HostVector()[i + 1]) { - std::sort( - this->data.HostVector().begin() + this->offset.HostVector()[i], - this->data.HostVector().begin() + this->offset.HostVector()[i + 1], - Entry::CmpValue); - } + exc.Run([&]() { + if (this->offset.HostVector()[i] < this->offset.HostVector()[i + 1]) { + std::sort( + this->data.HostVector().begin() + this->offset.HostVector()[i], + this->data.HostVector().begin() + this->offset.HostVector()[i + 1], + Entry::CmpValue); + } + }); } + exc.Rethrow(); } /** diff --git a/plugin/lz4/sparse_page_lz4_format.cc b/plugin/lz4/sparse_page_lz4_format.cc index 238d431c2..139cb446a 100644 --- a/plugin/lz4/sparse_page_lz4_format.cc +++ b/plugin/lz4/sparse_page_lz4_format.cc @@ -250,14 +250,18 @@ class SparsePageLZ4Format : public SparsePageFormat { int nindex = index_.num_chunk(); int nvalue = value_.num_chunk(); int ntotal = nindex + nvalue; - #pragma omp parallel for schedule(dynamic, 1) num_threads(nthread_write_) + dmlc::OMPException exc; + #pragma omp parallel for schedule(dynamic, 1) num_threads(nthread_write_) for (int i = 0; i < ntotal; ++i) { - if (i < nindex) { - index_.Compress(i, use_lz4_hc_); - } else { - value_.Compress(i - nindex, use_lz4_hc_); - } + exc.Run([&]() { + if (i < nindex) { + index_.Compress(i, use_lz4_hc_); + } else { + value_.Compress(i - nindex, use_lz4_hc_); + } + }); } + exc.Rethrow(); index_.Write(fo); value_.Write(fo); // statistics @@ -276,14 +280,18 @@ class SparsePageLZ4Format : public SparsePageFormat { int nindex = index_.num_chunk(); int nvalue = value_.num_chunk(); int ntotal = nindex + nvalue; + dmlc::OMPException exc; #pragma omp parallel for schedule(dynamic, 1) num_threads(nthread_) for (int i = 0; i < ntotal; ++i) { - if (i < nindex) { - index_.Decompress(i); - } else { - value_.Decompress(i - nindex); - } + exc.Run([&]() { + if (i < nindex) { + index_.Decompress(i); + } else { + value_.Decompress(i - nindex); + } + }); } + exc.Rethrow(); } private: diff --git a/src/common/column_matrix.h b/src/common/column_matrix.h index 2f43b828d..aeb0f309a 100644 --- a/src/common/column_matrix.h +++ b/src/common/column_matrix.h @@ -230,8 +230,7 @@ class ColumnMatrix { /* missing values make sense only for column with type kDenseColumn, and if no missing values were observed it could be handled much faster. */ if (noMissingValues) { -#pragma omp parallel for num_threads(omp_get_max_threads()) - for (omp_ulong rid = 0; rid < nrow; ++rid) { + ParallelFor(omp_ulong(nrow), [&](omp_ulong rid) { const size_t ibegin = rid*nfeature; const size_t iend = (rid+1)*nfeature; size_t j = 0; @@ -239,7 +238,7 @@ class ColumnMatrix { const size_t idx = feature_offsets_[j]; local_index[idx + rid] = index[i]; } - } + }); } else { /* to handle rows in all batches, sum of all batch sizes equal to gmat.row_ptr.size() - 1 */ size_t rbegin = 0; diff --git a/src/common/hist_util.cc b/src/common/hist_util.cc index c0031c047..75096c654 100644 --- a/src/common/hist_util.cc +++ b/src/common/hist_util.cc @@ -84,38 +84,46 @@ void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_bins) { size_t block_size = batch.Size() / batch_threads; + dmlc::OMPException exc; #pragma omp parallel num_threads(batch_threads) { #pragma omp for for (omp_ulong tid = 0; tid < batch_threads; ++tid) { - size_t ibegin = block_size * tid; - size_t iend = (tid == (batch_threads-1) ? batch.Size() : (block_size * (tid+1))); + exc.Run([&]() { + size_t ibegin = block_size * tid; + size_t iend = (tid == (batch_threads-1) ? batch.Size() : (block_size * (tid+1))); - size_t sum = 0; - for (size_t i = ibegin; i < iend; ++i) { - sum += page[i].size(); - row_ptr[rbegin + 1 + i] = sum; - } + size_t sum = 0; + for (size_t i = ibegin; i < iend; ++i) { + sum += page[i].size(); + row_ptr[rbegin + 1 + i] = sum; + } + }); } #pragma omp single { - p_part[0] = prev_sum; - for (size_t i = 1; i < batch_threads; ++i) { - p_part[i] = p_part[i - 1] + row_ptr[rbegin + i*block_size]; - } + exc.Run([&]() { + p_part[0] = prev_sum; + for (size_t i = 1; i < batch_threads; ++i) { + p_part[i] = p_part[i - 1] + row_ptr[rbegin + i*block_size]; + } + }); } #pragma omp for for (omp_ulong tid = 0; tid < batch_threads; ++tid) { - size_t ibegin = block_size * tid; - size_t iend = (tid == (batch_threads-1) ? batch.Size() : (block_size * (tid+1))); + exc.Run([&]() { + size_t ibegin = block_size * tid; + size_t iend = (tid == (batch_threads-1) ? batch.Size() : (block_size * (tid+1))); - for (size_t i = ibegin; i < iend; ++i) { - row_ptr[rbegin + 1 + i] += p_part[tid]; - } + for (size_t i = ibegin; i < iend; ++i) { + row_ptr[rbegin + 1 + i] += p_part[tid]; + } + }); } } + exc.Rethrow(); const size_t n_offsets = cut.Ptrs().size() - 1; const size_t n_index = row_ptr[rbegin + batch.Size()]; @@ -167,13 +175,12 @@ void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_bins) { [](auto idx, auto) { return idx; }); } - #pragma omp parallel for num_threads(nthread) schedule(static) - for (bst_omp_uint idx = 0; idx < bst_omp_uint(nbins); ++idx) { + ParallelFor(bst_omp_uint(nbins), nthread, [&](bst_omp_uint idx) { for (int32_t tid = 0; tid < nthread; ++tid) { hit_count[idx] += hit_count_tloc_[tid * nbins + idx]; hit_count_tloc_[tid * nbins + idx] = 0; // reset for next batch } - } + }); prev_sum = row_ptr[rbegin + batch.Size()]; rbegin += batch.Size(); @@ -701,7 +708,7 @@ void GHistBuilder::BuildBlockHist(const std::vector& const RowSetCollection::Elem row_indices, const GHistIndexBlockMatrix& gmatb, GHistRowT hist) { - constexpr int kUnroll = 8; // loop unrolling factor + static constexpr int kUnroll = 8; // loop unrolling factor const size_t nblock = gmatb.GetNumBlock(); const size_t nrows = row_indices.end - row_indices.begin; const size_t rest = nrows % kUnroll; @@ -710,40 +717,44 @@ void GHistBuilder::BuildBlockHist(const std::vector& #endif // defined(_OPENMP) xgboost::detail::GradientPairInternal* p_hist = hist.data(); + dmlc::OMPException exc; #pragma omp parallel for num_threads(nthread) schedule(guided) for (bst_omp_uint bid = 0; bid < nblock; ++bid) { - auto gmat = gmatb[bid]; + exc.Run([&]() { + auto gmat = gmatb[bid]; - for (size_t i = 0; i < nrows - rest; i += kUnroll) { - size_t rid[kUnroll]; - size_t ibegin[kUnroll]; - size_t iend[kUnroll]; - GradientPair stat[kUnroll]; + for (size_t i = 0; i < nrows - rest; i += kUnroll) { + size_t rid[kUnroll]; + size_t ibegin[kUnroll]; + size_t iend[kUnroll]; + GradientPair stat[kUnroll]; - for (int k = 0; k < kUnroll; ++k) { - rid[k] = row_indices.begin[i + k]; - ibegin[k] = gmat.row_ptr[rid[k]]; - iend[k] = gmat.row_ptr[rid[k] + 1]; - stat[k] = gpair[rid[k]]; - } - for (int k = 0; k < kUnroll; ++k) { - for (size_t j = ibegin[k]; j < iend[k]; ++j) { - const uint32_t bin = gmat.index[j]; - p_hist[bin].Add(stat[k].GetGrad(), stat[k].GetHess()); + for (int k = 0; k < kUnroll; ++k) { + rid[k] = row_indices.begin[i + k]; + ibegin[k] = gmat.row_ptr[rid[k]]; + iend[k] = gmat.row_ptr[rid[k] + 1]; + stat[k] = gpair[rid[k]]; + } + for (int k = 0; k < kUnroll; ++k) { + for (size_t j = ibegin[k]; j < iend[k]; ++j) { + const uint32_t bin = gmat.index[j]; + p_hist[bin].Add(stat[k].GetGrad(), stat[k].GetHess()); + } } } - } - for (size_t i = nrows - rest; i < nrows; ++i) { - const size_t rid = row_indices.begin[i]; - const size_t ibegin = gmat.row_ptr[rid]; - const size_t iend = gmat.row_ptr[rid + 1]; - const GradientPair stat = gpair[rid]; - for (size_t j = ibegin; j < iend; ++j) { - const uint32_t bin = gmat.index[j]; - p_hist[bin].Add(stat.GetGrad(), stat.GetHess()); + for (size_t i = nrows - rest; i < nrows; ++i) { + const size_t rid = row_indices.begin[i]; + const size_t ibegin = gmat.row_ptr[rid]; + const size_t iend = gmat.row_ptr[rid + 1]; + const GradientPair stat = gpair[rid]; + for (size_t j = ibegin; j < iend; ++j) { + const uint32_t bin = gmat.index[j]; + p_hist[bin].Add(stat.GetGrad(), stat.GetHess()); + } } - } + }); } + exc.Rethrow(); } template void GHistBuilder::BuildBlockHist(const std::vector& gpair, @@ -768,12 +779,11 @@ void GHistBuilder::SubtractionTrick(GHistRowT self, const size_t block_size = 1024; // aproximatly 1024 values per block size_t n_blocks = size/block_size + !!(size%block_size); -#pragma omp parallel for - for (omp_ulong iblock = 0; iblock < n_blocks; ++iblock) { + ParallelFor(omp_ulong(n_blocks), [&](omp_ulong iblock) { const size_t ibegin = iblock*block_size; const size_t iend = (((iblock+1)*block_size > size) ? size : ibegin + block_size); SubtractionHist(self, parent, sibling, ibegin, iend); - } + }); } template void GHistBuilder::SubtractionTrick(GHistRow self, diff --git a/src/common/hist_util.h b/src/common/hist_util.h index eadf3e379..a4c9a0cc1 100644 --- a/src/common/hist_util.h +++ b/src/common/hist_util.h @@ -257,8 +257,7 @@ struct GHistIndexMatrix { const size_t batch_size = batch.Size(); CHECK_LT(batch_size, offset_vec.size()); BinIdxType* index_data = index_data_span.data(); -#pragma omp parallel for num_threads(batch_threads) schedule(static) - for (omp_ulong i = 0; i < batch_size; ++i) { + ParallelFor(omp_ulong(batch_size), batch_threads, [&](omp_ulong i) { const int tid = omp_get_thread_num(); size_t ibegin = row_ptr[rbegin + i]; size_t iend = row_ptr[rbegin + i + 1]; @@ -270,7 +269,7 @@ struct GHistIndexMatrix { index_data[ibegin + j] = get_offset(idx, j); ++hit_count_tloc_[tid * nbins + idx]; } - } + }); } void ResizeIndex(const size_t n_index, diff --git a/src/common/quantile.cc b/src/common/quantile.cc index 9ab48a304..335206993 100644 --- a/src/common/quantile.cc +++ b/src/common/quantile.cc @@ -35,7 +35,7 @@ HostSketchContainer::CalcColumnSize(SparsePage const &batch, column.resize(n_columns, 0); } - ParallelFor(page.Size(), nthreads, [&](size_t i) { + ParallelFor(omp_ulong(page.Size()), nthreads, [&](omp_ulong i) { auto &local_column_sizes = column_sizes.at(omp_get_thread_num()); auto row = page[i]; auto const *p_row = row.data(); @@ -44,7 +44,7 @@ HostSketchContainer::CalcColumnSize(SparsePage const &batch, } }); std::vector entries_per_columns(n_columns, 0); - ParallelFor(n_columns, nthreads, [&](size_t i) { + ParallelFor(bst_omp_uint(n_columns), nthreads, [&](bst_omp_uint i) { for (auto const &thread : column_sizes) { entries_per_columns[i] += thread[i]; } @@ -99,15 +99,15 @@ void HostSketchContainer::PushRowPage(SparsePage const &page, std::vector const &group_ptr = info.group_ptr_; // Use group index for weights? auto batch = page.GetView(); - dmlc::OMPException exec; // Parallel over columns. Each thread owns a set of consecutive columns. auto const ncol = static_cast(info.num_col_); auto const is_dense = info.num_nonzero_ == info.num_col_ * info.num_row_; auto thread_columns_ptr = LoadBalance(page, info.num_col_, nthread); + dmlc::OMPException exc; #pragma omp parallel num_threads(nthread) { - exec.Run([&]() { + exc.Run([&]() { auto tid = static_cast(omp_get_thread_num()); auto const begin = thread_columns_ptr[tid]; auto const end = thread_columns_ptr[tid + 1]; @@ -140,7 +140,7 @@ void HostSketchContainer::PushRowPage(SparsePage const &page, } }); } - exec.Rethrow(); + exc.Rethrow(); monitor_.Stop(__func__); } @@ -242,7 +242,7 @@ size_t nbytes = 0; &global_sketches); std::vector final_sketches(n_columns); - ParallelFor(n_columns, omp_get_max_threads(), [&](size_t fidx) { + ParallelFor(omp_ulong(n_columns), [&](omp_ulong fidx) { int32_t intermediate_num_cuts = num_cuts[fidx]; auto nbytes = WQSketch::SummaryContainer::CalcMemCost(intermediate_num_cuts); diff --git a/src/common/threading_utils.h b/src/common/threading_utils.h index 896098aac..21ba4b41b 100644 --- a/src/common/threading_utils.h +++ b/src/common/threading_utils.h @@ -115,11 +115,10 @@ void ParallelFor2d(const BlockedSpace2d& space, int nthreads, Func func) { nthreads = std::min(nthreads, omp_get_max_threads()); nthreads = std::max(nthreads, 1); - dmlc::OMPException omp_exc; + dmlc::OMPException exc; #pragma omp parallel num_threads(nthreads) { - omp_exc.Run( - [](size_t num_blocks_in_space, const BlockedSpace2d& space, int nthreads, Func func) { + exc.Run([&]() { size_t tid = omp_get_thread_num(); size_t chunck_size = num_blocks_in_space / nthreads + !!(num_blocks_in_space % nthreads); @@ -129,19 +128,24 @@ void ParallelFor2d(const BlockedSpace2d& space, int nthreads, Func func) { for (auto i = begin; i < end; i++) { func(space.GetFirstDimension(i), space.GetRange(i)); } - }, num_blocks_in_space, space, nthreads, func); + }); } - omp_exc.Rethrow(); + exc.Rethrow(); } -template -void ParallelFor(size_t size, size_t nthreads, Func fn) { - dmlc::OMPException omp_exc; -#pragma omp parallel for num_threads(nthreads) - for (omp_ulong i = 0; i < size; ++i) { - omp_exc.Run(fn, i); +template +void ParallelFor(Index size, size_t nthreads, Func fn) { + dmlc::OMPException exc; +#pragma omp parallel for num_threads(nthreads) schedule(static) + for (Index i = 0; i < size; ++i) { + exc.Run(fn, i); } - omp_exc.Rethrow(); + exc.Rethrow(); +} + +template +void ParallelFor(Index size, Func fn) { + ParallelFor(size, omp_get_max_threads(), fn); } /* \brief Configure parallel threads. diff --git a/src/common/transform.h b/src/common/transform.h index b3ad7fdb9..85f1c0c43 100644 --- a/src/common/transform.h +++ b/src/common/transform.h @@ -16,6 +16,7 @@ #include "xgboost/span.h" #include "common.h" +#include "threading_utils.h" #if defined (__CUDACC__) #include "device_helpers.cuh" @@ -168,13 +169,10 @@ class Transform { template void LaunchCPU(Functor func, HDV*... vectors) const { omp_ulong end = static_cast(*(range_.end())); - dmlc::OMPException omp_exc; SyncHost(vectors...); -#pragma omp parallel for schedule(static) - for (omp_ulong idx = 0; idx < end; ++idx) { - omp_exc.Run(func, idx, UnpackHDV(vectors)...); - } - omp_exc.Rethrow(); + ParallelFor(end, [&](omp_ulong idx) { + func(idx, UnpackHDV(vectors)...); + }); } private: diff --git a/src/data/data.cc b/src/data/data.cc index 0e90584e0..2d5996331 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -829,18 +829,16 @@ SparsePage SparsePage::GetTranspose(int num_columns) const { const int nthread = omp_get_max_threads(); builder.InitBudget(num_columns, nthread); long batch_size = static_cast(this->Size()); // NOLINT(*) - auto page = this->GetView(); -#pragma omp parallel for default(none) shared(batch_size, builder, page) schedule(static) - for (long i = 0; i < batch_size; ++i) { // NOLINT(*) + auto page = this->GetView(); + common::ParallelFor(batch_size, [&](long i) { // NOLINT(*) int tid = omp_get_thread_num(); auto inst = page[i]; for (const auto& entry : inst) { builder.AddBudget(entry.index, tid); } - } + }); builder.InitStorage(); -#pragma omp parallel for default(none) shared(batch_size, builder, page) schedule(static) - for (long i = 0; i < batch_size; ++i) { // NOLINT(*) + common::ParallelFor(batch_size, [&](long i) { // NOLINT(*) int tid = omp_get_thread_num(); auto inst = page[i]; for (const auto& entry : inst) { @@ -849,7 +847,7 @@ SparsePage SparsePage::GetTranspose(int num_columns) const { Entry(static_cast(this->base_rowid + i), entry.fvalue), tid); } - } + }); return transpose; } void SparsePage::Push(const SparsePage &batch) { @@ -900,11 +898,11 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread return max_columns; } std::vector> max_columns_vector(nthread); - dmlc::OMPException exec; + dmlc::OMPException exc; // First-pass over the batch counting valid elements #pragma omp parallel num_threads(nthread) { - exec.Run([&]() { + exc.Run([&]() { int tid = omp_get_thread_num(); size_t begin = tid*thread_size; size_t end = tid != (nthread-1) ? (tid+1)*thread_size : batch_size; @@ -929,7 +927,7 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread } }); } - exec.Rethrow(); + exc.Rethrow(); for (const auto & max : max_columns_vector) { max_columns = std::max(max_columns, max[0]); } @@ -940,7 +938,7 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread #pragma omp parallel num_threads(nthread) { - exec.Run([&]() { + exc.Run([&]() { int tid = omp_get_thread_num(); size_t begin = tid*thread_size; size_t end = tid != (nthread-1) ? (tid+1)*thread_size : batch_size; @@ -956,7 +954,7 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread } }); } - exec.Rethrow(); + exc.Rethrow(); omp_set_num_threads(nthread_original); return max_columns; diff --git a/src/gbm/gblinear.cc b/src/gbm/gblinear.cc index e9c2f27aa..2cf2edf7f 100644 --- a/src/gbm/gblinear.cc +++ b/src/gbm/gblinear.cc @@ -23,6 +23,7 @@ #include "gblinear_model.h" #include "../common/timer.h" #include "../common/common.h" +#include "../common/threading_utils.h" namespace xgboost { namespace gbm { @@ -178,8 +179,7 @@ class GBLinear : public GradientBooster { // parallel over local batch const auto nsize = static_cast(batch.Size()); auto page = batch.GetView(); -#pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < nsize; ++i) { + common::ParallelFor(nsize, [&](bst_omp_uint i) { auto inst = page[i]; auto row_idx = static_cast(batch.base_rowid + i); // loop over output groups @@ -195,7 +195,7 @@ class GBLinear : public GradientBooster { ((base_margin.size() != 0) ? base_margin[row_idx * ngroup + gid] : learner_model_param_->base_score); } - } + }); } } @@ -246,8 +246,7 @@ class GBLinear : public GradientBooster { if (base_margin.size() != 0) { CHECK_EQ(base_margin.size(), nsize * ngroup); } -#pragma omp parallel for schedule(static) - for (omp_ulong i = 0; i < nsize; ++i) { + common::ParallelFor(nsize, [&](omp_ulong i) { const size_t ridx = page.base_rowid + i; // loop over output groups for (int gid = 0; gid < ngroup; ++gid) { @@ -256,7 +255,7 @@ class GBLinear : public GradientBooster { base_margin[ridx * ngroup + gid] : learner_model_param_->base_score; this->Pred(batch[i], &preds[ridx * ngroup], gid, margin); } - } + }); } monitor_.Stop("PredictBatchInternal"); } diff --git a/src/gbm/gbtree.cc b/src/gbm/gbtree.cc index a8385fa86..be23f4645 100644 --- a/src/gbm/gbtree.cc +++ b/src/gbm/gbtree.cc @@ -27,6 +27,7 @@ #include "../common/common.h" #include "../common/random.h" #include "../common/timer.h" +#include "../common/threading_utils.h" namespace xgboost { namespace gbm { @@ -219,10 +220,9 @@ void GBTree::DoBoost(DMatrix* p_fmat, bool update_predict = true; for (int gid = 0; gid < ngroup; ++gid) { std::vector& tmp_h = tmp.HostVector(); -#pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < nsize; ++i) { + common::ParallelFor(nsize, [&](bst_omp_uint i) { tmp_h[i] = gpair_h[i * ngroup + gid]; - } + }); std::vector > ret; BoostNewTrees(&tmp, p_fmat, gid, &ret); const size_t num_new_trees = ret.size(); diff --git a/src/linear/coordinate_common.h b/src/linear/coordinate_common.h index 7974babbe..e301d062c 100644 --- a/src/linear/coordinate_common.h +++ b/src/linear/coordinate_common.h @@ -14,6 +14,7 @@ #include "./param.h" #include "../gbm/gblinear_model.h" #include "../common/random.h" +#include "../common/threading_utils.h" namespace xgboost { namespace linear { @@ -115,14 +116,18 @@ inline std::pair GetGradientParallel(int group_idx, int num_grou auto page = batch.GetView(); auto col = page[fidx]; const auto ndata = static_cast(col.size()); + dmlc::OMPException exc; #pragma omp parallel for schedule(static) reduction(+ : sum_grad, sum_hess) for (bst_omp_uint j = 0; j < ndata; ++j) { - const bst_float v = col[j].fvalue; - auto &p = gpair[col[j].index * num_group + group_idx]; - if (p.GetHess() < 0.0f) continue; - sum_grad += p.GetGrad() * v; - sum_hess += p.GetHess() * v * v; + exc.Run([&]() { + const bst_float v = col[j].fvalue; + auto &p = gpair[col[j].index * num_group + group_idx]; + if (p.GetHess() < 0.0f) return; + sum_grad += p.GetGrad() * v; + sum_hess += p.GetHess() * v * v; + }); } + exc.Rethrow(); } return std::make_pair(sum_grad, sum_hess); } @@ -142,14 +147,18 @@ inline std::pair GetBiasGradientParallel(int group_idx, int num_ DMatrix *p_fmat) { double sum_grad = 0.0, sum_hess = 0.0; const auto ndata = static_cast(p_fmat->Info().num_row_); + dmlc::OMPException exc; #pragma omp parallel for schedule(static) reduction(+ : sum_grad, sum_hess) for (bst_omp_uint i = 0; i < ndata; ++i) { - auto &p = gpair[i * num_group + group_idx]; - if (p.GetHess() >= 0.0f) { - sum_grad += p.GetGrad(); - sum_hess += p.GetHess(); - } + exc.Run([&]() { + auto &p = gpair[i * num_group + group_idx]; + if (p.GetHess() >= 0.0f) { + sum_grad += p.GetGrad(); + sum_hess += p.GetHess(); + } + }); } + exc.Rethrow(); return std::make_pair(sum_grad, sum_hess); } @@ -172,12 +181,16 @@ inline void UpdateResidualParallel(int fidx, int group_idx, int num_group, auto col = page[fidx]; // update grad value const auto num_row = static_cast(col.size()); + dmlc::OMPException exc; #pragma omp parallel for schedule(static) for (bst_omp_uint j = 0; j < num_row; ++j) { - GradientPair &p = (*in_gpair)[col[j].index * num_group + group_idx]; - if (p.GetHess() < 0.0f) continue; - p += GradientPair(p.GetHess() * col[j].fvalue * dw, 0); + exc.Run([&]() { + GradientPair &p = (*in_gpair)[col[j].index * num_group + group_idx]; + if (p.GetHess() < 0.0f) return; + p += GradientPair(p.GetHess() * col[j].fvalue * dw, 0); + }); } + exc.Rethrow(); } } @@ -195,12 +208,16 @@ inline void UpdateBiasResidualParallel(int group_idx, int num_group, float dbias DMatrix *p_fmat) { if (dbias == 0.0f) return; const auto ndata = static_cast(p_fmat->Info().num_row_); + dmlc::OMPException exc; #pragma omp parallel for schedule(static) for (bst_omp_uint i = 0; i < ndata; ++i) { - GradientPair &g = (*in_gpair)[i * num_group + group_idx]; - if (g.GetHess() < 0.0f) continue; - g += GradientPair(g.GetHess() * dbias, 0); + exc.Run([&]() { + GradientPair &g = (*in_gpair)[i * num_group + group_idx]; + if (g.GetHess() < 0.0f) return; + g += GradientPair(g.GetHess() * dbias, 0); + }); } + exc.Rethrow(); } /** @@ -336,10 +353,9 @@ class GreedyFeatureSelector : public FeatureSelector { const bst_omp_uint nfeat = model.learner_model_param->num_feature; // Calculate univariate gradient sums std::fill(gpair_sums_.begin(), gpair_sums_.end(), std::make_pair(0., 0.)); - for (const auto &batch : p_fmat->GetBatches()) { - auto page = batch.GetView(); -#pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < nfeat; ++i) { + for (const auto &batch : p_fmat->GetBatches()) { + auto page = batch.GetView(); + common::ParallelFor(nfeat, [&](bst_omp_uint i) { const auto col = page[i]; const bst_uint ndata = col.size(); auto &sums = gpair_sums_[group_idx * nfeat + i]; @@ -350,7 +366,7 @@ class GreedyFeatureSelector : public FeatureSelector { sums.first += p.GetGrad() * v; sums.second += p.GetHess() * v * v; } - } + }); } // Find a feature with the largest magnitude of weight change int best_fidx = 0; @@ -405,8 +421,7 @@ class ThriftyFeatureSelector : public FeatureSelector { for (const auto &batch : p_fmat->GetBatches()) { auto page = batch.GetView(); // column-parallel is usually fastaer than row-parallel -#pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < nfeat; ++i) { + common::ParallelFor(nfeat, [&](bst_omp_uint i) { const auto col = page[i]; const bst_uint ndata = col.size(); for (bst_uint gid = 0u; gid < ngroup; ++gid) { @@ -419,7 +434,7 @@ class ThriftyFeatureSelector : public FeatureSelector { sums.second += p.GetHess() * v * v; } } - } + }); } // rank by descending weight magnitude within the groups std::fill(deltaw_.begin(), deltaw_.end(), 0.f); diff --git a/src/linear/updater_shotgun.cc b/src/linear/updater_shotgun.cc index a9321ba9a..70f4e98d0 100644 --- a/src/linear/updater_shotgun.cc +++ b/src/linear/updater_shotgun.cc @@ -54,38 +54,42 @@ class ShotgunUpdater : public LinearUpdater { for (const auto &batch : p_fmat->GetBatches()) { auto page = batch.GetView(); const auto nfeat = static_cast(batch.Size()); + dmlc::OMPException exc; #pragma omp parallel for schedule(static) for (bst_omp_uint i = 0; i < nfeat; ++i) { - int ii = selector_->NextFeature - (i, *model, 0, in_gpair->ConstHostVector(), p_fmat, param_.reg_alpha_denorm, - param_.reg_lambda_denorm); - if (ii < 0) continue; - const bst_uint fid = ii; - auto col = page[ii]; - for (int gid = 0; gid < ngroup; ++gid) { - double sum_grad = 0.0, sum_hess = 0.0; - for (auto& c : col) { - const GradientPair &p = gpair[c.index * ngroup + gid]; - if (p.GetHess() < 0.0f) continue; - const bst_float v = c.fvalue; - sum_grad += p.GetGrad() * v; - sum_hess += p.GetHess() * v * v; + exc.Run([&]() { + int ii = selector_->NextFeature + (i, *model, 0, in_gpair->ConstHostVector(), p_fmat, param_.reg_alpha_denorm, + param_.reg_lambda_denorm); + if (ii < 0) return; + const bst_uint fid = ii; + auto col = page[ii]; + for (int gid = 0; gid < ngroup; ++gid) { + double sum_grad = 0.0, sum_hess = 0.0; + for (auto& c : col) { + const GradientPair &p = gpair[c.index * ngroup + gid]; + if (p.GetHess() < 0.0f) continue; + const bst_float v = c.fvalue; + sum_grad += p.GetGrad() * v; + sum_hess += p.GetHess() * v * v; + } + bst_float &w = (*model)[fid][gid]; + auto dw = static_cast( + param_.learning_rate * + CoordinateDelta(sum_grad, sum_hess, w, param_.reg_alpha_denorm, + param_.reg_lambda_denorm)); + if (dw == 0.f) continue; + w += dw; + // update grad values + for (auto& c : col) { + GradientPair &p = gpair[c.index * ngroup + gid]; + if (p.GetHess() < 0.0f) continue; + p += GradientPair(p.GetHess() * c.fvalue * dw, 0); + } } - bst_float &w = (*model)[fid][gid]; - auto dw = static_cast( - param_.learning_rate * - CoordinateDelta(sum_grad, sum_hess, w, param_.reg_alpha_denorm, - param_.reg_lambda_denorm)); - if (dw == 0.f) continue; - w += dw; - // update grad values - for (auto& c : col) { - GradientPair &p = gpair[c.index * ngroup + gid]; - if (p.GetHess() < 0.0f) continue; - p += GradientPair(p.GetHess() * c.fvalue * dw, 0); - } - } + }); } + exc.Rethrow(); } } diff --git a/src/metric/elementwise_metric.cu b/src/metric/elementwise_metric.cu index 7e4e47444..f106fc547 100644 --- a/src/metric/elementwise_metric.cu +++ b/src/metric/elementwise_metric.cu @@ -47,12 +47,16 @@ class ElementWiseMetricsReduction { bst_float residue_sum = 0; bst_float weights_sum = 0; + dmlc::OMPException exc; #pragma omp parallel for reduction(+: residue_sum, weights_sum) schedule(static) for (omp_ulong i = 0; i < ndata; ++i) { - const bst_float wt = h_weights.size() > 0 ? h_weights[i] : 1.0f; - residue_sum += policy_.EvalRow(h_labels[i], h_preds[i]) * wt; - weights_sum += wt; + exc.Run([&]() { + const bst_float wt = h_weights.size() > 0 ? h_weights[i] : 1.0f; + residue_sum += policy_.EvalRow(h_labels[i], h_preds[i]) * wt; + weights_sum += wt; + }); } + exc.Rethrow(); PackedReduceResult res { residue_sum, weights_sum }; return res; } diff --git a/src/metric/multiclass_metric.cu b/src/metric/multiclass_metric.cu index 177402ca1..3b20361b0 100644 --- a/src/metric/multiclass_metric.cu +++ b/src/metric/multiclass_metric.cu @@ -53,18 +53,23 @@ class MultiClassMetricsReduction { int label_error = 0; bool const is_null_weight = weights.Size() == 0; + dmlc::OMPException exc; #pragma omp parallel for reduction(+: residue_sum, weights_sum) schedule(static) for (omp_ulong idx = 0; idx < ndata; ++idx) { - bst_float weight = is_null_weight ? 1.0f : h_weights[idx]; - auto label = static_cast(h_labels[idx]); - if (label >= 0 && label < static_cast(n_class)) { - residue_sum += EvalRowPolicy::EvalRow( - label, h_preds.data() + idx * n_class, n_class) * weight; - weights_sum += weight; - } else { - label_error = label; - } + exc.Run([&]() { + bst_float weight = is_null_weight ? 1.0f : h_weights[idx]; + auto label = static_cast(h_labels[idx]); + if (label >= 0 && label < static_cast(n_class)) { + residue_sum += EvalRowPolicy::EvalRow( + label, h_preds.data() + idx * n_class, n_class) * weight; + weights_sum += weight; + } else { + label_error = label; + } + }); } + exc.Rethrow(); + CheckLabelError(label_error, n_class); PackedReduceResult res { residue_sum, weights_sum }; diff --git a/src/metric/rank_metric.cc b/src/metric/rank_metric.cc index cfbae468c..f0ea9b8a6 100644 --- a/src/metric/rank_metric.cc +++ b/src/metric/rank_metric.cc @@ -29,6 +29,7 @@ #include "xgboost/host_device_vector.h" #include "../common/math.h" +#include "../common/threading_utils.h" #include "metric_common.h" namespace { @@ -111,10 +112,9 @@ struct EvalAMS : public Metric { PredIndPairContainer rec(ndata); const auto &h_preds = preds.ConstHostVector(); - #pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < ndata; ++i) { + common::ParallelFor(ndata, [&](bst_omp_uint i) { rec[i] = std::make_pair(h_preds[i], i); - } + }); XGBOOST_PARALLEL_SORT(rec.begin(), rec.end(), common::CmpFirst); auto ntop = static_cast(ratio_ * ndata); if (ntop == 0) ntop = ndata; @@ -175,49 +175,57 @@ struct EvalAuc : public Metric { const auto& labels = info.labels_.ConstHostVector(); const auto &h_preds = preds.ConstHostVector(); + dmlc::OMPException exc; #pragma omp parallel reduction(+:sum_auc, auc_error) if (ngroups > 1) { - // Each thread works on a distinct group and sorts the predictions in that group - PredIndPairContainer rec; - #pragma omp for schedule(static) - for (bst_omp_uint group_id = 0; group_id < ngroups; ++group_id) { - // Same thread can work on multiple groups one after another; hence, resize - // the predictions array based on the current group - rec.resize(gptr[group_id + 1] - gptr[group_id]); - #pragma omp parallel for schedule(static) if (!omp_in_parallel()) - for (bst_omp_uint j = gptr[group_id]; j < gptr[group_id + 1]; ++j) { - rec[j - gptr[group_id]] = {h_preds[j], j}; - } + exc.Run([&]() { + // Each thread works on a distinct group and sorts the predictions in that group + PredIndPairContainer rec; + #pragma omp for schedule(static) + for (bst_omp_uint group_id = 0; group_id < ngroups; ++group_id) { + exc.Run([&]() { + // Same thread can work on multiple groups one after another; hence, resize + // the predictions array based on the current group + rec.resize(gptr[group_id + 1] - gptr[group_id]); + #pragma omp parallel for schedule(static) if (!omp_in_parallel()) + for (bst_omp_uint j = gptr[group_id]; j < gptr[group_id + 1]; ++j) { + exc.Run([&]() { + rec[j - gptr[group_id]] = {h_preds[j], j}; + }); + } - XGBOOST_PARALLEL_SORT(rec.begin(), rec.end(), common::CmpFirst); - // calculate AUC - double sum_pospair = 0.0; - double sum_npos = 0.0, sum_nneg = 0.0, buf_pos = 0.0, buf_neg = 0.0; - for (size_t j = 0; j < rec.size(); ++j) { - const bst_float wt = WeightPolicy::GetWeightOfSortedRecord(info, rec, j, group_id); - const bst_float ctr = labels[rec[j].second]; - // keep bucketing predictions in same bucket - if (j != 0 && rec[j].first != rec[j - 1].first) { + XGBOOST_PARALLEL_SORT(rec.begin(), rec.end(), common::CmpFirst); + // calculate AUC + double sum_pospair = 0.0; + double sum_npos = 0.0, sum_nneg = 0.0, buf_pos = 0.0, buf_neg = 0.0; + for (size_t j = 0; j < rec.size(); ++j) { + const bst_float wt = WeightPolicy::GetWeightOfSortedRecord(info, rec, j, group_id); + const bst_float ctr = labels[rec[j].second]; + // keep bucketing predictions in same bucket + if (j != 0 && rec[j].first != rec[j - 1].first) { + sum_pospair += buf_neg * (sum_npos + buf_pos * 0.5); + sum_npos += buf_pos; + sum_nneg += buf_neg; + buf_neg = buf_pos = 0.0f; + } + buf_pos += ctr * wt; + buf_neg += (1.0f - ctr) * wt; + } sum_pospair += buf_neg * (sum_npos + buf_pos * 0.5); sum_npos += buf_pos; sum_nneg += buf_neg; - buf_neg = buf_pos = 0.0f; - } - buf_pos += ctr * wt; - buf_neg += (1.0f - ctr) * wt; + // check weird conditions + if (sum_npos <= 0.0 || sum_nneg <= 0.0) { + auc_error += 1; + } else { + // this is the AUC + sum_auc += sum_pospair / (sum_npos * sum_nneg); + } + }); } - sum_pospair += buf_neg * (sum_npos + buf_pos * 0.5); - sum_npos += buf_pos; - sum_nneg += buf_neg; - // check weird conditions - if (sum_npos <= 0.0 || sum_nneg <= 0.0) { - auc_error += 1; - } else { - // this is the AUC - sum_auc += sum_pospair / (sum_npos * sum_nneg); - } - } + }); } + exc.Rethrow(); // Report average AUC across all groups // In distributed mode, workers which only contains pos or neg samples @@ -316,19 +324,25 @@ struct EvalRank : public Metric, public EvalRankConfig { const auto &labels = info.labels_.ConstHostVector(); const auto &h_preds = preds.ConstHostVector(); + dmlc::OMPException exc; #pragma omp parallel reduction(+:sum_metric) { - // each thread takes a local rec - PredIndPairContainer rec; - #pragma omp for schedule(static) - for (bst_omp_uint k = 0; k < ngroups; ++k) { - rec.clear(); - for (unsigned j = gptr[k]; j < gptr[k + 1]; ++j) { - rec.emplace_back(h_preds[j], static_cast(labels[j])); + exc.Run([&]() { + // each thread takes a local rec + PredIndPairContainer rec; + #pragma omp for schedule(static) + for (bst_omp_uint k = 0; k < ngroups; ++k) { + exc.Run([&]() { + rec.clear(); + for (unsigned j = gptr[k]; j < gptr[k + 1]; ++j) { + rec.emplace_back(h_preds[j], static_cast(labels[j])); + } + sum_metric += this->EvalGroup(&rec); + }); } - sum_metric += this->EvalGroup(&rec); - } + }); } + exc.Rethrow(); } if (distributed) { @@ -526,66 +540,75 @@ struct EvalAucPR : public Metric { const auto &h_labels = info.labels_.ConstHostVector(); const auto &h_preds = preds.ConstHostVector(); + dmlc::OMPException exc; #pragma omp parallel reduction(+:sum_auc, auc_error) if (ngroups > 1) { - // Each thread works on a distinct group and sorts the predictions in that group - PredIndPairContainer rec; - #pragma omp for schedule(static) - for (bst_omp_uint group_id = 0; group_id < ngroups; ++group_id) { - double total_pos = 0.0; - double total_neg = 0.0; - // Same thread can work on multiple groups one after another; hence, resize - // the predictions array based on the current group - rec.resize(gptr[group_id + 1] - gptr[group_id]); - #pragma omp parallel for schedule(static) reduction(+:total_pos, total_neg) \ - if (!omp_in_parallel()) // NOLINT - for (bst_omp_uint j = gptr[group_id]; j < gptr[group_id + 1]; ++j) { - const bst_float wt = WeightPolicy::GetWeightOfInstance(info, j, group_id); - total_pos += wt * h_labels[j]; - total_neg += wt * (1.0f - h_labels[j]); - rec[j - gptr[group_id]] = {h_preds[j], j}; - } - - // we need pos > 0 && neg > 0 - if (total_pos <= 0.0 || total_neg <= 0.0) { - auc_error += 1; - continue; - } - - XGBOOST_PARALLEL_SORT(rec.begin(), rec.end(), common::CmpFirst); - - // calculate AUC - double tp = 0.0, prevtp = 0.0, fp = 0.0, prevfp = 0.0, h = 0.0, a = 0.0, b = 0.0; - for (size_t j = 0; j < rec.size(); ++j) { - const bst_float wt = WeightPolicy::GetWeightOfSortedRecord(info, rec, j, group_id); - tp += wt * h_labels[rec[j].second]; - fp += wt * (1.0f - h_labels[rec[j].second]); - if ((j < rec.size() - 1 && rec[j].first != rec[j + 1].first) || j == rec.size() - 1) { - if (tp == prevtp) { - a = 1.0; - b = 0.0; - } else { - h = (fp - prevfp) / (tp - prevtp); - a = 1.0 + h; - b = (prevfp - h * prevtp) / total_pos; + exc.Run([&]() { + // Each thread works on a distinct group and sorts the predictions in that group + PredIndPairContainer rec; + #pragma omp for schedule(static) + for (bst_omp_uint group_id = 0; group_id < ngroups; ++group_id) { + exc.Run([&]() { + double total_pos = 0.0; + double total_neg = 0.0; + // Same thread can work on multiple groups one after another; hence, resize + // the predictions array based on the current group + rec.resize(gptr[group_id + 1] - gptr[group_id]); + #pragma omp parallel for schedule(static) reduction(+:total_pos, total_neg) \ + if (!omp_in_parallel()) // NOLINT + for (bst_omp_uint j = gptr[group_id]; j < gptr[group_id + 1]; ++j) { + exc.Run([&]() { + const bst_float wt = WeightPolicy::GetWeightOfInstance(info, j, group_id); + total_pos += wt * h_labels[j]; + total_neg += wt * (1.0f - h_labels[j]); + rec[j - gptr[group_id]] = {h_preds[j], j}; + }); } - if (0.0 != b) { - sum_auc += (tp / total_pos - prevtp / total_pos - - b / a * (std::log(a * tp / total_pos + b) - - std::log(a * prevtp / total_pos + b))) / a; - } else { - sum_auc += (tp / total_pos - prevtp / total_pos) / a; + + // we need pos > 0 && neg > 0 + if (total_pos <= 0.0 || total_neg <= 0.0) { + auc_error += 1; + return; } - prevtp = tp; - prevfp = fp; - } + + XGBOOST_PARALLEL_SORT(rec.begin(), rec.end(), common::CmpFirst); + + // calculate AUC + double tp = 0.0, prevtp = 0.0, fp = 0.0, prevfp = 0.0, h = 0.0, a = 0.0, b = 0.0; + for (size_t j = 0; j < rec.size(); ++j) { + const bst_float wt = WeightPolicy::GetWeightOfSortedRecord(info, rec, j, group_id); + tp += wt * h_labels[rec[j].second]; + fp += wt * (1.0f - h_labels[rec[j].second]); + if ((j < rec.size() - 1 && rec[j].first != rec[j + 1].first) || + j == rec.size() - 1) { + if (tp == prevtp) { + a = 1.0; + b = 0.0; + } else { + h = (fp - prevfp) / (tp - prevtp); + a = 1.0 + h; + b = (prevfp - h * prevtp) / total_pos; + } + if (0.0 != b) { + sum_auc += (tp / total_pos - prevtp / total_pos - + b / a * (std::log(a * tp / total_pos + b) - + std::log(a * prevtp / total_pos + b))) / a; + } else { + sum_auc += (tp / total_pos - prevtp / total_pos) / a; + } + prevtp = tp; + prevfp = fp; + } + } + // sanity check + if (tp < 0 || prevtp < 0 || fp < 0 || prevfp < 0) { + CHECK(!auc_error) << "AUC-PR: error in calculation"; + } + }); } - // sanity check - if (tp < 0 || prevtp < 0 || fp < 0 || prevfp < 0) { - CHECK(!auc_error) << "AUC-PR: error in calculation"; - } - } + }); } + exc.Rethrow(); // Report average AUC-PR across all groups // In distributed mode, workers which only contains pos or neg samples diff --git a/src/metric/survival_metric.cu b/src/metric/survival_metric.cu index ed82505f5..c53344c5e 100644 --- a/src/metric/survival_metric.cu +++ b/src/metric/survival_metric.cu @@ -58,15 +58,19 @@ class ElementWiseSurvivalMetricsReduction { double residue_sum = 0; double weights_sum = 0; + dmlc::OMPException exc; #pragma omp parallel for reduction(+: residue_sum, weights_sum) schedule(static) for (omp_ulong i = 0; i < ndata; ++i) { - const double wt = h_weights.empty() ? 1.0 : static_cast(h_weights[i]); - residue_sum += policy_.EvalRow( - static_cast(h_labels_lower_bound[i]), - static_cast(h_labels_upper_bound[i]), - static_cast(h_preds[i])) * wt; - weights_sum += wt; + exc.Run([&]() { + const double wt = h_weights.empty() ? 1.0 : static_cast(h_weights[i]); + residue_sum += policy_.EvalRow( + static_cast(h_labels_lower_bound[i]), + static_cast(h_labels_upper_bound[i]), + static_cast(h_preds[i])) * wt; + weights_sum += wt; + }); } + exc.Rethrow(); PackedReduceResult res{residue_sum, weights_sum}; return res; } diff --git a/src/objective/rank_obj.cu b/src/objective/rank_obj.cu index 2482f9e95..f1b350bb3 100644 --- a/src/objective/rank_obj.cu +++ b/src/objective/rank_obj.cu @@ -823,72 +823,80 @@ class LambdaRankObj : public ObjFunction { const auto ngroup = static_cast(gptr.size() - 1); out_gpair->Resize(preds.Size()); + dmlc::OMPException exc; #pragma omp parallel { - // parallel construct, declare random number generator here, so that each - // thread use its own random number generator, seed by thread id and current iteration - std::minstd_rand rnd((iter + 1) * 1111); - std::vector pairs; - std::vector lst; - std::vector< std::pair > rec; + exc.Run([&]() { + // parallel construct, declare random number generator here, so that each + // thread use its own random number generator, seed by thread id and current iteration + std::minstd_rand rnd((iter + 1) * 1111); + std::vector pairs; + std::vector lst; + std::vector< std::pair > rec; - #pragma omp for schedule(static) - for (bst_omp_uint k = 0; k < ngroup; ++k) { - lst.clear(); pairs.clear(); - for (unsigned j = gptr[k]; j < gptr[k+1]; ++j) { - lst.emplace_back(preds_h[j], labels[j], j); - gpair[j] = GradientPair(0.0f, 0.0f); - } - std::stable_sort(lst.begin(), lst.end(), ListEntry::CmpPred); - rec.resize(lst.size()); - for (unsigned i = 0; i < lst.size(); ++i) { - rec[i] = std::make_pair(lst[i].label, i); - } - std::stable_sort(rec.begin(), rec.end(), common::CmpFirst); - // enumerate buckets with same label, for each item in the lst, grab another sample randomly - for (unsigned i = 0; i < rec.size(); ) { - unsigned j = i + 1; - while (j < rec.size() && rec[j].first == rec[i].first) ++j; - // bucket in [i,j), get a sample outside bucket - unsigned nleft = i, nright = static_cast(rec.size() - j); - if (nleft + nright != 0) { - int nsample = param_.num_pairsample; - while (nsample --) { - for (unsigned pid = i; pid < j; ++pid) { - unsigned ridx = std::uniform_int_distribution(0, nleft + nright - 1)(rnd); - if (ridx < nleft) { - pairs.emplace_back(rec[ridx].second, rec[pid].second, - info.GetWeight(k) * weight_normalization_factor); - } else { - pairs.emplace_back(rec[pid].second, rec[ridx+j-i].second, - info.GetWeight(k) * weight_normalization_factor); + #pragma omp for schedule(static) + for (bst_omp_uint k = 0; k < ngroup; ++k) { + exc.Run([&]() { + lst.clear(); pairs.clear(); + for (unsigned j = gptr[k]; j < gptr[k+1]; ++j) { + lst.emplace_back(preds_h[j], labels[j], j); + gpair[j] = GradientPair(0.0f, 0.0f); + } + std::stable_sort(lst.begin(), lst.end(), ListEntry::CmpPred); + rec.resize(lst.size()); + for (unsigned i = 0; i < lst.size(); ++i) { + rec[i] = std::make_pair(lst[i].label, i); + } + std::stable_sort(rec.begin(), rec.end(), common::CmpFirst); + // enumerate buckets with same label + // for each item in the lst, grab another sample randomly + for (unsigned i = 0; i < rec.size(); ) { + unsigned j = i + 1; + while (j < rec.size() && rec[j].first == rec[i].first) ++j; + // bucket in [i,j), get a sample outside bucket + unsigned nleft = i, nright = static_cast(rec.size() - j); + if (nleft + nright != 0) { + int nsample = param_.num_pairsample; + while (nsample --) { + for (unsigned pid = i; pid < j; ++pid) { + unsigned ridx = + std::uniform_int_distribution(0, nleft + nright - 1)(rnd); + if (ridx < nleft) { + pairs.emplace_back(rec[ridx].second, rec[pid].second, + info.GetWeight(k) * weight_normalization_factor); + } else { + pairs.emplace_back(rec[pid].second, rec[ridx+j-i].second, + info.GetWeight(k) * weight_normalization_factor); + } + } } } + i = j; } - } - i = j; + // get lambda weight for the pairs + LambdaWeightComputerT::GetLambdaWeight(lst, &pairs); + // rescale each gradient and hessian so that the lst have constant weighted + float scale = 1.0f / param_.num_pairsample; + if (param_.fix_list_weight != 0.0f) { + scale *= param_.fix_list_weight / (gptr[k + 1] - gptr[k]); + } + for (auto & pair : pairs) { + const ListEntry &pos = lst[pair.pos_index]; + const ListEntry &neg = lst[pair.neg_index]; + const bst_float w = pair.weight * scale; + const float eps = 1e-16f; + bst_float p = common::Sigmoid(pos.pred - neg.pred); + bst_float g = p - 1.0f; + bst_float h = std::max(p * (1.0f - p), eps); + // accumulate gradient and hessian in both pid, and nid + gpair[pos.rindex] += GradientPair(g * w, 2.0f*w*h); + gpair[neg.rindex] += GradientPair(-g * w, 2.0f*w*h); + } + }); } - // get lambda weight for the pairs - LambdaWeightComputerT::GetLambdaWeight(lst, &pairs); - // rescale each gradient and hessian so that the lst have constant weighted - float scale = 1.0f / param_.num_pairsample; - if (param_.fix_list_weight != 0.0f) { - scale *= param_.fix_list_weight / (gptr[k + 1] - gptr[k]); - } - for (auto & pair : pairs) { - const ListEntry &pos = lst[pair.pos_index]; - const ListEntry &neg = lst[pair.neg_index]; - const bst_float w = pair.weight * scale; - const float eps = 1e-16f; - bst_float p = common::Sigmoid(pos.pred - neg.pred); - bst_float g = p - 1.0f; - bst_float h = std::max(p * (1.0f - p), eps); - // accumulate gradient and hessian in both pid, and nid - gpair[pos.rindex] += GradientPair(g * w, 2.0f*w*h); - gpair[neg.rindex] += GradientPair(-g * w, 2.0f*w*h); - } - } + }); } + exc.Rethrow(); } #if defined(__CUDACC__) diff --git a/src/objective/regression_obj.cu b/src/objective/regression_obj.cu index c94b90aa7..a5bdc47ac 100644 --- a/src/objective/regression_obj.cu +++ b/src/objective/regression_obj.cu @@ -19,6 +19,7 @@ #include "../common/transform.h" #include "../common/common.h" +#include "../common/threading_utils.h" #include "./regression_loss.h" @@ -345,10 +346,9 @@ class CoxRegression : public ObjFunction { void PredTransform(HostDeviceVector *io_preds) override { std::vector &preds = io_preds->HostVector(); const long ndata = static_cast(preds.size()); // NOLINT(*) -#pragma omp parallel for schedule(static) - for (long j = 0; j < ndata; ++j) { // NOLINT(*) + common::ParallelFor(ndata, [&](long j) { // NOLINT(*) preds[j] = std::exp(preds[j]); - } + }); } void EvalTransform(HostDeviceVector *io_preds) override { PredTransform(io_preds); diff --git a/src/predictor/cpu_predictor.cc b/src/predictor/cpu_predictor.cc index 8473e89f9..41584a50e 100644 --- a/src/predictor/cpu_predictor.cc +++ b/src/predictor/cpu_predictor.cc @@ -18,6 +18,7 @@ #include "../data/adapter.h" #include "../common/math.h" +#include "../common/threading_utils.h" #include "../gbm/gbtree_model.h" namespace xgboost { @@ -157,8 +158,7 @@ void PredictBatchByBlockOfRowsKernel(DataView batch, std::vector *out const auto nsize = static_cast(batch.Size()); const int num_feature = model.learner_model_param->num_feature; const bst_omp_uint n_row_blocks = (nsize) / block_of_rows_size + !!((nsize) % block_of_rows_size); -#pragma omp parallel for schedule(static) - for (bst_omp_uint block_id = 0; block_id < n_row_blocks; ++block_id) { + common::ParallelFor(n_row_blocks, [&](bst_omp_uint block_id) { const size_t batch_offset = block_id * block_of_rows_size; const size_t block_size = std::min(nsize - batch_offset, block_of_rows_size); const size_t fvec_offset = omp_get_thread_num() * block_of_rows_size; @@ -168,7 +168,7 @@ void PredictBatchByBlockOfRowsKernel(DataView batch, std::vector *out PredictByAllTrees(model, tree_begin, tree_end, out_preds, batch_offset + batch.base_rowid, num_group, thread_temp, fvec_offset, block_size); FVecDrop(block_size, batch_offset, &batch, fvec_offset, p_thread_temp); - } + }); } class CPUPredictor : public Predictor { @@ -335,8 +335,7 @@ class CPUPredictor : public Predictor { // parallel over local batch auto page = batch.GetView(); const auto nsize = static_cast(batch.Size()); -#pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < nsize; ++i) { + common::ParallelFor(nsize, [&](bst_omp_uint i) { const int tid = omp_get_thread_num(); auto ridx = static_cast(batch.base_rowid + i); RegTree::FVec &feats = feat_vecs[tid]; @@ -349,7 +348,7 @@ class CPUPredictor : public Predictor { preds[ridx * ntree_limit + j] = static_cast(tid); } feats.Drop(page[i]); - } + }); } } @@ -378,18 +377,16 @@ class CPUPredictor : public Predictor { // allocated one std::fill(contribs.begin(), contribs.end(), 0); // initialize tree node mean values - #pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < ntree_limit; ++i) { + common::ParallelFor(bst_omp_uint(ntree_limit), [&](bst_omp_uint i) { model.trees[i]->FillNodeMeanValues(); - } + }); const std::vector& base_margin = info.base_margin_.HostVector(); // start collecting the contributions for (const auto &batch : p_fmat->GetBatches()) { auto page = batch.GetView(); // parallel over local batch const auto nsize = static_cast(batch.Size()); -#pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < nsize; ++i) { + common::ParallelFor(nsize, [&](bst_omp_uint i) { auto row_idx = static_cast(batch.base_rowid + i); RegTree::FVec &feats = feat_vecs[omp_get_thread_num()]; if (feats.Size() == 0) { @@ -425,7 +422,7 @@ class CPUPredictor : public Predictor { p_contribs[ncolumns - 1] += model.learner_model_param->base_score; } } - } + }); } } diff --git a/src/tree/updater_basemaker-inl.h b/src/tree/updater_basemaker-inl.h index b38ac2e7c..860ff60d7 100644 --- a/src/tree/updater_basemaker-inl.h +++ b/src/tree/updater_basemaker-inl.h @@ -25,6 +25,7 @@ #include "../common/io.h" #include "../common/random.h" #include "../common/quantile.h" +#include "../common/threading_utils.h" namespace xgboost { namespace tree { @@ -221,8 +222,7 @@ class BaseMaker: public TreeUpdater { // so that they are ignored in future statistics collection const auto ndata = static_cast(p_fmat->Info().num_row_); -#pragma omp parallel for schedule(static) - for (bst_omp_uint ridx = 0; ridx < ndata; ++ridx) { + common::ParallelFor(ndata, [&](bst_omp_uint ridx) { const int nid = this->DecodePosition(ridx); if (tree[nid].IsLeaf()) { // mark finish when it is not a fresh leaf @@ -237,7 +237,7 @@ class BaseMaker: public TreeUpdater { this->SetEncodePosition(ridx, tree[nid].RightChild()); } } - } + }); } /*! * \brief this is helper function uses column based data structure, @@ -257,8 +257,7 @@ class BaseMaker: public TreeUpdater { if (it != sorted_split_set.end() && *it == fid) { const auto ndata = static_cast(col.size()); - #pragma omp parallel for schedule(static) - for (bst_omp_uint j = 0; j < ndata; ++j) { + common::ParallelFor(ndata, [&](bst_omp_uint j) { const bst_uint ridx = col[j].index; const bst_float fvalue = col[j].fvalue; const int nid = this->DecodePosition(ridx); @@ -273,7 +272,7 @@ class BaseMaker: public TreeUpdater { this->SetEncodePosition(ridx, tree[pid].RightChild()); } } - } + }); } } } @@ -314,8 +313,7 @@ class BaseMaker: public TreeUpdater { for (auto fid : fsplits) { auto col = page[fid]; const auto ndata = static_cast(col.size()); -#pragma omp parallel for schedule(static) - for (bst_omp_uint j = 0; j < ndata; ++j) { + common::ParallelFor(ndata, [&](bst_omp_uint j) { const bst_uint ridx = col[j].index; const bst_float fvalue = col[j].fvalue; const int nid = this->DecodePosition(ridx); @@ -327,7 +325,7 @@ class BaseMaker: public TreeUpdater { this->SetEncodePosition(ridx, tree[nid].RightChild()); } } - } + }); } } } @@ -341,24 +339,27 @@ class BaseMaker: public TreeUpdater { std::vector< std::vector > &thread_temp = *p_thread_temp; thread_temp.resize(omp_get_max_threads()); p_node_stats->resize(tree.param.num_nodes); + dmlc::OMPException exc; #pragma omp parallel { - const int tid = omp_get_thread_num(); - thread_temp[tid].resize(tree.param.num_nodes, TStats()); - for (unsigned int nid : qexpand_) { - thread_temp[tid][nid] = TStats(); - } + exc.Run([&]() { + const int tid = omp_get_thread_num(); + thread_temp[tid].resize(tree.param.num_nodes, TStats()); + for (unsigned int nid : qexpand_) { + thread_temp[tid][nid] = TStats(); + } + }); } + exc.Rethrow(); // setup position const auto ndata = static_cast(fmat.Info().num_row_); -#pragma omp parallel for schedule(static) - for (bst_omp_uint ridx = 0; ridx < ndata; ++ridx) { + common::ParallelFor(ndata, [&](bst_omp_uint ridx) { const int nid = position_[ridx]; const int tid = omp_get_thread_num(); if (nid >= 0) { thread_temp[tid][nid].Add(gpair[ridx]); } - } + }); // sum the per thread statistics together for (int nid : qexpand_) { TStats &s = (*p_node_stats)[nid]; diff --git a/src/tree/updater_colmaker.cc b/src/tree/updater_colmaker.cc index 1997ecaf0..93bd0189e 100644 --- a/src/tree/updater_colmaker.cc +++ b/src/tree/updater_colmaker.cc @@ -264,12 +264,16 @@ class ColMaker: public TreeUpdater { const MetaInfo& info = fmat.Info(); // setup position const auto ndata = static_cast(info.num_row_); + dmlc::OMPException exc; #pragma omp parallel for schedule(static) for (bst_omp_uint ridx = 0; ridx < ndata; ++ridx) { - const int tid = omp_get_thread_num(); - if (position_[ridx] < 0) continue; - stemp_[tid][position_[ridx]].stats.Add(gpair[ridx]); + exc.Run([&]() { + const int tid = omp_get_thread_num(); + if (position_[ridx] < 0) return; + stemp_[tid][position_[ridx]].stats.Add(gpair[ridx]); + }); } + exc.Rethrow(); // sum the per thread statistics together for (int nid : qexpand) { GradStats stats; @@ -447,11 +451,11 @@ class ColMaker: public TreeUpdater { std::max(static_cast(num_features / this->nthread_ / 32), 1); #endif // defined(_OPENMP) { - dmlc::OMPException omp_handler; auto page = batch.GetView(); + dmlc::OMPException exc; #pragma omp parallel for schedule(dynamic, batch_size) for (bst_omp_uint i = 0; i < num_features; ++i) { - omp_handler.Run([&]() { + exc.Run([&]() { auto evaluator = tree_evaluator_.GetEvaluator(); bst_feature_t const fid = feat_set[i]; int32_t const tid = omp_get_thread_num(); @@ -461,16 +465,16 @@ class ColMaker: public TreeUpdater { if (colmaker_train_param_.NeedForwardSearch( param_.default_direction, column_densities_[fid], ind)) { this->EnumerateSplit(c.data(), c.data() + c.size(), +1, fid, - gpair, stemp_[tid], evaluator); + gpair, stemp_[tid], evaluator); } if (colmaker_train_param_.NeedBackwardSearch( param_.default_direction)) { this->EnumerateSplit(c.data() + c.size() - 1, c.data() - 1, -1, - fid, gpair, stemp_[tid], evaluator); + fid, gpair, stemp_[tid], evaluator); } }); } - omp_handler.Rethrow(); + exc.Rethrow(); } } // find splits at current level, do split per level @@ -521,8 +525,7 @@ class ColMaker: public TreeUpdater { // so that they are ignored in future statistics collection const auto ndata = static_cast(p_fmat->Info().num_row_); -#pragma omp parallel for schedule(static) - for (bst_omp_uint ridx = 0; ridx < ndata; ++ridx) { + common::ParallelFor(ndata, [&](bst_omp_uint ridx) { CHECK_LT(ridx, position_.size()) << "ridx exceed bound " << "ridx="<< ridx << " pos=" << position_.size(); const int nid = this->DecodePosition(ridx); @@ -539,7 +542,7 @@ class ColMaker: public TreeUpdater { this->SetEncodePosition(ridx, tree[nid].RightChild()); } } - } + }); } // customization part // synchronize the best solution of each node @@ -568,8 +571,7 @@ class ColMaker: public TreeUpdater { for (auto fid : fsplits) { auto col = page[fid]; const auto ndata = static_cast(col.size()); -#pragma omp parallel for schedule(static) - for (bst_omp_uint j = 0; j < ndata; ++j) { + common::ParallelFor(ndata, [&](bst_omp_uint j) { const bst_uint ridx = col[j].index; const int nid = this->DecodePosition(ridx); const bst_float fvalue = col[j].fvalue; @@ -581,7 +583,7 @@ class ColMaker: public TreeUpdater { this->SetEncodePosition(ridx, tree[nid].RightChild()); } } - } + }); } } } diff --git a/src/tree/updater_histmaker.cc b/src/tree/updater_histmaker.cc index fff642ba8..353b365f2 100644 --- a/src/tree/updater_histmaker.cc +++ b/src/tree/updater_histmaker.cc @@ -202,22 +202,26 @@ class HistMaker: public BaseMaker { std::vector sol(qexpand_.size()); std::vector left_sum(qexpand_.size()); auto nexpand = static_cast(qexpand_.size()); + dmlc::OMPException exc; #pragma omp parallel for schedule(dynamic, 1) for (bst_omp_uint wid = 0; wid < nexpand; ++wid) { - const int nid = qexpand_[wid]; - CHECK_EQ(node2workindex_[nid], static_cast(wid)); - SplitEntry &best = sol[wid]; - GradStats &node_sum = wspace_.hset[0][num_feature + wid * (num_feature + 1)].data[0]; - for (size_t i = 0; i < feature_set.size(); ++i) { - // Query is thread safe as it's a const function. - if (!this->interaction_constraints_.Query(nid, feature_set[i])) { - continue; - } + exc.Run([&]() { + const int nid = qexpand_[wid]; + CHECK_EQ(node2workindex_[nid], static_cast(wid)); + SplitEntry &best = sol[wid]; + GradStats &node_sum = wspace_.hset[0][num_feature + wid * (num_feature + 1)].data[0]; + for (size_t i = 0; i < feature_set.size(); ++i) { + // Query is thread safe as it's a const function. + if (!this->interaction_constraints_.Query(nid, feature_set[i])) { + continue; + } - EnumerateSplit(this->wspace_.hset[0][i + wid * (num_feature+1)], - node_sum, feature_set[i], &best, &left_sum[wid]); - } + EnumerateSplit(this->wspace_.hset[0][i + wid * (num_feature+1)], + node_sum, feature_set[i], &best, &left_sum[wid]); + } + }); } + exc.Rethrow(); // get the best result, we can synchronize the solution for (bst_omp_uint wid = 0; wid < nexpand; ++wid) { const bst_node_t nid = qexpand_[wid]; @@ -341,16 +345,20 @@ class CQHistMaker: public HistMaker { auto page = batch.GetView(); // start enumeration const auto nsize = static_cast(fset.size()); + dmlc::OMPException exc; #pragma omp parallel for schedule(dynamic, 1) for (bst_omp_uint i = 0; i < nsize; ++i) { - int fid = fset[i]; - int offset = feat2workindex_[fid]; - if (offset >= 0) { - this->UpdateHistCol(gpair, page[fid], info, tree, - fset, offset, - &thread_hist_[omp_get_thread_num()]); - } + exc.Run([&]() { + int fid = fset[i]; + int offset = feat2workindex_[fid]; + if (offset >= 0) { + this->UpdateHistCol(gpair, page[fid], info, tree, + fset, offset, + &thread_hist_[omp_get_thread_num()]); + } + }); } + exc.Rethrow(); } // update node statistics. this->GetNodeStats(gpair, *p_fmat, tree, @@ -417,16 +425,20 @@ class CQHistMaker: public HistMaker { auto page = batch.GetView(); // start enumeration const auto nsize = static_cast(work_set_.size()); + dmlc::OMPException exc; #pragma omp parallel for schedule(dynamic, 1) for (bst_omp_uint i = 0; i < nsize; ++i) { - int fid = work_set_[i]; - int offset = feat2workindex_[fid]; - if (offset >= 0) { - this->UpdateSketchCol(gpair, page[fid], tree, - work_set_size, offset, - &thread_sketch_[omp_get_thread_num()]); - } + exc.Run([&]() { + int fid = work_set_[i]; + int offset = feat2workindex_[fid]; + if (offset >= 0) { + this->UpdateSketchCol(gpair, page[fid], tree, + work_set_size, offset, + &thread_sketch_[omp_get_thread_num()]); + } + }); } + exc.Rethrow(); } for (size_t i = 0; i < sketchs_.size(); ++i) { common::WXQuantileSketch::SummaryContainer out; @@ -701,16 +713,20 @@ class GlobalProposalHistMaker: public CQHistMaker { // start enumeration const auto nsize = static_cast(this->work_set_.size()); + dmlc::OMPException exc; #pragma omp parallel for schedule(dynamic, 1) for (bst_omp_uint i = 0; i < nsize; ++i) { - int fid = this->work_set_[i]; - int offset = this->feat2workindex_[fid]; - if (offset >= 0) { - this->UpdateHistCol(gpair, page[fid], info, tree, - fset, offset, - &this->thread_hist_[omp_get_thread_num()]); - } + exc.Run([&]() { + int fid = this->work_set_[i]; + int offset = this->feat2workindex_[fid]; + if (offset >= 0) { + this->UpdateHistCol(gpair, page[fid], info, tree, + fset, offset, + &this->thread_hist_[omp_get_thread_num()]); + } + }); } + exc.Rethrow(); } // update node statistics. diff --git a/src/tree/updater_quantile_hist.cc b/src/tree/updater_quantile_hist.cc index 7248eae80..fe172a7e9 100644 --- a/src/tree/updater_quantile_hist.cc +++ b/src/tree/updater_quantile_hist.cc @@ -713,20 +713,24 @@ void QuantileHistMaker::Builder::InitSampling(const std::vector(std::numeric_limits::max()); uint32_t coin_flip_border = static_cast(upper_border * param_.subsample); + dmlc::OMPException exc; #pragma omp parallel num_threads(nthread) { - const size_t tid = omp_get_thread_num(); - const size_t ibegin = tid * discard_size; - const size_t iend = (tid == (nthread - 1)) ? - info.num_row_ : ibegin + discard_size; + exc.Run([&]() { + const size_t tid = omp_get_thread_num(); + const size_t ibegin = tid * discard_size; + const size_t iend = (tid == (nthread - 1)) ? + info.num_row_ : ibegin + discard_size; - rnds[tid].discard(discard_size * tid); - for (size_t i = ibegin; i < iend; ++i) { - if (gpair[i].GetHess() >= 0.0f && rnds[tid]() < coin_flip_border) { - p_row_indices[ibegin + row_offsets[tid]++] = i; + rnds[tid].discard(discard_size * tid); + for (size_t i = ibegin; i < iend; ++i) { + if (gpair[i].GetHess() >= 0.0f && rnds[tid]() < coin_flip_border) { + p_row_indices[ibegin + row_offsets[tid]++] = i; + } } - } + }); } + exc.Rethrow(); /* discard global engine */ rnd = rnds[nthread - 1]; size_t prefix_sum = row_offsets[0]; @@ -769,10 +773,14 @@ void QuantileHistMaker::Builder::InitData(const GHistIndexMatrix& hist_buffer_.Init(nbins); // initialize histogram builder + dmlc::OMPException exc; #pragma omp parallel { - this->nthread_ = omp_get_num_threads(); + exc.Run([&]() { + this->nthread_ = omp_get_num_threads(); + }); } + exc.Rethrow(); hist_builder_ = GHistBuilder(this->nthread_, nbins); std::vector& row_indices = *row_set_collection_.Data(); @@ -794,18 +802,21 @@ void QuantileHistMaker::Builder::InitData(const GHistIndexMatrix& #pragma omp parallel num_threads(this->nthread_) { - const size_t tid = omp_get_thread_num(); - const size_t ibegin = tid * block_size; - const size_t iend = std::min(static_cast(ibegin + block_size), - static_cast(info.num_row_)); + exc.Run([&]() { + const size_t tid = omp_get_thread_num(); + const size_t ibegin = tid * block_size; + const size_t iend = std::min(static_cast(ibegin + block_size), + static_cast(info.num_row_)); - for (size_t i = ibegin; i < iend; ++i) { - if (gpair[i].GetHess() < 0.0f) { - p_buff[tid] = true; - break; + for (size_t i = ibegin; i < iend; ++i) { + if (gpair[i].GetHess() < 0.0f) { + p_buff[tid] = true; + break; + } } - } + }); } + exc.Rethrow(); bool has_neg_hess = false; for (int32_t tid = 0; tid < this->nthread_; ++tid) { @@ -825,14 +836,17 @@ void QuantileHistMaker::Builder::InitData(const GHistIndexMatrix& } else { #pragma omp parallel num_threads(this->nthread_) { - const size_t tid = omp_get_thread_num(); - const size_t ibegin = tid * block_size; - const size_t iend = std::min(static_cast(ibegin + block_size), - static_cast(info.num_row_)); - for (size_t i = ibegin; i < iend; ++i) { - p_row_indices[i] = i; - } + exc.Run([&]() { + const size_t tid = omp_get_thread_num(); + const size_t ibegin = tid * block_size; + const size_t iend = std::min(static_cast(ibegin + block_size), + static_cast(info.num_row_)); + for (size_t i = ibegin; i < iend; ++i) { + p_row_indices[i] = i; + } + }); } + exc.Rethrow(); } } } diff --git a/src/tree/updater_refresh.cc b/src/tree/updater_refresh.cc index 20485c670..0d553638d 100644 --- a/src/tree/updater_refresh.cc +++ b/src/tree/updater_refresh.cc @@ -13,6 +13,7 @@ #include "xgboost/json.h" #include "./param.h" #include "../common/io.h" +#include "../common/threading_utils.h" namespace xgboost { namespace tree { @@ -52,17 +53,21 @@ class TreeRefresher: public TreeUpdater { const int nthread = omp_get_max_threads(); fvec_temp.resize(nthread, RegTree::FVec()); stemp.resize(nthread, std::vector()); + dmlc::OMPException exc; #pragma omp parallel { - int tid = omp_get_thread_num(); - int num_nodes = 0; - for (auto tree : trees) { - num_nodes += tree->param.num_nodes; - } - stemp[tid].resize(num_nodes, GradStats()); - std::fill(stemp[tid].begin(), stemp[tid].end(), GradStats()); - fvec_temp[tid].Init(trees[0]->param.num_feature); + exc.Run([&]() { + int tid = omp_get_thread_num(); + int num_nodes = 0; + for (auto tree : trees) { + num_nodes += tree->param.num_nodes; + } + stemp[tid].resize(num_nodes, GradStats()); + std::fill(stemp[tid].begin(), stemp[tid].end(), GradStats()); + fvec_temp[tid].Init(trees[0]->param.num_feature); + }); } + exc.Rethrow(); // if it is C++11, use lazy evaluation for Allreduce, // to gain speedup in recovery auto lazy_get_stats = [&]() { @@ -72,8 +77,7 @@ class TreeRefresher: public TreeUpdater { auto page = batch.GetView(); CHECK_LT(batch.Size(), std::numeric_limits::max()); const auto nbatch = static_cast(batch.Size()); -#pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < nbatch; ++i) { + common::ParallelFor(nbatch, [&](bst_omp_uint i) { SparsePage::Inst inst = page[i]; const int tid = omp_get_thread_num(); const auto ridx = static_cast(batch.base_rowid + i); @@ -86,16 +90,15 @@ class TreeRefresher: public TreeUpdater { offset += tree->param.num_nodes; } feats.Drop(inst); - } + }); } // aggregate the statistics auto num_nodes = static_cast(stemp[0].size()); - #pragma omp parallel for schedule(static) - for (int nid = 0; nid < num_nodes; ++nid) { + common::ParallelFor(num_nodes, [&](int nid) { for (int tid = 1; tid < nthread; ++tid) { stemp[0][nid].Add(stemp[tid][nid]); } - } + }); }; reducer_.Allreduce(dmlc::BeginPtr(stemp[0]), stemp[0].size(), lazy_get_stats); // rescale learning rate according to size of trees