Support cpu quantile sketch with column-wise data split (#8742)

This commit is contained in:
Rong Ou
2023-02-04 22:26:24 -08:00
committed by GitHub
parent c1786849e3
commit 66191e9926
15 changed files with 250 additions and 118 deletions

View File

@@ -45,14 +45,16 @@ HistogramCuts SketchOnDMatrix(DMatrix *m, int32_t max_bins, int32_t n_threads, b
if (!use_sorted) {
HostSketchContainer container(max_bins, m->Info().feature_types.ConstHostSpan(), reduced,
HostSketchContainer::UseGroup(info), n_threads);
HostSketchContainer::UseGroup(info),
m->Info().data_split_mode == DataSplitMode::kCol, n_threads);
for (auto const& page : m->GetBatches<SparsePage>()) {
container.PushRowPage(page, info, hessian);
}
container.MakeCuts(&out);
} else {
SortedSketchContainer container{max_bins, m->Info().feature_types.ConstHostSpan(), reduced,
HostSketchContainer::UseGroup(info), n_threads};
HostSketchContainer::UseGroup(info),
m->Info().data_split_mode == DataSplitMode::kCol, n_threads};
for (auto const& page : m->GetBatches<SortedCSCPage>()) {
container.PushColPage(page, info, hessian);
}

View File

@@ -18,11 +18,13 @@ template <typename WQSketch>
SketchContainerImpl<WQSketch>::SketchContainerImpl(std::vector<bst_row_t> columns_size,
int32_t max_bins,
Span<FeatureType const> feature_types,
bool use_group, int32_t n_threads)
bool use_group, bool col_split,
int32_t n_threads)
: feature_types_(feature_types.cbegin(), feature_types.cend()),
columns_size_{std::move(columns_size)},
max_bins_{max_bins},
use_group_ind_{use_group},
col_split_{col_split},
n_threads_{n_threads} {
monitor_.Init(__func__);
CHECK_NE(columns_size_.size(), 0);
@@ -137,80 +139,6 @@ struct QuantileAllreduce {
return worker_values.subspan(feat_beg, feat_size);
}
};
/**
* \brief Merge all categories from other workers.
*/
void AllreduceCategories(Span<FeatureType const> feature_types, int32_t n_threads,
std::vector<std::set<float>> *p_categories) {
auto &categories = *p_categories;
auto world_size = collective::GetWorldSize();
auto rank = collective::GetRank();
if (world_size == 1) {
return;
}
// CSC indptr to each feature
std::vector<size_t> feature_ptr(categories.size() + 1, 0);
for (size_t i = 0; i < categories.size(); ++i) {
auto const &feat = categories[i];
feature_ptr[i + 1] = feat.size();
}
std::partial_sum(feature_ptr.begin(), feature_ptr.end(), feature_ptr.begin());
CHECK_EQ(feature_ptr.front(), 0);
// gather all feature ptrs from workers
std::vector<size_t> global_feat_ptrs(feature_ptr.size() * world_size, 0);
size_t feat_begin = rank * feature_ptr.size(); // pointer to current worker
std::copy(feature_ptr.begin(), feature_ptr.end(), global_feat_ptrs.begin() + feat_begin);
collective::Allreduce<collective::Operation::kSum>(global_feat_ptrs.data(),
global_feat_ptrs.size());
// move all categories into a flatten vector to prepare for allreduce
size_t total = feature_ptr.back();
std::vector<float> flatten(total, 0);
auto cursor{flatten.begin()};
for (auto const &feat : categories) {
cursor = std::copy(feat.cbegin(), feat.cend(), cursor);
}
// indptr for indexing workers
std::vector<size_t> global_worker_ptr(world_size + 1, 0);
global_worker_ptr[rank + 1] = total; // shift 1 to right for constructing the indptr
collective::Allreduce<collective::Operation::kSum>(global_worker_ptr.data(),
global_worker_ptr.size());
std::partial_sum(global_worker_ptr.cbegin(), global_worker_ptr.cend(), global_worker_ptr.begin());
// total number of categories in all workers with all features
auto gtotal = global_worker_ptr.back();
// categories in all workers with all features.
std::vector<float> global_categories(gtotal, 0);
auto rank_begin = global_worker_ptr[rank];
auto rank_size = global_worker_ptr[rank + 1] - rank_begin;
CHECK_EQ(rank_size, total);
std::copy(flatten.cbegin(), flatten.cend(), global_categories.begin() + rank_begin);
// gather values from all workers.
collective::Allreduce<collective::Operation::kSum>(global_categories.data(),
global_categories.size());
QuantileAllreduce<float> allreduce_result{global_categories, global_worker_ptr, global_feat_ptrs,
categories.size()};
ParallelFor(categories.size(), n_threads, [&](auto fidx) {
if (!IsCat(feature_types, fidx)) {
return;
}
for (int32_t r = 0; r < world_size; ++r) {
if (r == rank) {
// continue if it's current worker.
continue;
}
// 1 feature of 1 worker
auto worker_feature = allreduce_result.Values(r, fidx);
for (auto c : worker_feature) {
categories[fidx].emplace(c);
}
}
});
}
} // anonymous namespace
template <typename WQSketch>
@@ -273,6 +201,76 @@ void SketchContainerImpl<WQSketch>::GatherSketchInfo(
global_sketches.size() * sizeof(typename WQSketch::Entry) / sizeof(float));
}
template <typename WQSketch>
void SketchContainerImpl<WQSketch>::AllreduceCategories() {
auto world_size = collective::GetWorldSize();
auto rank = collective::GetRank();
if (world_size == 1 || col_split_) {
return;
}
// CSC indptr to each feature
std::vector<size_t> feature_ptr(categories_.size() + 1, 0);
for (size_t i = 0; i < categories_.size(); ++i) {
auto const &feat = categories_[i];
feature_ptr[i + 1] = feat.size();
}
std::partial_sum(feature_ptr.begin(), feature_ptr.end(), feature_ptr.begin());
CHECK_EQ(feature_ptr.front(), 0);
// gather all feature ptrs from workers
std::vector<size_t> global_feat_ptrs(feature_ptr.size() * world_size, 0);
size_t feat_begin = rank * feature_ptr.size(); // pointer to current worker
std::copy(feature_ptr.begin(), feature_ptr.end(), global_feat_ptrs.begin() + feat_begin);
collective::Allreduce<collective::Operation::kSum>(global_feat_ptrs.data(),
global_feat_ptrs.size());
// move all categories into a flatten vector to prepare for allreduce
size_t total = feature_ptr.back();
std::vector<float> flatten(total, 0);
auto cursor{flatten.begin()};
for (auto const &feat : categories_) {
cursor = std::copy(feat.cbegin(), feat.cend(), cursor);
}
// indptr for indexing workers
std::vector<size_t> global_worker_ptr(world_size + 1, 0);
global_worker_ptr[rank + 1] = total; // shift 1 to right for constructing the indptr
collective::Allreduce<collective::Operation::kSum>(global_worker_ptr.data(),
global_worker_ptr.size());
std::partial_sum(global_worker_ptr.cbegin(), global_worker_ptr.cend(), global_worker_ptr.begin());
// total number of categories in all workers with all features
auto gtotal = global_worker_ptr.back();
// categories in all workers with all features.
std::vector<float> global_categories(gtotal, 0);
auto rank_begin = global_worker_ptr[rank];
auto rank_size = global_worker_ptr[rank + 1] - rank_begin;
CHECK_EQ(rank_size, total);
std::copy(flatten.cbegin(), flatten.cend(), global_categories.begin() + rank_begin);
// gather values from all workers.
collective::Allreduce<collective::Operation::kSum>(global_categories.data(),
global_categories.size());
QuantileAllreduce<float> allreduce_result{global_categories, global_worker_ptr, global_feat_ptrs,
categories_.size()};
ParallelFor(categories_.size(), n_threads_, [&](auto fidx) {
if (!IsCat(feature_types_, fidx)) {
return;
}
for (int32_t r = 0; r < world_size; ++r) {
if (r == rank) {
// continue if it's current worker.
continue;
}
// 1 feature of 1 worker
auto worker_feature = allreduce_result.Values(r, fidx);
for (auto c : worker_feature) {
categories_[fidx].emplace(c);
}
}
});
}
template <typename WQSketch>
void SketchContainerImpl<WQSketch>::AllReduce(
std::vector<typename WQSketch::SummaryContainer> *p_reduced,
@@ -283,7 +281,7 @@ void SketchContainerImpl<WQSketch>::AllReduce(
collective::Allreduce<collective::Operation::kMax>(&n_columns, 1);
CHECK_EQ(n_columns, sketches_.size()) << "Number of columns differs across workers";
AllreduceCategories(feature_types_, n_threads_, &categories_);
AllreduceCategories();
auto& num_cuts = *p_num_cuts;
CHECK_EQ(num_cuts.size(), 0);
@@ -294,8 +292,10 @@ void SketchContainerImpl<WQSketch>::AllReduce(
// Prune the intermediate num cuts for synchronization.
std::vector<bst_row_t> global_column_size(columns_size_);
collective::Allreduce<collective::Operation::kSum>(global_column_size.data(),
global_column_size.size());
if (!col_split_) {
collective::Allreduce<collective::Operation::kSum>(global_column_size.data(),
global_column_size.size());
}
ParallelFor(sketches_.size(), n_threads_, [&](size_t i) {
int32_t intermediate_num_cuts = static_cast<int32_t>(
@@ -316,7 +316,7 @@ void SketchContainerImpl<WQSketch>::AllReduce(
});
auto world = collective::GetWorldSize();
if (world == 1) {
if (world == 1 || col_split_) {
monitor_.Stop(__func__);
return;
}
@@ -442,8 +442,8 @@ template class SketchContainerImpl<WXQuantileSketch<float, float>>;
HostSketchContainer::HostSketchContainer(int32_t max_bins, common::Span<FeatureType const> ft,
std::vector<size_t> columns_size, bool use_group,
int32_t n_threads)
: SketchContainerImpl{columns_size, max_bins, ft, use_group, n_threads} {
bool col_split, int32_t n_threads)
: SketchContainerImpl{columns_size, max_bins, ft, use_group, col_split, n_threads} {
monitor_.Init(__func__);
ParallelFor(sketches_.size(), n_threads_, Sched::Auto(), [&](auto i) {
auto n_bins = std::min(static_cast<size_t>(max_bins_), columns_size_[i]);

View File

@@ -802,6 +802,7 @@ class SketchContainerImpl {
std::vector<bst_row_t> columns_size_;
int32_t max_bins_;
bool use_group_ind_{false};
bool col_split_;
int32_t n_threads_;
bool has_categorical_{false};
Monitor monitor_;
@@ -814,7 +815,7 @@ class SketchContainerImpl {
* \param use_group whether is assigned to group to data instance.
*/
SketchContainerImpl(std::vector<bst_row_t> columns_size, int32_t max_bins,
common::Span<FeatureType const> feature_types, bool use_group,
common::Span<FeatureType const> feature_types, bool use_group, bool col_split,
int32_t n_threads);
static bool UseGroup(MetaInfo const &info) {
@@ -896,6 +897,10 @@ class SketchContainerImpl {
void PushRowPage(SparsePage const &page, MetaInfo const &info, Span<float const> hessian = {});
void MakeCuts(HistogramCuts* cuts);
private:
// Merge all categories from other workers.
void AllreduceCategories();
};
class HostSketchContainer : public SketchContainerImpl<WQuantileSketch<float, float>> {
@@ -904,7 +909,8 @@ class HostSketchContainer : public SketchContainerImpl<WQuantileSketch<float, fl
public:
HostSketchContainer(int32_t max_bins, common::Span<FeatureType const> ft,
std::vector<size_t> columns_size, bool use_group, int32_t n_threads);
std::vector<size_t> columns_size, bool use_group, bool col_split,
int32_t n_threads);
template <typename Batch>
void PushAdapterBatch(Batch const &batch, size_t base_rowid, MetaInfo const &info, float missing);
@@ -1000,9 +1006,9 @@ class SortedSketchContainer : public SketchContainerImpl<WXQuantileSketch<float,
public:
explicit SortedSketchContainer(int32_t max_bins, common::Span<FeatureType const> ft,
std::vector<size_t> columns_size, bool use_group,
std::vector<size_t> columns_size, bool use_group, bool col_split,
int32_t n_threads)
: SketchContainerImpl{columns_size, max_bins, ft, use_group, n_threads} {
: SketchContainerImpl{columns_size, max_bins, ft, use_group, col_split, n_threads} {
monitor_.Init(__func__);
sketches_.resize(columns_size.size());
size_t i = 0;