diff --git a/src/common/column_matrix.h b/src/common/column_matrix.h
index 18fcdb337..510206f50 100644
--- a/src/common/column_matrix.h
+++ b/src/common/column_matrix.h
@@ -71,7 +71,7 @@ class ColumnMatrix {
   // construct column matrix from GHistIndexMatrix
   inline void Init(const GHistIndexMatrix& gmat,
                    double sparse_threshold) {
-    const auto nfeature = static_cast<bst_uint>(gmat.cut.row_ptr.size() - 1);
+    const int32_t nfeature = static_cast<int32_t>(gmat.cut.row_ptr.size() - 1);
     const size_t nrow = gmat.row_ptr.size() - 1;

     // identify type of each column
@@ -86,7 +86,7 @@ class ColumnMatrix {
     gmat.GetFeatureCounts(&feature_counts_[0]);

     // classify features
-    for (bst_uint fid = 0; fid < nfeature; ++fid) {
+    for (int32_t fid = 0; fid < nfeature; ++fid) {
       if (static_cast<double>(feature_counts_[fid])
                  < sparse_threshold * nrow) {
         type_[fid] = kSparseColumn;
@@ -100,7 +100,7 @@ class ColumnMatrix {
     boundary_.resize(nfeature);
     size_t accum_index_ = 0;
     size_t accum_row_ind_ = 0;
-    for (bst_uint fid = 0; fid < nfeature; ++fid) {
+    for (int32_t fid = 0; fid < nfeature; ++fid) {
       boundary_[fid].index_begin = accum_index_;
       boundary_[fid].row_ind_begin = accum_row_ind_;
       if (type_[fid] == kDenseColumn) {
@@ -124,7 +124,9 @@ class ColumnMatrix {
     }

     // pre-fill index_ for dense columns
-    for (bst_uint fid = 0; fid < nfeature; ++fid) {
+
+    #pragma omp parallel for
+    for (int32_t fid = 0; fid < nfeature; ++fid) {
       if (type_[fid] == kDenseColumn) {
         const size_t ibegin = boundary_[fid].index_begin;
         uint32_t* begin = &index_[ibegin];
@@ -184,8 +186,8 @@ class ColumnMatrix {
   std::vector<size_t> feature_counts_;
   std::vector<ColumnType> type_;
-  std::vector<uint32_t> index_;  // index_: may store smaller integers; needs padding
-  std::vector<size_t> row_ind_;
+  SimpleArray<uint32_t> index_;  // index_: may store smaller integers; needs padding
+  SimpleArray<size_t> row_ind_;
   std::vector<ColumnBoundary> boundary_;

   // index_base_[fid]: least bin id for feature fid
diff --git a/src/common/hist_util.cc b/src/common/hist_util.cc
index f1c3e412b..a473f9240 100644
--- a/src/common/hist_util.cc
+++ b/src/common/hist_util.cc
@@ -11,6 +11,7 @@
 #include "./column_matrix.h"
 #include "./hist_util.h"
 #include "./quantile.h"
+#include "./../tree/updater_quantile_hist.h"

 #if defined(XGBOOST_MM_PREFETCH_PRESENT)
   #include <xmmintrin.h>
@@ -49,7 +50,7 @@ void HistCutMatrix::Init(DMatrix* p_fmat, uint32_t max_num_bins) {
   constexpr int kFactor = 8;
   std::vector<WXQSketch> sketchs;

-  const int nthread = omp_get_max_threads();
+  const size_t nthread = omp_get_max_threads();

   unsigned const nstep =
       static_cast<unsigned>((info.num_col_ + nthread - 1) / nthread);
@@ -67,34 +68,85 @@ void HistCutMatrix::Init(DMatrix* p_fmat, uint32_t max_num_bins) {

   // Use group index for weights?
   bool const use_group_ind = num_groups != 0 && weights.size() != info.num_row_;
-  for (const auto &batch : p_fmat->GetRowBatches()) {
-    size_t group_ind = 0;
-    if (use_group_ind) {
-      group_ind = this->SearchGroupIndFromBaseRow(group_ptr, batch.base_rowid);
-    }
-#pragma omp parallel num_threads(nthread) firstprivate(group_ind, use_group_ind)
-    {
-      CHECK_EQ(nthread, omp_get_num_threads());
-      auto tid = static_cast<unsigned>(omp_get_thread_num());
-      unsigned begin = std::min(nstep * tid, ncol);
-      unsigned end = std::min(nstep * (tid + 1), ncol);
+  if (use_group_ind) {
+    for (const auto &batch : p_fmat->GetRowBatches()) {
+      size_t group_ind = this->SearchGroupIndFromBaseRow(group_ptr, batch.base_rowid);
+      #pragma omp parallel num_threads(nthread) firstprivate(group_ind, use_group_ind)
+      {
+        CHECK_EQ(nthread, omp_get_num_threads());
+        auto tid = static_cast<unsigned>(omp_get_thread_num());
+        unsigned begin = std::min(nstep * tid, ncol);
+        unsigned end = std::min(nstep * (tid + 1), ncol);

-      // do not iterate if no columns are assigned to the thread
-      if (begin < end && end <= ncol) {
-        for (size_t i = 0; i < batch.Size(); ++i) { // NOLINT(*)
-          size_t const ridx = batch.base_rowid + i;
-          SparsePage::Inst const inst = batch[i];
-          if (use_group_ind &&
-              group_ptr[group_ind] == ridx &&
-              // maximum equals to weights.size() - 1
-              group_ind < num_groups - 1) {
-            // move to next group
-            group_ind++;
+        // do not iterate if no columns are assigned to the thread
+        if (begin < end && end <= ncol) {
+          for (size_t i = 0; i < batch.Size(); ++i) { // NOLINT(*)
+            size_t const ridx = batch.base_rowid + i;
+            SparsePage::Inst const inst = batch[i];
+            if (group_ptr[group_ind] == ridx &&
+                // maximum equals to weights.size() - 1
+                group_ind < num_groups - 1) {
+              // move to next group
+              group_ind++;
+            }
+            for (auto const& entry : inst) {
+              if (entry.index >= begin && entry.index < end) {
+                size_t w_idx = group_ind;
+                sketchs[entry.index].Push(entry.fvalue, info.GetWeight(w_idx));
+              }
+            }
           }
-          for (auto const& entry : inst) {
-            if (entry.index >= begin && entry.index < end) {
-              size_t w_idx = use_group_ind ? group_ind : ridx;
-              sketchs[entry.index].Push(entry.fvalue, info.GetWeight(w_idx));
+        }
+      }
+    }
+  } else {
+    for (const auto &batch : p_fmat->GetRowBatches()) {
+      const size_t size = batch.Size();
+      const size_t block_size = 512;
+      const size_t block_size_iter = block_size * nthread;
+      const size_t n_blocks = size / block_size_iter + !!(size % block_size_iter);
+
+      std::vector<std::vector<std::pair<bst_float, bst_float>>> buff(nthread);
+      for (size_t tid = 0; tid < nthread; ++tid) {
+        buff[tid].resize(block_size * ncol);
+      }
+
+      std::vector<size_t> sizes(nthread * ncol, 0);
+
+      for (size_t iblock = 0; iblock < n_blocks; ++iblock) {
+        #pragma omp parallel num_threads(nthread)
+        {
+          int tid = omp_get_thread_num();
+
+          const size_t ibegin = iblock * block_size_iter + tid * block_size;
+          const size_t iend = std::min(ibegin + block_size, size);
+
+          auto* p_sizes = sizes.data() + ncol * tid;
+          auto* p_buff = buff[tid].data();
+
+          for (size_t i = ibegin; i < iend; ++i) {
+            size_t const ridx = batch.base_rowid + i;
+            bst_float w = info.GetWeight(ridx);
+            SparsePage::Inst const inst = batch[i];
+
+            for (auto const& entry : inst) {
+              const size_t idx = entry.index;
+              p_buff[idx * block_size + p_sizes[idx]] = { entry.fvalue, w };
+              p_sizes[idx]++;
+            }
+          }
+          #pragma omp barrier
+          #pragma omp for schedule(static)
+          for (int32_t icol = 0; icol < static_cast<int32_t>(ncol); ++icol) {
+            for (size_t tid = 0; tid < nthread; ++tid) {
+              auto* p_sizes = sizes.data() + ncol * tid;
+              auto* p_buff = buff[tid].data() + icol * block_size;
+
+              for (size_t i = 0; i < p_sizes[icol]; ++i) {
+                sketchs[icol].Push(p_buff[i].first, p_buff[i].second);
+              }
+
+              p_sizes[icol] = 0;
            }
          }
        }
@@ -177,22 +229,66 @@ uint32_t HistCutMatrix::GetBinIdx(const Entry& e) {

 void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) {
   cut.Init(p_fmat, max_num_bins);
-
-  const int nthread = omp_get_max_threads();
+  const int32_t nthread = omp_get_max_threads();
+  // const int nthread = 1;
   const uint32_t nbins = cut.row_ptr.back();
   hit_count.resize(nbins, 0);
   hit_count_tloc_.resize(nthread * nbins, 0);
-  row_ptr.push_back(0);
+
+  size_t new_size = 1;
   for (const auto &batch : p_fmat->GetRowBatches()) {
-    const size_t rbegin = row_ptr.size() - 1;
-    for (size_t i = 0; i < batch.Size(); ++i) {
-      row_ptr.push_back(batch[i].size() + row_ptr.back());
+    new_size += batch.Size();
+  }
+
+  row_ptr.resize(new_size);
+  row_ptr[0] = 0;
+
+  size_t rbegin = 0;
+  size_t prev_sum = 0;
+
+  for (const auto &batch : p_fmat->GetRowBatches()) {
+    MemStackAllocator<size_t, 128> partial_sums(nthread);
+    size_t* p_part = partial_sums.Get();
+
+    size_t block_size = batch.Size() / nthread;
+
+    #pragma omp parallel num_threads(nthread)
+    {
+      #pragma omp for
+      for (int32_t tid = 0; tid < nthread; ++tid) {
+        size_t ibegin = block_size * tid;
+        size_t iend = (tid == (nthread-1) ? batch.Size() : (block_size * (tid+1)));
+
+        size_t sum = 0;
+        for (size_t i = ibegin; i < iend; ++i) {
+          sum += batch[i].size();
+          row_ptr[rbegin + 1 + i] = sum;
+        }
+      }
+
+      #pragma omp single
+      {
+        p_part[0] = prev_sum;
+        for (int32_t i = 1; i < nthread; ++i) {
+          p_part[i] = p_part[i - 1] + row_ptr[rbegin + i*block_size];
+        }
+      }
+
+      #pragma omp for
+      for (int32_t tid = 0; tid < nthread; ++tid) {
+        size_t ibegin = block_size * tid;
+        size_t iend = (tid == (nthread-1) ? batch.Size() : (block_size * (tid+1)));
+
+        for (size_t i = ibegin; i < iend; ++i) {
+          row_ptr[rbegin + 1 + i] += p_part[tid];
+        }
+      }
+    }
+
+    index.resize(row_ptr.back());

     CHECK_GT(cut.cut.size(), 0U);
-    CHECK_EQ(cut.row_ptr.back(), cut.cut.size());

     auto bsize = static_cast<omp_ulong>(batch.Size());
 #pragma omp parallel for num_threads(nthread) schedule(static)
@@ -203,7 +299,6 @@ void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) {
       SparsePage::Inst inst = batch[i];
       CHECK_EQ(ibegin + inst.size(), iend);
-
       for (bst_uint j = 0; j < inst.size(); ++j) {
         uint32_t idx = cut.GetBinIdx(inst[j]);
@@ -215,10 +310,13 @@ void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) {

 #pragma omp parallel for num_threads(nthread) schedule(static)
   for (bst_omp_uint idx = 0; idx < bst_omp_uint(nbins); ++idx) {
-    for (int tid = 0; tid < nthread; ++tid) {
+    for (size_t tid = 0; tid < nthread; ++tid) {
       hit_count[idx] += hit_count_tloc_[tid * nbins + idx];
     }
   }
+
+    prev_sum = row_ptr[rbegin + batch.Size()];
+    rbegin += batch.Size();
   }
 }
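For reference (not part of the patch): the new row_ptr construction above replaces the sequential push_back loop with a blocked, two-pass cumulative sum. Each thread first builds a local running sum for its block of rows, one thread then accumulates the per-block offsets, and a second parallel pass shifts every block by the total of the blocks before it. A minimal standalone sketch of that scheme, with illustrative row sizes and names (compile with -fopenmp):

#include <omp.h>
#include <cstdio>
#include <vector>

int main() {
  // per-row element counts; the expected prefix sum is 0 3 4 8 9 14 23 25 31
  const std::vector<size_t> row_size = {3, 1, 4, 1, 5, 9, 2, 6};
  const size_t nrow = row_size.size();
  const int nthread = omp_get_max_threads();
  const size_t block = nrow / nthread;

  std::vector<size_t> row_ptr(nrow + 1, 0);
  std::vector<size_t> partial(nthread, 0);

  #pragma omp parallel num_threads(nthread)
  {
    // pass 1: local cumulative sums, one block of rows per loop iteration
    #pragma omp for
    for (int tid = 0; tid < nthread; ++tid) {
      const size_t begin = block * tid;
      const size_t end = (tid == nthread - 1) ? nrow : block * (tid + 1);
      size_t sum = 0;
      for (size_t i = begin; i < end; ++i) {
        sum += row_size[i];
        row_ptr[i + 1] = sum;
      }
    }
    // one thread computes each block's offset = total of all earlier blocks
    #pragma omp single
    {
      for (int t = 1; t < nthread; ++t) {
        partial[t] = partial[t - 1] + row_ptr[t * block];
      }
    }
    // pass 2: shift every entry of a block by that block's offset
    #pragma omp for
    for (int tid = 0; tid < nthread; ++tid) {
      const size_t begin = block * tid;
      const size_t end = (tid == nthread - 1) ? nrow : block * (tid + 1);
      for (size_t i = begin; i < end; ++i) {
        row_ptr[i + 1] += partial[tid];
      }
    }
  }

  for (size_t v : row_ptr) std::printf("%zu ", v);
  std::printf("\n");
  return 0;
}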
diff --git a/src/common/hist_util.h b/src/common/hist_util.h
index 3aa2f232d..09c7d7653 100644
--- a/src/common/hist_util.h
+++ b/src/common/hist_util.h
@@ -19,6 +19,74 @@
 namespace xgboost {
 namespace common {

+/*
+ * \brief A thin wrapper around dynamically allocated C-style array.
+ * Make sure to call resize() before use.
+ */
+template<typename T>
+struct SimpleArray {
+  ~SimpleArray() {
+    free(ptr_);
+    ptr_ = nullptr;
+  }
+
+  void resize(size_t n) {
+    T* ptr = static_cast<T*>(malloc(n*sizeof(T)));
+    memcpy(ptr, ptr_, n_ * sizeof(T));
+    free(ptr_);
+    ptr_ = ptr;
+    n_ = n;
+  }
+
+  T& operator[](size_t idx) {
+    return ptr_[idx];
+  }
+
+  T& operator[](size_t idx) const {
+    return ptr_[idx];
+  }
+
+  size_t size() const {
+    return n_;
+  }
+
+  T back() const {
+    return ptr_[n_-1];
+  }
+
+  T* data() {
+    return ptr_;
+  }
+
+  const T* data() const {
+    return ptr_;
+  }
+
+
+  T* begin() {
+    return ptr_;
+  }
+
+  const T* begin() const {
+    return ptr_;
+  }
+
+  T* end() {
+    return ptr_ + n_;
+  }
+
+  const T* end() const {
+    return ptr_ + n_;
+  }
+
+ private:
+  T* ptr_ = nullptr;
+  size_t n_ = 0;
+};
+
+
+
+
 /*! \brief Cut configuration for all the features. */
 struct HistCutMatrix {
   /*! \brief Unit pointer to rows by element position */
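For reference (not part of the patch): a minimal usage sketch of the SimpleArray helper added above, with an illustrative element type and sizes. The practical difference from the std::vector members it replaces in ColumnMatrix is that resize() keeps the old contents via memcpy but leaves the newly added tail uninitialized, so the caller has to write every slot before reading it. The sketch assumes the SimpleArray definition from the hunk above is visible (it lives in namespace xgboost::common):

#include <cstddef>
#include <cstdint>
#include <cstdio>

int main() {
  xgboost::common::SimpleArray<uint32_t> index;   // starts empty: size() == 0
  index.resize(4);
  for (size_t i = 0; i < index.size(); ++i) {
    index[i] = static_cast<uint32_t>(i * 10);     // initialize every slot explicitly
  }

  index.resize(8);                 // the first four values survive the reallocation
  for (size_t i = 4; i < index.size(); ++i) {
    index[i] = 0;                  // the new tail is not zero-initialized
  }

  std::printf("size=%zu back=%u\n", index.size(), index.back());   // size=8 back=0
  return 0;
}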
diff --git a/src/common/row_set.h b/src/common/row_set.h
index 01e27a39c..285988b15 100644
--- a/src/common/row_set.h
+++ b/src/common/row_set.h
@@ -59,7 +59,6 @@ class RowSetCollection {
   }
   // clear up things
   inline void Clear() {
-    row_indices_.clear();
     elem_of_each_node_.clear();
   }
   // initialize node id 0->everything
diff --git a/src/tree/updater_quantile_hist.cc b/src/tree/updater_quantile_hist.cc
index f06cc76c3..c6381c95f 100644
--- a/src/tree/updater_quantile_hist.cc
+++ b/src/tree/updater_quantile_hist.cc
@@ -420,25 +420,74 @@ void QuantileHistMaker::Builder::InitData(const GHistIndexMatrix& gmat,
     CHECK_EQ(info.root_index_.size(), 0U);
     std::vector<size_t>& row_indices = row_set_collection_.row_indices_;
+    row_indices.resize(info.num_row_);
+    auto* p_row_indices = row_indices.data();
     // mark subsample and build list of member rows
+
     if (param_.subsample < 1.0f) {
       std::bernoulli_distribution coin_flip(param_.subsample);
       auto& rnd = common::GlobalRandom();
+      size_t j = 0;
       for (size_t i = 0; i < info.num_row_; ++i) {
         if (gpair[i].GetHess() >= 0.0f && coin_flip(rnd)) {
-          row_indices.push_back(i);
+          p_row_indices[j++] = i;
         }
       }
+      row_indices.resize(j);
     } else {
-      for (size_t i = 0; i < info.num_row_; ++i) {
-        if (gpair[i].GetHess() >= 0.0f) {
-          row_indices.push_back(i);
+      MemStackAllocator<bool, 128> buff(this->nthread_);
+      bool* p_buff = buff.Get();
+      std::fill(p_buff, p_buff + this->nthread_, false);
+
+      const size_t block_size = info.num_row_ / this->nthread_ + !!(info.num_row_ % this->nthread_);
+
+      #pragma omp parallel num_threads(this->nthread_)
+      {
+        const size_t tid = omp_get_thread_num();
+        const size_t ibegin = tid * block_size;
+        const size_t iend = std::min(static_cast<size_t>(ibegin + block_size),
+            static_cast<size_t>(info.num_row_));
+
+        for (size_t i = ibegin; i < iend; ++i) {
+          if (gpair[i].GetHess() < 0.0f) {
+            p_buff[tid] = true;
+            break;
+          }
+        }
+      }
+
+      bool has_neg_hess = false;
+      for (size_t tid = 0; tid < this->nthread_; ++tid) {
+        if (p_buff[tid]) {
+          has_neg_hess = true;
+        }
+      }
+
+      if (has_neg_hess) {
+        size_t j = 0;
+        for (size_t i = 0; i < info.num_row_; ++i) {
+          if (gpair[i].GetHess() >= 0.0f) {
+            p_row_indices[j++] = i;
+          }
+        }
+        row_indices.resize(j);
+      } else {
+        #pragma omp parallel num_threads(this->nthread_)
+        {
+          const size_t tid = omp_get_thread_num();
+          const size_t ibegin = tid * block_size;
+          const size_t iend = std::min(static_cast<size_t>(ibegin + block_size),
+              static_cast<size_t>(info.num_row_));
+          for (size_t i = ibegin; i < iend; ++i) {
+            p_row_indices[i] = i;
+          }
        }
      }
    }
-    row_set_collection_.Init();
   }
+  row_set_collection_.Init();
+
   {
     /* determine layout of data */
     const size_t nrow = info.num_row_;
diff --git a/src/tree/updater_quantile_hist.h b/src/tree/updater_quantile_hist.h
index 7f346fe58..17688f86a 100644
--- a/src/tree/updater_quantile_hist.h
+++ b/src/tree/updater_quantile_hist.h
@@ -28,6 +28,41 @@
 #include "../common/column_matrix.h"

 namespace xgboost {
+
+/*!
+ * \brief A C-style array with in-stack allocation. As long as the array is smaller than MaxStackSize, it will be allocated inside the stack. Otherwise, it will be heap-allocated.
+ */
+template<typename T, size_t MaxStackSize>
+class MemStackAllocator {
+ public:
+  explicit MemStackAllocator(size_t required_size): required_size_(required_size) {
+  }
+
+  T* Get() {
+    if (!ptr_) {
+      if (MaxStackSize >= required_size_) {
+        ptr_ = stack_mem_;
+      } else {
+        ptr_ = reinterpret_cast<T*>(malloc(required_size_ * sizeof(T)));
+        do_free_ = true;
+      }
+    }
+
+    return ptr_;
+  }
+
+  ~MemStackAllocator() {
+    if (do_free_) free(ptr_);
+  }
+
+
+ private:
+  T* ptr_ = nullptr;
+  bool do_free_ = false;
+  size_t required_size_;
+  T stack_mem_[MaxStackSize];
+};
+
 namespace tree {

 using xgboost::common::HistCutMatrix;
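For reference (not part of the patch): a minimal usage sketch of the MemStackAllocator helper added above, mirroring the pattern the patch uses in QuantileHistMaker::Builder::InitData and GHistIndexMatrix::Init — construct it with the required element count, call Get() once, and let the destructor release the heap fallback if one was needed. Element types, sizes and values below are illustrative only; the sketch assumes the MemStackAllocator definition from the hunk above is visible (it is declared in namespace xgboost):

#include <cstddef>
#include <cstdio>

int main() {
  // fits into the 128-element stack buffer: Get() returns the embedded array
  xgboost::MemStackAllocator<bool, 128> flags(8);
  bool* p_flags = flags.Get();
  for (size_t i = 0; i < 8; ++i) {
    p_flags[i] = false;
  }

  // exceeds MaxStackSize: Get() falls back to malloc, ~MemStackAllocator frees it
  xgboost::MemStackAllocator<size_t, 128> sums(100000);
  size_t* p_sums = sums.Get();
  p_sums[0] = 42;

  std::printf("%d %zu\n", static_cast<int>(p_flags[0]), p_sums[0]);   // prints: 0 42
  return 0;
}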