Refactor for GHistIndex. (#7923)
* Pass sparse page as adapter, which prepares for quantile dmatrix. * Remove old external memory code like `rbegin` and extra `Init` function. * Simplify type dispatch.
This commit is contained in:
@@ -17,85 +17,15 @@ namespace xgboost {
|
||||
|
||||
GHistIndexMatrix::GHistIndexMatrix() : columns_{std::make_unique<common::ColumnMatrix>()} {}
|
||||
|
||||
GHistIndexMatrix::GHistIndexMatrix(DMatrix *x, int32_t max_bin, double sparse_thresh,
|
||||
bool sorted_sketch, int32_t n_threads,
|
||||
GHistIndexMatrix::GHistIndexMatrix(DMatrix *p_fmat, bst_bin_t max_bins_per_feat,
|
||||
double sparse_thresh, bool sorted_sketch, int32_t n_threads,
|
||||
common::Span<float> hess) {
|
||||
this->Init(x, max_bin, sparse_thresh, sorted_sketch, n_threads, hess);
|
||||
}
|
||||
|
||||
GHistIndexMatrix::~GHistIndexMatrix() = default;
|
||||
|
||||
void GHistIndexMatrix::PushBatch(SparsePage const &batch,
|
||||
common::Span<FeatureType const> ft,
|
||||
size_t rbegin, size_t prev_sum, uint32_t nbins,
|
||||
int32_t n_threads) {
|
||||
auto page = batch.GetView();
|
||||
auto it = common::MakeIndexTransformIter([&](size_t ridx) { return page[ridx].size(); });
|
||||
common::PartialSum(n_threads, it, it + page.Size(), prev_sum, row_ptr.begin() + rbegin);
|
||||
// The number of threads is pegged to the batch size. If the OMP block is parallelized
|
||||
// on anything other than the batch/block size, it should be reassigned
|
||||
const size_t batch_threads =
|
||||
std::max(static_cast<size_t>(1), std::min(batch.Size(), static_cast<size_t>(n_threads)));
|
||||
|
||||
const size_t n_index = row_ptr[rbegin + batch.Size()]; // number of entries in this page
|
||||
ResizeIndex(n_index, isDense_);
|
||||
|
||||
CHECK_GT(cut.Values().size(), 0U);
|
||||
|
||||
if (isDense_) {
|
||||
index.SetBinOffset(cut.Ptrs());
|
||||
}
|
||||
uint32_t const *offsets = index.Offset();
|
||||
|
||||
if (isDense_) {
|
||||
// Inside the lambda functions, bin_idx is the index for cut value across all
|
||||
// features. By subtracting it with starting pointer of each feature, we can reduce
|
||||
// it to smaller value and compress it to smaller types.
|
||||
common::BinTypeSize curent_bin_size = index.GetBinTypeSize();
|
||||
if (curent_bin_size == common::kUint8BinsTypeSize) {
|
||||
common::Span<uint8_t> index_data_span = {index.data<uint8_t>(), n_index};
|
||||
SetIndexData(index_data_span, ft, batch_threads, batch, rbegin, nbins,
|
||||
[offsets](auto bin_idx, auto fidx) {
|
||||
return static_cast<uint8_t>(bin_idx - offsets[fidx]);
|
||||
});
|
||||
} else if (curent_bin_size == common::kUint16BinsTypeSize) {
|
||||
common::Span<uint16_t> index_data_span = {index.data<uint16_t>(), n_index};
|
||||
SetIndexData(index_data_span, ft, batch_threads, batch, rbegin, nbins,
|
||||
[offsets](auto bin_idx, auto fidx) {
|
||||
return static_cast<uint16_t>(bin_idx - offsets[fidx]);
|
||||
});
|
||||
} else {
|
||||
CHECK_EQ(curent_bin_size, common::kUint32BinsTypeSize);
|
||||
common::Span<uint32_t> index_data_span = {index.data<uint32_t>(), n_index};
|
||||
SetIndexData(index_data_span, ft, batch_threads, batch, rbegin, nbins,
|
||||
[offsets](auto bin_idx, auto fidx) {
|
||||
return static_cast<uint32_t>(bin_idx - offsets[fidx]);
|
||||
});
|
||||
}
|
||||
} else {
|
||||
/* For sparse DMatrix we have to store index of feature for each bin
|
||||
in index field to chose right offset. So offset is nullptr and index is
|
||||
not reduced */
|
||||
common::Span<uint32_t> index_data_span = {index.data<uint32_t>(), n_index};
|
||||
SetIndexData(index_data_span, ft, batch_threads, batch, rbegin, nbins,
|
||||
[](auto idx, auto) { return idx; });
|
||||
}
|
||||
|
||||
common::ParallelFor(nbins, n_threads, [&](bst_omp_uint idx) {
|
||||
for (int32_t tid = 0; tid < n_threads; ++tid) {
|
||||
hit_count[idx] += hit_count_tloc_[tid * nbins + idx];
|
||||
hit_count_tloc_[tid * nbins + idx] = 0; // reset for next batch
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void GHistIndexMatrix::Init(DMatrix *p_fmat, int max_bins, double sparse_thresh, bool sorted_sketch,
|
||||
int32_t n_threads, common::Span<float> hess) {
|
||||
CHECK(p_fmat->SingleColBlock());
|
||||
// We use sorted sketching for approx tree method since it's more efficient in
|
||||
// computation time (but higher memory usage).
|
||||
cut = common::SketchOnDMatrix(p_fmat, max_bins, n_threads, sorted_sketch, hess);
|
||||
cut = common::SketchOnDMatrix(p_fmat, max_bins_per_feat, n_threads, sorted_sketch, hess);
|
||||
|
||||
max_num_bins = max_bins;
|
||||
max_num_bins = max_bins_per_feat;
|
||||
const uint32_t nbins = cut.Ptrs().back();
|
||||
hit_count.resize(nbins, 0);
|
||||
hit_count_tloc_.resize(n_threads * nbins, 0);
|
||||
@@ -108,16 +38,12 @@ void GHistIndexMatrix::Init(DMatrix *p_fmat, int max_bins, double sparse_thresh,
|
||||
row_ptr.resize(new_size);
|
||||
row_ptr[0] = 0;
|
||||
|
||||
size_t rbegin = 0;
|
||||
size_t prev_sum = 0;
|
||||
const bool isDense = p_fmat->IsDense();
|
||||
this->isDense_ = isDense;
|
||||
auto ft = p_fmat->Info().feature_types.ConstHostSpan();
|
||||
|
||||
for (const auto &batch : p_fmat->GetBatches<SparsePage>()) {
|
||||
this->PushBatch(batch, ft, rbegin, prev_sum, nbins, n_threads);
|
||||
prev_sum = row_ptr[rbegin + batch.Size()];
|
||||
rbegin += batch.Size();
|
||||
this->PushBatch(batch, ft, nbins, n_threads);
|
||||
}
|
||||
this->columns_ = std::make_unique<common::ColumnMatrix>();
|
||||
|
||||
@@ -131,6 +57,59 @@ void GHistIndexMatrix::Init(DMatrix *p_fmat, int max_bins, double sparse_thresh,
|
||||
}
|
||||
}
|
||||
|
||||
GHistIndexMatrix::~GHistIndexMatrix() = default;
|
||||
|
||||
void GHistIndexMatrix::PushBatch(SparsePage const &batch, common::Span<FeatureType const> ft,
|
||||
bst_bin_t n_total_bins, int32_t n_threads) {
|
||||
auto page = batch.GetView();
|
||||
auto it = common::MakeIndexTransformIter([&](size_t ridx) { return page[ridx].size(); });
|
||||
common::PartialSum(n_threads, it, it + page.Size(), static_cast<size_t>(0), row_ptr.begin());
|
||||
// The number of threads is pegged to the batch size. If the OMP block is parallelized
|
||||
// on anything other than the batch/block size, it should be reassigned
|
||||
const size_t batch_threads =
|
||||
std::max(static_cast<size_t>(1), std::min(batch.Size(), static_cast<size_t>(n_threads)));
|
||||
|
||||
const size_t n_index = row_ptr[batch.Size()]; // number of entries in this page
|
||||
ResizeIndex(n_index, isDense_);
|
||||
|
||||
CHECK_GT(cut.Values().size(), 0U);
|
||||
|
||||
if (isDense_) {
|
||||
index.SetBinOffset(cut.Ptrs());
|
||||
}
|
||||
uint32_t const *offsets = index.Offset();
|
||||
|
||||
auto n_bins_total = cut.TotalBins();
|
||||
auto is_valid = [](auto) { return true; }; // SparsePage always contains valid entries
|
||||
data::SparsePageAdapterBatch adapter_batch{page};
|
||||
if (isDense_) {
|
||||
// Inside the lambda functions, bin_idx is the index for cut value across all
|
||||
// features. By subtracting it with starting pointer of each feature, we can reduce
|
||||
// it to smaller value and compress it to smaller types.
|
||||
common::DispatchBinType(index.GetBinTypeSize(), [&](auto dtype) {
|
||||
using T = decltype(dtype);
|
||||
common::Span<T> index_data_span = {index.data<T>(), index.Size()};
|
||||
SetIndexData(
|
||||
index_data_span, ft, batch_threads, adapter_batch, is_valid, n_bins_total,
|
||||
[offsets](auto bin_idx, auto fidx) { return static_cast<T>(bin_idx - offsets[fidx]); });
|
||||
});
|
||||
} else {
|
||||
/* For sparse DMatrix we have to store index of feature for each bin
|
||||
in index field to chose right offset. So offset is nullptr and index is
|
||||
not reduced */
|
||||
common::Span<uint32_t> index_data_span = {index.data<uint32_t>(), n_index};
|
||||
SetIndexData(index_data_span, ft, batch_threads, adapter_batch, is_valid, n_bins_total,
|
||||
[](auto idx, auto) { return idx; });
|
||||
}
|
||||
|
||||
common::ParallelFor(n_total_bins, n_threads, [&](bst_omp_uint idx) {
|
||||
for (int32_t tid = 0; tid < n_threads; ++tid) {
|
||||
hit_count[idx] += hit_count_tloc_[tid * n_total_bins + idx];
|
||||
hit_count_tloc_[tid * n_total_bins + idx] = 0; // reset for next batch
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void GHistIndexMatrix::Init(SparsePage const &batch, common::Span<FeatureType const> ft,
|
||||
common::HistogramCuts const &cuts, int32_t max_bins_per_feat,
|
||||
bool isDense, double sparse_thresh, int32_t n_threads) {
|
||||
@@ -148,10 +127,7 @@ void GHistIndexMatrix::Init(SparsePage const &batch, common::Span<FeatureType co
|
||||
hit_count.resize(nbins, 0);
|
||||
hit_count_tloc_.resize(n_threads * nbins, 0);
|
||||
|
||||
size_t rbegin = 0;
|
||||
size_t prev_sum = 0;
|
||||
|
||||
this->PushBatch(batch, ft, rbegin, prev_sum, nbins, n_threads);
|
||||
this->PushBatch(batch, ft, nbins, n_threads);
|
||||
this->columns_ = std::make_unique<common::ColumnMatrix>();
|
||||
if (!std::isnan(sparse_thresh)) {
|
||||
this->columns_->Init(batch, *this, sparse_thresh, n_threads);
|
||||
|
||||
Reference in New Issue
Block a user