Support distributed CPU env for categorical data. (#7575)

* Add support for cat data in sketch allreduce.
* Share tests between CPU and GPU.
This commit is contained in:
Jiaming Yuan
2022-01-18 21:56:07 +08:00
committed by GitHub
parent deab0e32ba
commit cc06fab9a7
5 changed files with 299 additions and 150 deletions

View File

@@ -1,13 +1,14 @@
/*!
* Copyright 2020-2022 by XGBoost Contributors
*/
#include "quantile.h"
#include <limits>
#include <utility>
#include "rabit/rabit.h"
#include "quantile.h"
#include "hist_util.h"
#include "categorical.h"
#include "hist_util.h"
#include "rabit/rabit.h"
namespace xgboost {
namespace common {
@@ -15,7 +16,7 @@ namespace common {
template <typename WQSketch>
SketchContainerImpl<WQSketch>::SketchContainerImpl(std::vector<bst_row_t> columns_size,
int32_t max_bins,
common::Span<FeatureType const> feature_types,
Span<FeatureType const> feature_types,
bool use_group, int32_t n_threads)
: feature_types_(feature_types.cbegin(), feature_types.cend()),
columns_size_{std::move(columns_size)},
@@ -67,7 +68,7 @@ std::vector<bst_feature_t> SketchContainerImpl<WQSketch>::LoadBalance(SparsePage
*/
auto page = batch.GetView();
size_t const total_entries = page.data.size();
size_t const entries_per_thread = common::DivRoundUp(total_entries, nthreads);
size_t const entries_per_thread = DivRoundUp(total_entries, nthreads);
std::vector<std::vector<bst_row_t>> column_sizes(nthreads);
for (auto& column : column_sizes) {
@@ -213,53 +214,162 @@ void SketchContainerImpl<WQSketch>::PushRowPage(SparsePage const &page, MetaInfo
monitor_.Stop(__func__);
}
namespace {
/**
* \brief A view over gathered sketch values.
*/
template <typename T>
struct QuantileAllreduce {
common::Span<T> global_values;
common::Span<size_t> worker_indptr;
common::Span<size_t> feature_indptr;
size_t n_features{0};
/**
* \brief Get sketch values of the a feature from a worker.
*
* \param rank rank of target worker
* \param fidx feature idx
*/
auto Values(int32_t rank, bst_feature_t fidx) const {
// get span for worker
auto wsize = worker_indptr[rank + 1] - worker_indptr[rank];
auto worker_values = global_values.subspan(worker_indptr[rank], wsize);
auto psize = n_features + 1;
auto worker_feat_indptr = feature_indptr.subspan(psize * rank, psize);
// get span for feature
auto feat_beg = worker_feat_indptr[fidx];
auto feat_size = worker_feat_indptr[fidx + 1] - feat_beg;
return worker_values.subspan(feat_beg, feat_size);
}
};
/**
* \brief Merge all categories from other workers.
*/
void AllreduceCategories(Span<FeatureType const> feature_types, int32_t n_threads,
std::vector<std::set<bst_cat_t>> *p_categories) {
auto &categories = *p_categories;
auto world_size = rabit::GetWorldSize();
auto rank = rabit::GetRank();
if (world_size == 1) {
return;
}
// CSC indptr to each feature
std::vector<size_t> feature_ptr(categories.size() + 1, 0);
for (size_t i = 0; i < categories.size(); ++i) {
auto const &feat = categories[i];
feature_ptr[i + 1] = feat.size();
}
std::partial_sum(feature_ptr.begin(), feature_ptr.end(), feature_ptr.begin());
CHECK_EQ(feature_ptr.front(), 0);
// gather all feature ptrs from workers
std::vector<size_t> global_feat_ptrs(feature_ptr.size() * world_size, 0);
size_t feat_begin = rank * feature_ptr.size(); // pointer to current worker
std::copy(feature_ptr.begin(), feature_ptr.end(), global_feat_ptrs.begin() + feat_begin);
rabit::Allreduce<rabit::op::Sum>(global_feat_ptrs.data(), global_feat_ptrs.size());
// move all categories into a flatten vector to prepare for allreduce
size_t total = feature_ptr.back();
std::vector<bst_cat_t> flatten(total, 0);
auto cursor{flatten.begin()};
for (auto const &feat : categories) {
cursor = std::copy(feat.cbegin(), feat.cend(), cursor);
}
// indptr for indexing workers
std::vector<size_t> global_worker_ptr(world_size + 1, 0);
global_worker_ptr[rank + 1] = total; // shift 1 to right for constructing the indptr
rabit::Allreduce<rabit::op::Sum>(global_worker_ptr.data(), global_worker_ptr.size());
std::partial_sum(global_worker_ptr.cbegin(), global_worker_ptr.cend(), global_worker_ptr.begin());
// total number of categories in all workers with all features
auto gtotal = global_worker_ptr.back();
// categories in all workers with all features.
std::vector<bst_cat_t> global_categories(gtotal, 0);
auto rank_begin = global_worker_ptr[rank];
auto rank_size = global_worker_ptr[rank + 1] - rank_begin;
CHECK_EQ(rank_size, total);
std::copy(flatten.cbegin(), flatten.cend(), global_categories.begin() + rank_begin);
// gather values from all workers.
rabit::Allreduce<rabit::op::Sum>(global_categories.data(), global_categories.size());
QuantileAllreduce<bst_cat_t> allreduce_result{global_categories, global_worker_ptr,
global_feat_ptrs, categories.size()};
ParallelFor(categories.size(), n_threads, [&](auto fidx) {
if (!IsCat(feature_types, fidx)) {
return;
}
for (int32_t r = 0; r < world_size; ++r) {
if (r == rank) {
// continue if it's current worker.
continue;
}
// 1 feature of 1 worker
auto worker_feature = allreduce_result.Values(r, fidx);
for (auto c : worker_feature) {
categories[fidx].emplace(c);
}
}
});
}
} // anonymous namespace
template <typename WQSketch>
void SketchContainerImpl<WQSketch>::GatherSketchInfo(
std::vector<typename WQSketch::SummaryContainer> const &reduced,
std::vector<size_t> *p_worker_segments,
std::vector<bst_row_t> *p_sketches_scan,
std::vector<size_t> *p_worker_segments, std::vector<bst_row_t> *p_sketches_scan,
std::vector<typename WQSketch::Entry> *p_global_sketches) {
auto& worker_segments = *p_worker_segments;
auto &worker_segments = *p_worker_segments;
worker_segments.resize(1, 0);
auto world = rabit::GetWorldSize();
auto rank = rabit::GetRank();
auto n_columns = sketches_.size();
// get the size of each feature.
std::vector<bst_row_t> sketch_size;
for (auto const& sketch : reduced) {
sketch_size.push_back(sketch.size);
for (size_t i = 0; i < reduced.size(); ++i) {
if (IsCat(feature_types_, i)) {
sketch_size.push_back(0);
} else {
sketch_size.push_back(reduced[i].size);
}
}
std::vector<bst_row_t>& sketches_scan = *p_sketches_scan;
// turn the size into CSC indptr
std::vector<bst_row_t> &sketches_scan = *p_sketches_scan;
sketches_scan.resize((n_columns + 1) * world, 0);
size_t beg_scan = rank * (n_columns + 1);
std::partial_sum(sketch_size.cbegin(), sketch_size.cend(),
sketches_scan.begin() + beg_scan + 1);
size_t beg_scan = rank * (n_columns + 1); // starting storage for current worker.
std::partial_sum(sketch_size.cbegin(), sketch_size.cend(), sketches_scan.begin() + beg_scan + 1);
// Gather all column pointers
rabit::Allreduce<rabit::op::Sum>(sketches_scan.data(), sketches_scan.size());
for (int32_t i = 0; i < world; ++i) {
size_t back = (i + 1) * (n_columns + 1) - 1;
auto n_entries = sketches_scan.at(back);
worker_segments.push_back(n_entries);
}
// Offset of sketch from each worker.
std::partial_sum(worker_segments.begin(), worker_segments.end(),
worker_segments.begin());
std::partial_sum(worker_segments.begin(), worker_segments.end(), worker_segments.begin());
CHECK_GE(worker_segments.size(), 1);
auto total = worker_segments.back();
auto& global_sketches = *p_global_sketches;
global_sketches.resize(total, typename WQSketch::Entry{0, 0, 0, 0});
auto &global_sketches = *p_global_sketches;
global_sketches.resize(total, typename WQSketch::Entry{0, 0, 0, 0});
auto worker_sketch = Span<typename WQSketch::Entry>{global_sketches}.subspan(
worker_segments[rank], worker_segments[rank + 1] - worker_segments[rank]);
size_t cursor = 0;
for (auto const &sketch : reduced) {
std::copy(sketch.data, sketch.data + sketch.size,
worker_sketch.begin() + cursor);
cursor += sketch.size;
auto cursor{worker_sketch.begin()};
for (size_t fidx = 0; fidx < reduced.size(); ++fidx) {
auto const &sketch = reduced[fidx];
if (IsCat(feature_types_, fidx)) {
// nothing to do if it's categorical feature, size is 0 so no need to change cursor
continue;
} else {
cursor = std::copy(sketch.data, sketch.data + sketch.size, cursor);
}
}
static_assert(sizeof(typename WQSketch::Entry) / 4 == sizeof(float), "");
static_assert(sizeof(typename WQSketch::Entry) / 4 == sizeof(float),
"Unexpected size of sketch entry.");
rabit::Allreduce<rabit::op::Sum>(
reinterpret_cast<float *>(global_sketches.data()),
global_sketches.size() * sizeof(typename WQSketch::Entry) / sizeof(float));
@@ -270,6 +380,13 @@ void SketchContainerImpl<WQSketch>::AllReduce(
std::vector<typename WQSketch::SummaryContainer> *p_reduced,
std::vector<int32_t>* p_num_cuts) {
monitor_.Start(__func__);
size_t n_columns = sketches_.size();
rabit::Allreduce<rabit::op::Max>(&n_columns, 1);
CHECK_EQ(n_columns, sketches_.size()) << "Number of columns differs across workers";
AllreduceCategories(feature_types_, n_threads_, &categories_);
auto& num_cuts = *p_num_cuts;
CHECK_EQ(num_cuts.size(), 0);
num_cuts.resize(sketches_.size());
@@ -277,19 +394,19 @@ void SketchContainerImpl<WQSketch>::AllReduce(
auto &reduced = *p_reduced;
reduced.resize(sketches_.size());
size_t n_columns = sketches_.size();
rabit::Allreduce<rabit::op::Max>(&n_columns, 1);
CHECK_EQ(n_columns, sketches_.size()) << "Number of columns differs across workers";
// Prune the intermediate num cuts for synchronization.
std::vector<bst_row_t> global_column_size(columns_size_);
rabit::Allreduce<rabit::op::Sum>(global_column_size.data(), global_column_size.size());
ParallelFor(sketches_.size(), n_threads_, [&](size_t i) {
int32_t intermediate_num_cuts = static_cast<int32_t>(
std::min(global_column_size[i],
static_cast<size_t>(max_bins_ * WQSketch::kFactor)));
if (global_column_size[i] != 0) {
std::min(global_column_size[i], static_cast<size_t>(max_bins_ * WQSketch::kFactor)));
if (global_column_size[i] == 0) {
return;
}
if (IsCat(feature_types_, i)) {
intermediate_num_cuts = categories_[i].size();
} else {
typename WQSketch::SummaryContainer out;
sketches_[i].GetSummary(&out);
reduced[i].Reserve(intermediate_num_cuts);
@@ -309,25 +426,21 @@ void SketchContainerImpl<WQSketch>::AllReduce(
std::vector<bst_row_t> sketches_scan((n_columns + 1) * world, 0);
std::vector<typename WQSketch::Entry> global_sketches;
this->GatherSketchInfo(reduced, &worker_segments, &sketches_scan,
&global_sketches);
this->GatherSketchInfo(reduced, &worker_segments, &sketches_scan, &global_sketches);
std::vector<typename WQSketch::SummaryContainer> final_sketches(n_columns);
QuantileAllreduce<typename WQSketch::Entry> allreduce_result{global_sketches, worker_segments,
sketches_scan, n_columns};
ParallelFor(n_columns, n_threads_, [&](auto fidx) {
int32_t intermediate_num_cuts = num_cuts[fidx];
auto nbytes =
WQSketch::SummaryContainer::CalcMemCost(intermediate_num_cuts);
auto nbytes = WQSketch::SummaryContainer::CalcMemCost(intermediate_num_cuts);
if (IsCat(feature_types_, fidx)) {
return;
}
for (int32_t i = 1; i < world + 1; ++i) {
auto size = worker_segments.at(i) - worker_segments[i - 1];
auto worker_sketches =
Span<typename WQSketch::Entry>{global_sketches}.subspan(worker_segments[i - 1], size);
auto worker_scan =
Span<bst_row_t>(sketches_scan)
.subspan((i - 1) * (n_columns + 1), (n_columns + 1));
auto worker_feature = worker_sketches.subspan(
worker_scan[fidx], worker_scan[fidx + 1] - worker_scan[fidx]);
for (int32_t r = 0; r < world; ++r) {
// 1 feature of 1 worker
auto worker_feature = allreduce_result.Values(r, fidx);
CHECK(worker_feature.data());
typename WQSketch::Summary summary(worker_feature.data(), worker_feature.size());
auto &out = final_sketches.at(fidx);