/*! * Copyright 2020-2021 by XGBoost Contributors */ #include #include #include "rabit/rabit.h" #include "quantile.h" #include "hist_util.h" #include "categorical.h" namespace xgboost { namespace common { template SketchContainerImpl::SketchContainerImpl(std::vector columns_size, int32_t max_bins, common::Span feature_types, bool use_group, int32_t n_threads) : feature_types_(feature_types.cbegin(), feature_types.cend()), columns_size_{std::move(columns_size)}, max_bins_{max_bins}, use_group_ind_{use_group}, n_threads_{n_threads} { monitor_.Init(__func__); CHECK_NE(columns_size_.size(), 0); sketches_.resize(columns_size_.size()); CHECK_GE(n_threads_, 1); categories_.resize(columns_size_.size()); } template std::vector SketchContainerImpl::CalcColumnSize(SparsePage const &batch, bst_feature_t const n_columns, size_t const nthreads) { auto page = batch.GetView(); std::vector> column_sizes(nthreads); for (auto &column : column_sizes) { column.resize(n_columns, 0); } ParallelFor(page.Size(), nthreads, [&](omp_ulong i) { auto &local_column_sizes = column_sizes.at(omp_get_thread_num()); auto row = page[i]; auto const *p_row = row.data(); for (size_t j = 0; j < row.size(); ++j) { local_column_sizes.at(p_row[j].index)++; } }); std::vector entries_per_columns(n_columns, 0); ParallelFor(n_columns, nthreads, [&](bst_omp_uint i) { for (auto const &thread : column_sizes) { entries_per_columns[i] += thread[i]; } }); return entries_per_columns; } template std::vector SketchContainerImpl::LoadBalance(SparsePage const &batch, bst_feature_t n_columns, size_t const nthreads) { /* Some sparse datasets have their mass concentrating on small number of features. To * avoid waiting for a few threads running forever, we here distribute different number * of columns to different threads according to number of entries. */ auto page = batch.GetView(); size_t const total_entries = page.data.size(); size_t const entries_per_thread = common::DivRoundUp(total_entries, nthreads); std::vector> column_sizes(nthreads); for (auto& column : column_sizes) { column.resize(n_columns, 0); } std::vector entries_per_columns = CalcColumnSize(batch, n_columns, nthreads); std::vector cols_ptr(nthreads + 1, 0); size_t count {0}; size_t current_thread {1}; for (auto col : entries_per_columns) { cols_ptr.at(current_thread)++; // add one column to thread count += col; CHECK_LE(count, total_entries); if (count > entries_per_thread) { current_thread++; count = 0; cols_ptr.at(current_thread) = cols_ptr[current_thread-1]; } } // Idle threads. for (; current_thread < cols_ptr.size() - 1; ++current_thread) { cols_ptr[current_thread+1] = cols_ptr[current_thread]; } return cols_ptr; } namespace { // Function to merge hessian and sample weights std::vector MergeWeights(MetaInfo const &info, Span hessian, bool use_group, int32_t n_threads) { CHECK_EQ(hessian.size(), info.num_row_); std::vector results(hessian.size()); auto const &group_ptr = info.group_ptr_; auto const& weights = info.weights_.HostVector(); auto get_weight = [&](size_t i) { return weights.empty() ? 1.0f : weights[i]; }; if (use_group) { CHECK_GE(group_ptr.size(), 2); CHECK_EQ(group_ptr.back(), hessian.size()); size_t cur_group = 0; for (size_t i = 0; i < hessian.size(); ++i) { results[i] = hessian[i] * get_weight(cur_group); if (i == group_ptr[cur_group + 1]) { cur_group++; } } } else { ParallelFor(hessian.size(), n_threads, Sched::Auto(), [&](auto i) { results[i] = hessian[i] * get_weight(i); }); } return results; } std::vector UnrollGroupWeights(MetaInfo const &info) { std::vector const &group_weights = info.weights_.HostVector(); if (group_weights.empty()) { return group_weights; } size_t n_samples = info.num_row_; auto const &group_ptr = info.group_ptr_; std::vector results(n_samples); CHECK_GE(group_ptr.size(), 2); CHECK_EQ(group_ptr.back(), n_samples); size_t cur_group = 0; for (size_t i = 0; i < n_samples; ++i) { results[i] = group_weights[cur_group]; if (i == group_ptr[cur_group + 1]) { cur_group++; } } return results; } } // anonymous namespace template void SketchContainerImpl::PushRowPage(SparsePage const &page, MetaInfo const &info, Span hessian) { monitor_.Start(__func__); bst_feature_t n_columns = info.num_col_; auto is_dense = info.num_nonzero_ == info.num_col_ * info.num_row_; CHECK_GE(n_threads_, 1); CHECK_EQ(sketches_.size(), n_columns); // glue these conditions using ternary operator to avoid making data copies. auto const &weights = hessian.empty() ? (use_group_ind_ ? UnrollGroupWeights(info) // use group weight : info.weights_.HostVector()) // use sample weight : MergeWeights( info, hessian, use_group_ind_, n_threads_); // use hessian merged with group/sample weights if (!weights.empty()) { CHECK_EQ(weights.size(), info.num_row_); } auto batch = page.GetView(); // Parallel over columns. Each thread owns a set of consecutive columns. auto const ncol = static_cast(info.num_col_); auto thread_columns_ptr = LoadBalance(page, info.num_col_, n_threads_); dmlc::OMPException exc; #pragma omp parallel num_threads(n_threads_) { exc.Run([&]() { auto tid = static_cast(omp_get_thread_num()); auto const begin = thread_columns_ptr[tid]; auto const end = thread_columns_ptr[tid + 1]; // do not iterate if no columns are assigned to the thread if (begin < end && end <= ncol) { for (size_t i = 0; i < batch.Size(); ++i) { size_t const ridx = page.base_rowid + i; SparsePage::Inst const inst = batch[i]; auto w = weights.empty() ? 1.0f : weights[ridx]; auto p_inst = inst.data(); if (is_dense) { for (size_t ii = begin; ii < end; ii++) { if (IsCat(feature_types_, ii)) { categories_[ii].emplace(AsCat(p_inst[ii].fvalue)); } else { sketches_[ii].Push(p_inst[ii].fvalue, w); } } } else { for (size_t i = 0; i < inst.size(); ++i) { auto const& entry = p_inst[i]; if (entry.index >= begin && entry.index < end) { if (IsCat(feature_types_, entry.index)) { categories_[entry.index].emplace(AsCat(entry.fvalue)); } else { sketches_[entry.index].Push(entry.fvalue, w); } } } } } } }); } exc.Rethrow(); monitor_.Stop(__func__); } template void SketchContainerImpl::GatherSketchInfo( std::vector const &reduced, std::vector *p_worker_segments, std::vector *p_sketches_scan, std::vector *p_global_sketches) { auto& worker_segments = *p_worker_segments; worker_segments.resize(1, 0); auto world = rabit::GetWorldSize(); auto rank = rabit::GetRank(); auto n_columns = sketches_.size(); std::vector sketch_size; for (auto const& sketch : reduced) { sketch_size.push_back(sketch.size); } std::vector& sketches_scan = *p_sketches_scan; sketches_scan.resize((n_columns + 1) * world, 0); size_t beg_scan = rank * (n_columns + 1); std::partial_sum(sketch_size.cbegin(), sketch_size.cend(), sketches_scan.begin() + beg_scan + 1); // Gather all column pointers rabit::Allreduce(sketches_scan.data(), sketches_scan.size()); for (int32_t i = 0; i < world; ++i) { size_t back = (i + 1) * (n_columns + 1) - 1; auto n_entries = sketches_scan.at(back); worker_segments.push_back(n_entries); } // Offset of sketch from each worker. std::partial_sum(worker_segments.begin(), worker_segments.end(), worker_segments.begin()); CHECK_GE(worker_segments.size(), 1); auto total = worker_segments.back(); auto& global_sketches = *p_global_sketches; global_sketches.resize(total, typename WQSketch::Entry{0, 0, 0, 0}); auto worker_sketch = Span{global_sketches}.subspan( worker_segments[rank], worker_segments[rank + 1] - worker_segments[rank]); size_t cursor = 0; for (auto const &sketch : reduced) { std::copy(sketch.data, sketch.data + sketch.size, worker_sketch.begin() + cursor); cursor += sketch.size; } static_assert(sizeof(typename WQSketch::Entry) / 4 == sizeof(float), ""); rabit::Allreduce( reinterpret_cast(global_sketches.data()), global_sketches.size() * sizeof(typename WQSketch::Entry) / sizeof(float)); } template void SketchContainerImpl::AllReduce( std::vector *p_reduced, std::vector* p_num_cuts) { monitor_.Start(__func__); auto& num_cuts = *p_num_cuts; CHECK_EQ(num_cuts.size(), 0); num_cuts.resize(sketches_.size()); auto &reduced = *p_reduced; reduced.resize(sketches_.size()); size_t n_columns = sketches_.size(); rabit::Allreduce(&n_columns, 1); CHECK_EQ(n_columns, sketches_.size()) << "Number of columns differs across workers"; // Prune the intermediate num cuts for synchronization. std::vector global_column_size(columns_size_); rabit::Allreduce(global_column_size.data(), global_column_size.size()); ParallelFor(sketches_.size(), n_threads_, [&](size_t i) { int32_t intermediate_num_cuts = static_cast( std::min(global_column_size[i], static_cast(max_bins_ * WQSketch::kFactor))); if (global_column_size[i] != 0) { typename WQSketch::SummaryContainer out; sketches_[i].GetSummary(&out); reduced[i].Reserve(intermediate_num_cuts); CHECK(reduced[i].data); reduced[i].SetPrune(out, intermediate_num_cuts); } num_cuts[i] = intermediate_num_cuts; }); auto world = rabit::GetWorldSize(); if (world == 1) { monitor_.Stop(__func__); return; } std::vector worker_segments(1, 0); // CSC pointer to sketches. std::vector sketches_scan((n_columns + 1) * world, 0); std::vector global_sketches; this->GatherSketchInfo(reduced, &worker_segments, &sketches_scan, &global_sketches); std::vector final_sketches(n_columns); ParallelFor(n_columns, n_threads_, [&](auto fidx) { int32_t intermediate_num_cuts = num_cuts[fidx]; auto nbytes = WQSketch::SummaryContainer::CalcMemCost(intermediate_num_cuts); for (int32_t i = 1; i < world + 1; ++i) { auto size = worker_segments.at(i) - worker_segments[i - 1]; auto worker_sketches = Span{global_sketches}.subspan(worker_segments[i - 1], size); auto worker_scan = Span(sketches_scan) .subspan((i - 1) * (n_columns + 1), (n_columns + 1)); auto worker_feature = worker_sketches.subspan( worker_scan[fidx], worker_scan[fidx + 1] - worker_scan[fidx]); CHECK(worker_feature.data()); typename WQSketch::Summary summary(worker_feature.data(), worker_feature.size()); auto &out = final_sketches.at(fidx); out.Reduce(summary, nbytes); } reduced.at(fidx).Reserve(intermediate_num_cuts); reduced.at(fidx).SetPrune(final_sketches.at(fidx), intermediate_num_cuts); }); monitor_.Stop(__func__); } template void AddCutPoint(typename SketchType::SummaryContainer const &summary, int max_bin, HistogramCuts *cuts) { size_t required_cuts = std::min(summary.size, static_cast(max_bin)); auto &cut_values = cuts->cut_values_.HostVector(); for (size_t i = 1; i < required_cuts; ++i) { bst_float cpt = summary.data[i].value; if (i == 1 || cpt > cut_values.back()) { cut_values.push_back(cpt); } } } void AddCategories(std::set const &categories, HistogramCuts *cuts) { auto &cut_values = cuts->cut_values_.HostVector(); for (auto const &v : categories) { cut_values.push_back(v); } } template void SketchContainerImpl::MakeCuts(HistogramCuts* cuts) { monitor_.Start(__func__); std::vector reduced; std::vector num_cuts; this->AllReduce(&reduced, &num_cuts); cuts->min_vals_.HostVector().resize(sketches_.size(), 0.0f); std::vector final_summaries(reduced.size()); ParallelFor(reduced.size(), n_threads_, Sched::Guided(), [&](size_t fidx) { if (IsCat(feature_types_, fidx)) { return; } typename WQSketch::SummaryContainer &a = final_summaries[fidx]; size_t max_num_bins = std::min(num_cuts[fidx], max_bins_); a.Reserve(max_num_bins + 1); CHECK(a.data); if (num_cuts[fidx] != 0) { a.SetPrune(reduced[fidx], max_num_bins + 1); CHECK(a.data && reduced[fidx].data); const bst_float mval = a.data[0].value; cuts->min_vals_.HostVector()[fidx] = mval - fabs(mval) - 1e-5f; } else { // Empty column. const float mval = 1e-5f; cuts->min_vals_.HostVector()[fidx] = mval; } }); for (size_t fid = 0; fid < reduced.size(); ++fid) { size_t max_num_bins = std::min(num_cuts[fid], max_bins_); typename WQSketch::SummaryContainer const& a = final_summaries[fid]; if (IsCat(feature_types_, fid)) { AddCategories(categories_.at(fid), cuts); } else { AddCutPoint(a, max_num_bins, cuts); // push a value that is greater than anything const bst_float cpt = (a.size > 0) ? a.data[a.size - 1].value : cuts->min_vals_.HostVector()[fid]; // this must be bigger than last value in a scale const bst_float last = cpt + (fabs(cpt) + 1e-5f); cuts->cut_values_.HostVector().push_back(last); } // Ensure that every feature gets at least one quantile point CHECK_LE(cuts->cut_values_.HostVector().size(), std::numeric_limits::max()); auto cut_size = static_cast(cuts->cut_values_.HostVector().size()); CHECK_GT(cut_size, cuts->cut_ptrs_.HostVector().back()); cuts->cut_ptrs_.HostVector().push_back(cut_size); } monitor_.Stop(__func__); } template class SketchContainerImpl>; template class SketchContainerImpl>; HostSketchContainer::HostSketchContainer(int32_t max_bins, MetaInfo const &info, std::vector columns_size, bool use_group, Span hessian, int32_t n_threads) : SketchContainerImpl{columns_size, max_bins, info.feature_types.ConstHostSpan(), use_group, n_threads} { monitor_.Init(__func__); ParallelFor(sketches_.size(), n_threads_, Sched::Auto(), [&](auto i) { auto n_bins = std::min(static_cast(max_bins_), columns_size_[i]); n_bins = std::max(n_bins, static_cast(1)); auto eps = 1.0 / (static_cast(n_bins) * WQSketch::kFactor); if (!IsCat(this->feature_types_, i)) { sketches_[i].Init(columns_size_[i], eps); sketches_[i].inqueue.queue.resize(sketches_[i].limit_size * 2); } }); } void SortedSketchContainer::PushColPage(SparsePage const &page, MetaInfo const &info, Span hessian) { monitor_.Start(__func__); // glue these conditions using ternary operator to avoid making data copies. auto const &weights = hessian.empty() ? (use_group_ind_ ? UnrollGroupWeights(info) // use group weight : info.weights_.HostVector()) // use sample weight : MergeWeights(info, hessian, use_group_ind_, n_threads_); // use hessian merged with group/sample weights CHECK_EQ(weights.size(), info.num_row_); auto view = page.GetView(); ParallelFor(view.Size(), n_threads_, [&](size_t fidx) { auto column = view[fidx]; auto &sketch = sketches_[fidx]; sketch.Init(max_bins_); // first pass sketch.sum_total = 0.0; for (auto c : column) { sketch.sum_total += weights[c.index]; } // second pass if (IsCat(feature_types_, fidx)) { for (auto c : column) { categories_[fidx].emplace(AsCat(c.fvalue)); } } else { for (auto c : column) { sketch.Push(c.fvalue, weights[c.index], max_bins_); } } if (!IsCat(feature_types_, fidx) && !column.empty()) { sketch.Finalize(max_bins_); } }); monitor_.Stop(__func__); } } // namespace common } // namespace xgboost