Optimize cpu sketch allreduce for sparse data. (#6009)
* Bypass RABIT serialization reducer and use custom allgather based merging.
This commit is contained in:
@@ -116,26 +116,14 @@ inline HistogramCuts SketchOnDMatrix(DMatrix *m, int32_t max_bins) {
|
||||
for (auto& column : column_sizes) {
|
||||
column.resize(info.num_col_, 0);
|
||||
}
|
||||
for (auto const& page : m->GetBatches<SparsePage>()) {
|
||||
page.data.HostVector();
|
||||
page.offset.HostVector();
|
||||
ParallelFor(page.Size(), threads, [&](size_t i) {
|
||||
auto &local_column_sizes = column_sizes.at(omp_get_thread_num());
|
||||
auto row = page[i];
|
||||
auto const *p_row = row.data();
|
||||
for (size_t j = 0; j < row.size(); ++j) {
|
||||
local_column_sizes.at(p_row[j].index)++;
|
||||
}
|
||||
});
|
||||
}
|
||||
std::vector<bst_row_t> reduced(info.num_col_, 0);
|
||||
|
||||
ParallelFor(info.num_col_, threads, [&](size_t i) {
|
||||
for (auto const &thread : column_sizes) {
|
||||
reduced[i] += thread[i];
|
||||
for (auto const& page : m->GetBatches<SparsePage>()) {
|
||||
auto const &entries_per_column =
|
||||
HostSketchContainer::CalcColumnSize(page, info.num_col_, threads);
|
||||
for (size_t i = 0; i < entries_per_column.size(); ++i) {
|
||||
reduced[i] += entries_per_column[i];
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
HostSketchContainer container(reduced, max_bins,
|
||||
HostSketchContainer::UseGroup(info));
|
||||
for (auto const &page : m->GetBatches<SparsePage>()) {
|
||||
|
||||
@@ -25,34 +25,67 @@ HostSketchContainer::HostSketchContainer(std::vector<bst_row_t> columns_size,
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<bst_feature_t> LoadBalance(SparsePage const &page,
|
||||
std::vector<size_t> columns_size,
|
||||
size_t const nthreads) {
|
||||
/* Some sparse datasets have their mass concentrating on small
|
||||
* number of features. To avoid wating for a few threads running
|
||||
* forever, we here distirbute different number of columns to
|
||||
* different threads according to number of entries. */
|
||||
size_t const total_entries = page.data.Size();
|
||||
std::vector<bst_row_t>
|
||||
HostSketchContainer::CalcColumnSize(SparsePage const &batch,
|
||||
bst_feature_t const n_columns,
|
||||
size_t const nthreads) {
|
||||
auto page = batch.GetView();
|
||||
std::vector<std::vector<bst_row_t>> column_sizes(nthreads);
|
||||
for (auto &column : column_sizes) {
|
||||
column.resize(n_columns, 0);
|
||||
}
|
||||
|
||||
ParallelFor(page.Size(), nthreads, [&](size_t i) {
|
||||
auto &local_column_sizes = column_sizes.at(omp_get_thread_num());
|
||||
auto row = page[i];
|
||||
auto const *p_row = row.data();
|
||||
for (size_t j = 0; j < row.size(); ++j) {
|
||||
local_column_sizes.at(p_row[j].index)++;
|
||||
}
|
||||
});
|
||||
std::vector<bst_row_t> entries_per_columns(n_columns, 0);
|
||||
ParallelFor(n_columns, nthreads, [&](size_t i) {
|
||||
for (auto const &thread : column_sizes) {
|
||||
entries_per_columns[i] += thread[i];
|
||||
}
|
||||
});
|
||||
return entries_per_columns;
|
||||
}
|
||||
|
||||
std::vector<bst_feature_t> HostSketchContainer::LoadBalance(
|
||||
SparsePage const &batch, bst_feature_t n_columns, size_t const nthreads) {
|
||||
/* Some sparse datasets have their mass concentrating on small number of features. To
|
||||
* avoid wating for a few threads running forever, we here distirbute different number
|
||||
* of columns to different threads according to number of entries.
|
||||
*/
|
||||
auto page = batch.GetView();
|
||||
size_t const total_entries = page.data.size();
|
||||
size_t const entries_per_thread = common::DivRoundUp(total_entries, nthreads);
|
||||
|
||||
std::vector<bst_feature_t> cols_ptr(nthreads+1, 0);
|
||||
std::vector<std::vector<bst_row_t>> column_sizes(nthreads);
|
||||
for (auto& column : column_sizes) {
|
||||
column.resize(n_columns, 0);
|
||||
}
|
||||
std::vector<bst_row_t> entries_per_columns =
|
||||
CalcColumnSize(batch, n_columns, nthreads);
|
||||
std::vector<bst_feature_t> cols_ptr(nthreads + 1, 0);
|
||||
size_t count {0};
|
||||
size_t current_thread {1};
|
||||
|
||||
for (auto col : columns_size) {
|
||||
cols_ptr[current_thread]++; // add one column to thread
|
||||
for (auto col : entries_per_columns) {
|
||||
cols_ptr.at(current_thread)++; // add one column to thread
|
||||
count += col;
|
||||
if (count > entries_per_thread + 1) {
|
||||
CHECK_LE(count, total_entries);
|
||||
if (count > entries_per_thread) {
|
||||
current_thread++;
|
||||
count = 0;
|
||||
cols_ptr[current_thread] = cols_ptr[current_thread-1];
|
||||
cols_ptr.at(current_thread) = cols_ptr[current_thread-1];
|
||||
}
|
||||
}
|
||||
// Idle threads.
|
||||
for (; current_thread < cols_ptr.size() - 1; ++current_thread) {
|
||||
cols_ptr[current_thread+1] = cols_ptr[current_thread];
|
||||
}
|
||||
|
||||
return cols_ptr;
|
||||
}
|
||||
|
||||
@@ -67,11 +100,10 @@ void HostSketchContainer::PushRowPage(SparsePage const &page,
|
||||
// Use group index for weights?
|
||||
auto batch = page.GetView();
|
||||
dmlc::OMPException exec;
|
||||
// Parallel over columns. Asumming the data is dense, each thread owns a set of
|
||||
// consecutive columns.
|
||||
// Parallel over columns. Each thread owns a set of consecutive columns.
|
||||
auto const ncol = static_cast<uint32_t>(info.num_col_);
|
||||
auto const is_dense = info.num_nonzero_ == info.num_col_ * info.num_row_;
|
||||
auto thread_columns_ptr = LoadBalance(page, columns_size_, nthread);
|
||||
auto thread_columns_ptr = LoadBalance(page, info.num_col_, nthread);
|
||||
|
||||
#pragma omp parallel num_threads(nthread)
|
||||
{
|
||||
@@ -112,6 +144,132 @@ void HostSketchContainer::PushRowPage(SparsePage const &page,
|
||||
monitor_.Stop(__func__);
|
||||
}
|
||||
|
||||
void HostSketchContainer::GatherSketchInfo(
|
||||
std::vector<WQSketch::SummaryContainer> const &reduced,
|
||||
std::vector<size_t> *p_worker_segments,
|
||||
std::vector<bst_row_t> *p_sketches_scan,
|
||||
std::vector<WQSketch::Entry> *p_global_sketches) {
|
||||
auto& worker_segments = *p_worker_segments;
|
||||
worker_segments.resize(1, 0);
|
||||
auto world = rabit::GetWorldSize();
|
||||
auto rank = rabit::GetRank();
|
||||
auto n_columns = sketches_.size();
|
||||
|
||||
std::vector<bst_row_t> sketch_size;
|
||||
for (auto const& sketch : reduced) {
|
||||
sketch_size.push_back(sketch.size);
|
||||
}
|
||||
std::vector<bst_row_t>& sketches_scan = *p_sketches_scan;
|
||||
sketches_scan.resize((n_columns + 1) * world, 0);
|
||||
size_t beg_scan = rank * (n_columns + 1);
|
||||
std::partial_sum(sketch_size.cbegin(), sketch_size.cend(),
|
||||
sketches_scan.begin() + beg_scan + 1);
|
||||
// Gather all column pointers
|
||||
rabit::Allreduce<rabit::op::Sum>(sketches_scan.data(), sketches_scan.size());
|
||||
|
||||
for (int32_t i = 0; i < world; ++i) {
|
||||
size_t back = (i + 1) * (n_columns + 1) - 1;
|
||||
auto n_entries = sketches_scan.at(back);
|
||||
worker_segments.push_back(n_entries);
|
||||
}
|
||||
// Offset of sketch from each worker.
|
||||
std::partial_sum(worker_segments.begin(), worker_segments.end(),
|
||||
worker_segments.begin());
|
||||
CHECK_GE(worker_segments.size(), 1);
|
||||
auto total = worker_segments.back();
|
||||
|
||||
auto& global_sketches = *p_global_sketches;
|
||||
global_sketches.resize(total, WQSketch::Entry{0, 0, 0, 0});
|
||||
auto worker_sketch = Span<WQSketch::Entry>{global_sketches}.subspan(
|
||||
worker_segments[rank], worker_segments[rank + 1] - worker_segments[rank]);
|
||||
size_t cursor = 0;
|
||||
for (auto const &sketch : reduced) {
|
||||
std::copy(sketch.data, sketch.data + sketch.size,
|
||||
worker_sketch.begin() + cursor);
|
||||
cursor += sketch.size;
|
||||
}
|
||||
|
||||
static_assert(sizeof(WQSketch::Entry) / 4 == sizeof(float), "");
|
||||
rabit::Allreduce<rabit::op::Sum>(
|
||||
reinterpret_cast<float *>(global_sketches.data()),
|
||||
global_sketches.size() * sizeof(WQSketch::Entry) / sizeof(float));
|
||||
}
|
||||
|
||||
void HostSketchContainer::AllReduce(
|
||||
std::vector<WQSketch::SummaryContainer> *p_reduced,
|
||||
std::vector<int32_t>* p_num_cuts) {
|
||||
monitor_.Start(__func__);
|
||||
auto& num_cuts = *p_num_cuts;
|
||||
CHECK_EQ(num_cuts.size(), 0);
|
||||
auto &reduced = *p_reduced;
|
||||
reduced.resize(sketches_.size());
|
||||
|
||||
size_t n_columns = sketches_.size();
|
||||
rabit::Allreduce<rabit::op::Max>(&n_columns, 1);
|
||||
CHECK_EQ(n_columns, sketches_.size()) << "Number of columns differs across workers";
|
||||
|
||||
// Prune the intermediate num cuts for synchronization.
|
||||
std::vector<bst_row_t> global_column_size(columns_size_);
|
||||
rabit::Allreduce<rabit::op::Sum>(global_column_size.data(), global_column_size.size());
|
||||
|
||||
size_t nbytes = 0;
|
||||
for (size_t i = 0; i < sketches_.size(); ++i) {
|
||||
int32_t intermediate_num_cuts = static_cast<int32_t>(std::min(
|
||||
global_column_size[i], static_cast<size_t>(max_bins_ * WQSketch::kFactor)));
|
||||
if (global_column_size[i] != 0) {
|
||||
WQSketch::SummaryContainer out;
|
||||
sketches_[i].GetSummary(&out);
|
||||
reduced[i].Reserve(intermediate_num_cuts);
|
||||
CHECK(reduced[i].data);
|
||||
reduced[i].SetPrune(out, intermediate_num_cuts);
|
||||
nbytes = std::max(
|
||||
WQSketch::SummaryContainer::CalcMemCost(intermediate_num_cuts),
|
||||
nbytes);
|
||||
}
|
||||
|
||||
num_cuts.push_back(intermediate_num_cuts);
|
||||
}
|
||||
auto world = rabit::GetWorldSize();
|
||||
if (world == 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<size_t> worker_segments(1, 0); // CSC pointer to sketches.
|
||||
std::vector<bst_row_t> sketches_scan((n_columns + 1) * world, 0);
|
||||
|
||||
std::vector<WQSketch::Entry> global_sketches;
|
||||
this->GatherSketchInfo(reduced, &worker_segments, &sketches_scan,
|
||||
&global_sketches);
|
||||
|
||||
std::vector<WQSketch::SummaryContainer> final_sketches(n_columns);
|
||||
ParallelFor(n_columns, omp_get_max_threads(), [&](size_t fidx) {
|
||||
int32_t intermediate_num_cuts = num_cuts[fidx];
|
||||
auto nbytes =
|
||||
WQSketch::SummaryContainer::CalcMemCost(intermediate_num_cuts);
|
||||
|
||||
for (int32_t i = 1; i < world + 1; ++i) {
|
||||
auto size = worker_segments.at(i) - worker_segments[i - 1];
|
||||
auto worker_sketches = Span<WQSketch::Entry>{global_sketches}.subspan(
|
||||
worker_segments[i - 1], size);
|
||||
auto worker_scan =
|
||||
Span<bst_row_t>(sketches_scan)
|
||||
.subspan((i - 1) * (n_columns + 1), (n_columns + 1));
|
||||
|
||||
auto worker_feature = worker_sketches.subspan(
|
||||
worker_scan[fidx], worker_scan[fidx + 1] - worker_scan[fidx]);
|
||||
CHECK(worker_feature.data());
|
||||
WQSummary<float, float> summary(worker_feature.data(),
|
||||
worker_feature.size());
|
||||
auto &out = final_sketches.at(fidx);
|
||||
out.Reduce(summary, nbytes);
|
||||
}
|
||||
|
||||
reduced.at(fidx).Reserve(intermediate_num_cuts);
|
||||
reduced.at(fidx).SetPrune(final_sketches.at(fidx), intermediate_num_cuts);
|
||||
});
|
||||
monitor_.Stop(__func__);
|
||||
}
|
||||
|
||||
void AddCutPoint(WQuantileSketch<float, float>::SummaryContainer const &summary,
|
||||
int max_bin, HistogramCuts *cuts) {
|
||||
size_t required_cuts = std::min(summary.size, static_cast<size_t>(max_bin));
|
||||
@@ -126,44 +284,18 @@ void AddCutPoint(WQuantileSketch<float, float>::SummaryContainer const &summary,
|
||||
|
||||
void HostSketchContainer::MakeCuts(HistogramCuts* cuts) {
|
||||
monitor_.Start(__func__);
|
||||
rabit::Allreduce<rabit::op::Sum>(columns_size_.data(), columns_size_.size());
|
||||
std::vector<WQSketch::SummaryContainer> reduced(sketches_.size());
|
||||
std::vector<WQSketch::SummaryContainer> reduced;
|
||||
std::vector<int32_t> num_cuts;
|
||||
size_t nbytes = 0;
|
||||
for (size_t i = 0; i < sketches_.size(); ++i) {
|
||||
int32_t intermediate_num_cuts = static_cast<int32_t>(std::min(
|
||||
columns_size_[i], static_cast<size_t>(max_bins_ * WQSketch::kFactor)));
|
||||
if (columns_size_[i] != 0) {
|
||||
WQSketch::SummaryContainer out;
|
||||
sketches_[i].GetSummary(&out);
|
||||
reduced[i].Reserve(intermediate_num_cuts);
|
||||
CHECK(reduced[i].data);
|
||||
reduced[i].SetPrune(out, intermediate_num_cuts);
|
||||
}
|
||||
num_cuts.push_back(intermediate_num_cuts);
|
||||
nbytes = std::max(
|
||||
WQSketch::SummaryContainer::CalcMemCost(intermediate_num_cuts), nbytes);
|
||||
}
|
||||
|
||||
if (rabit::IsDistributed()) {
|
||||
// FIXME(trivialfis): This call will allocate nbytes * num_columns on rabit, which
|
||||
// may generate oom error when data is sparse. To fix it, we need to:
|
||||
// - gather the column offsets over all workers.
|
||||
// - run rabit::allgather on sketch data to collect all data.
|
||||
// - merge all gathered sketches based on worker offsets and column offsets of data
|
||||
// from each worker.
|
||||
// See GPU implementation for details.
|
||||
rabit::SerializeReducer<WQSketch::SummaryContainer> sreducer;
|
||||
sreducer.Allreduce(dmlc::BeginPtr(reduced), nbytes, reduced.size());
|
||||
}
|
||||
this->AllReduce(&reduced, &num_cuts);
|
||||
|
||||
cuts->min_vals_.HostVector().resize(sketches_.size(), 0.0f);
|
||||
|
||||
for (size_t fid = 0; fid < reduced.size(); ++fid) {
|
||||
WQSketch::SummaryContainer a;
|
||||
size_t max_num_bins = std::min(num_cuts[fid], max_bins_);
|
||||
a.Reserve(max_num_bins + 1);
|
||||
CHECK(a.data);
|
||||
if (columns_size_[fid] != 0) {
|
||||
if (num_cuts[fid] != 0) {
|
||||
a.SetPrune(reduced[fid], max_num_bins + 1);
|
||||
CHECK(a.data && reduced[fid].data);
|
||||
const bst_float mval = a.data[0].value;
|
||||
@@ -173,6 +305,7 @@ void HostSketchContainer::MakeCuts(HistogramCuts* cuts) {
|
||||
const float mval = 1e-5f;
|
||||
cuts->min_vals_.HostVector()[fid] = mval;
|
||||
}
|
||||
|
||||
AddCutPoint(a, max_num_bins, cuts);
|
||||
// push a value that is greater than anything
|
||||
const bst_float cpt
|
||||
|
||||
@@ -166,6 +166,16 @@ struct WQSummary {
|
||||
* \param src source sketch
|
||||
*/
|
||||
inline void CopyFrom(const WQSummary &src) {
|
||||
if (!src.data) {
|
||||
CHECK_EQ(src.size, 0);
|
||||
size = 0;
|
||||
return;
|
||||
}
|
||||
if (!data) {
|
||||
CHECK_EQ(this->size, 0);
|
||||
CHECK_EQ(src.size, 0);
|
||||
return;
|
||||
}
|
||||
size = src.size;
|
||||
std::memcpy(data, src.data, sizeof(Entry) * size);
|
||||
}
|
||||
@@ -721,6 +731,14 @@ class HostSketchContainer {
|
||||
return use_group_ind;
|
||||
}
|
||||
|
||||
static std::vector<bst_row_t> CalcColumnSize(SparsePage const &page,
|
||||
bst_feature_t const n_columns,
|
||||
size_t const nthreads);
|
||||
|
||||
static std::vector<bst_feature_t> LoadBalance(SparsePage const &page,
|
||||
bst_feature_t n_columns,
|
||||
size_t const nthreads);
|
||||
|
||||
static uint32_t SearchGroupIndFromRow(std::vector<bst_uint> const &group_ptr,
|
||||
size_t const base_rowid) {
|
||||
CHECK_LT(base_rowid, group_ptr.back())
|
||||
@@ -730,6 +748,14 @@ class HostSketchContainer {
|
||||
group_ptr.cbegin() - 1;
|
||||
return group_ind;
|
||||
}
|
||||
// Gather sketches from all workers.
|
||||
void GatherSketchInfo(std::vector<WQSketch::SummaryContainer> const &reduced,
|
||||
std::vector<bst_row_t> *p_worker_segments,
|
||||
std::vector<bst_row_t> *p_sketches_scan,
|
||||
std::vector<WQSketch::Entry> *p_global_sketches);
|
||||
// Merge sketches from all workers.
|
||||
void AllReduce(std::vector<WQSketch::SummaryContainer> *p_reduced,
|
||||
std::vector<int32_t>* p_num_cuts);
|
||||
|
||||
/* \brief Push a CSR matrix. */
|
||||
void PushRowPage(SparsePage const& page, MetaInfo const& info);
|
||||
|
||||
Reference in New Issue
Block a user