Implement iterative DMatrix for CPU. (#8116)
This commit is contained in:
parent 546de5efd2
commit 2c70751d1e
@@ -43,6 +43,7 @@
 #include "../src/data/gradient_index_format.cc"
 #include "../src/data/sparse_page_dmatrix.cc"
 #include "../src/data/proxy_dmatrix.cc"
+#include "../src/data/iterative_dmatrix.cc"

 // prediction
 #include "../src/predictor/predictor.cc"
@@ -559,6 +559,7 @@ class DMatrix {
  *
  * \param iter External data iterator
  * \param proxy A handle to ProxyDMatrix
+ * \param ref Reference Quantile DMatrix.
  * \param reset Callback for reset
  * \param next Callback for next
  * \param missing Value that should be treated as missing.
@@ -567,13 +568,11 @@ class DMatrix {
  *
  * \return A created quantile based DMatrix.
  */
-  template <typename DataIterHandle, typename DMatrixHandle,
-            typename DataIterResetCallback, typename XGDMatrixCallbackNext>
-  static DMatrix *Create(DataIterHandle iter, DMatrixHandle proxy,
-                         DataIterResetCallback *reset,
-                         XGDMatrixCallbackNext *next, float missing,
-                         int nthread,
-                         int max_bin);
+  template <typename DataIterHandle, typename DMatrixHandle, typename DataIterResetCallback,
+            typename XGDMatrixCallbackNext>
+  static DMatrix* Create(DataIterHandle iter, DMatrixHandle proxy, std::shared_ptr<DMatrix> ref,
+                         DataIterResetCallback* reset, XGDMatrixCallbackNext* next, float missing,
+                         int nthread, bst_bin_t max_bin);

  /**
   * \brief Create an external memory DMatrix with callbacks.
@@ -613,6 +612,7 @@ class DMatrix {
   virtual BatchSet<GHistIndexMatrix> GetGradientIndex(const BatchParam& param) = 0;

   virtual bool EllpackExists() const = 0;
+  virtual bool GHistIndexExists() const = 0;
   virtual bool SparsePageExists() const = 0;
 };
@@ -621,11 +621,16 @@ inline BatchSet<SparsePage> DMatrix::GetBatches() {
   return GetRowBatches();
 }

-template<>
+template <>
 inline bool DMatrix::PageExists<EllpackPage>() const {
   return this->EllpackExists();
 }

+template <>
+inline bool DMatrix::PageExists<GHistIndexMatrix>() const {
+  return this->GHistIndexExists();
+}
+
 template<>
 inline bool DMatrix::PageExists<SparsePage>() const {
   return this->SparsePageExists();
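
The new `PageExists<GHistIndexMatrix>()` specialization lets callers probe which representation a DMatrix actually materializes. A hedged caller-side sketch (the `Route` helper is hypothetical; the CPU predictor change later in this commit does essentially this):

```cpp
#include <xgboost/data.h>

// Hypothetical routing helper: a QuantileDMatrix built on CPU materializes
// only the compressed gradient index, so probe for it before asking for rows.
void Route(xgboost::DMatrix* m) {
  if (!m->PageExists<xgboost::SparsePage>() &&
      m->PageExists<xgboost::GHistIndexMatrix>()) {
    // ... take the histogram-index prediction path ...
  }
}
```
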
@@ -275,13 +275,14 @@ XGB_DLL int XGDMatrixCreateFromCallback(DataIterHandle iter, DMatrixHandle proxy,
   API_END();
 }

-XGB_DLL int XGDeviceQuantileDMatrixCreateFromCallback(
-    DataIterHandle iter, DMatrixHandle proxy, DataIterResetCallback *reset,
-    XGDMatrixCallbackNext *next, float missing, int nthread,
-    int max_bin, DMatrixHandle *out) {
+XGB_DLL int XGDeviceQuantileDMatrixCreateFromCallback(DataIterHandle iter, DMatrixHandle proxy,
+                                                      DataIterResetCallback *reset,
+                                                      XGDMatrixCallbackNext *next, float missing,
+                                                      int nthread, int max_bin,
+                                                      DMatrixHandle *out) {
   API_BEGIN();
   *out = new std::shared_ptr<xgboost::DMatrix>{
-      xgboost::DMatrix::Create(iter, proxy, reset, next, missing, nthread, max_bin)};
+      xgboost::DMatrix::Create(iter, proxy, nullptr, reset, next, missing, nthread, max_bin)};
   API_END();
 }
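
For reference, a minimal caller-side sketch of the callback protocol this entry point consumes. The `Batches` struct and its contents are invented for illustration; the C API functions (`XGProxyDMatrixCreate`, `XGProxyDMatrixSetDataDense`) are the existing public ones, and building the array-interface JSON per batch is elided:

```cpp
#include <xgboost/c_api.h>

#include <cstddef>
#include <string>
#include <vector>

// Hypothetical iterator state driven by XGBoost through the two callbacks.
struct Batches {
  std::vector<std::string> array_interfaces;  // one array-interface JSON per batch
  std::size_t it{0};
  DMatrixHandle proxy{nullptr};
};

extern "C" void Reset(DataIterHandle self) { static_cast<Batches*>(self)->it = 0; }

extern "C" int Next(DataIterHandle self) {
  auto* batches = static_cast<Batches*>(self);
  if (batches->it == batches->array_interfaces.size()) {
    return 0;  // iteration is finished
  }
  // Stage the current batch on the proxy DMatrix for XGBoost to consume.
  XGProxyDMatrixSetDataDense(batches->proxy, batches->array_interfaces[batches->it].c_str());
  ++batches->it;
  return 1;  // one more batch is available
}

// Usage sketch:
//   Batches batches{/* per-batch array-interface JSON */};
//   XGProxyDMatrixCreate(&batches.proxy);
//   DMatrixHandle out;
//   XGDeviceQuantileDMatrixCreateFromCallback(
//       &batches, batches.proxy, Reset, Next,
//       std::numeric_limits<float>::quiet_NaN(), /*nthread=*/0, /*max_bin=*/256, &out);
```
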
@@ -931,15 +931,13 @@ DMatrix* DMatrix::Load(const std::string& uri,
   }
   return dmat;
 }
-template <typename DataIterHandle, typename DMatrixHandle,
-          typename DataIterResetCallback, typename XGDMatrixCallbackNext>
-DMatrix *DMatrix::Create(DataIterHandle iter, DMatrixHandle proxy,
-                         DataIterResetCallback *reset,
-                         XGDMatrixCallbackNext *next, float missing,
-                         int nthread,
-                         int max_bin) {
-  return new data::IterativeDMatrix(iter, proxy, reset, next, missing,
-                                    nthread, max_bin);
+template <typename DataIterHandle, typename DMatrixHandle, typename DataIterResetCallback,
+          typename XGDMatrixCallbackNext>
+DMatrix* DMatrix::Create(DataIterHandle iter, DMatrixHandle proxy, std::shared_ptr<DMatrix> ref,
+                         DataIterResetCallback* reset, XGDMatrixCallbackNext* next, float missing,
+                         int nthread, bst_bin_t max_bin) {
+  return new data::IterativeDMatrix(iter, proxy, ref, reset, next, missing, nthread, max_bin);
 }

 template <typename DataIterHandle, typename DMatrixHandle,
@@ -953,11 +951,12 @@ DMatrix *DMatrix::Create(DataIterHandle iter, DMatrixHandle proxy,
                          cache);
 }

-template DMatrix *DMatrix::Create<DataIterHandle, DMatrixHandle,
-                                  DataIterResetCallback, XGDMatrixCallbackNext>(
-    DataIterHandle iter, DMatrixHandle proxy, DataIterResetCallback *reset,
-    XGDMatrixCallbackNext *next, float missing, int nthread,
-    int max_bin);
+template DMatrix* DMatrix::Create<DataIterHandle, DMatrixHandle, DataIterResetCallback,
+                                  XGDMatrixCallbackNext>(DataIterHandle iter, DMatrixHandle proxy,
+                                                         std::shared_ptr<DMatrix> ref,
+                                                         DataIterResetCallback* reset,
+                                                         XGDMatrixCallbackNext* next, float missing,
+                                                         int nthread, int max_bin);

 template DMatrix *DMatrix::Create<DataIterHandle, DMatrixHandle,
                                   DataIterResetCallback, XGDMatrixCallbackNext>(
src/data/iterative_dmatrix.cc (new file, 214 lines)
@@ -0,0 +1,214 @@
/*!
 * Copyright 2022 XGBoost contributors
 */
#include "iterative_dmatrix.h"

#include <rabit/rabit.h>

#include "../common/column_matrix.h"
#include "../common/hist_util.h"
#include "gradient_index.h"
#include "proxy_dmatrix.h"
#include "simple_batch_iterator.h"

namespace xgboost {
namespace data {

void GetCutsFromRef(std::shared_ptr<DMatrix> ref_, bst_feature_t n_features, BatchParam p,
                    common::HistogramCuts* p_cuts) {
  CHECK(ref_);
  CHECK(p_cuts);
  auto csr = [&]() {
    for (auto const& page : ref_->GetBatches<GHistIndexMatrix>(p)) {
      *p_cuts = page.cut;
      break;
    }
  };
  auto ellpack = [&]() {
    for (auto const& page : ref_->GetBatches<EllpackPage>(p)) {
      GetCutsFromEllpack(page, p_cuts);
      break;
    }
  };

  if (ref_->PageExists<GHistIndexMatrix>()) {
    csr();
  } else if (ref_->PageExists<EllpackPage>()) {
    ellpack();
  } else {
    if (p.gpu_id == Context::kCpuId) {
      csr();
    } else {
      ellpack();
    }
  }
  CHECK_EQ(ref_->Info().num_col_, n_features)
      << "Invalid ref DMatrix, different number of features.";
}

void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
                                   std::shared_ptr<DMatrix> ref) {
  DMatrixProxy* proxy = MakeProxy(proxy_);
  CHECK(proxy);

  // The external iterator
  auto iter =
      DataIterProxy<DataIterResetCallback, XGDMatrixCallbackNext>{iter_handle, reset_, next_};
  common::HistogramCuts cuts;

  auto num_rows = [&]() {
    return HostAdapterDispatch(proxy, [](auto const& value) { return value.Size(); });
  };
  auto num_cols = [&]() {
    return HostAdapterDispatch(proxy, [](auto const& value) { return value.NumCols(); });
  };

  std::vector<size_t> column_sizes;
  auto const is_valid = data::IsValidFunctor{missing};
  auto nnz_cnt = [&]() {
    return HostAdapterDispatch(proxy, [&](auto const& value) {
      size_t n_threads = ctx_.Threads();
      size_t n_features = column_sizes.size();
      linalg::Tensor<size_t, 2> column_sizes_tloc({n_threads, n_features}, Context::kCpuId);
      auto view = column_sizes_tloc.HostView();
      common::ParallelFor(value.Size(), n_threads, common::Sched::Static(256), [&](auto i) {
        auto const& line = value.GetLine(i);
        for (size_t j = 0; j < line.Size(); ++j) {
          data::COOTuple const& elem = line.GetElement(j);
          if (is_valid(elem)) {
            view(omp_get_thread_num(), elem.column_idx)++;
          }
        }
      });
      auto ptr = column_sizes_tloc.Data()->HostPointer();
      auto result = std::accumulate(ptr, ptr + column_sizes_tloc.Size(), static_cast<size_t>(0));
      for (size_t tidx = 0; tidx < n_threads; ++tidx) {
        for (size_t fidx = 0; fidx < n_features; ++fidx) {
          column_sizes[fidx] += view(tidx, fidx);
        }
      }
      return result;
    });
  };

  size_t n_features = 0;
  size_t n_batches = 0;
  size_t accumulated_rows{0};
  size_t nnz{0};

  /**
   * CPU impl needs an additional loop for accumulating the column size.
   */
  std::unique_ptr<common::HostSketchContainer> p_sketch;
  std::vector<size_t> batch_nnz;
  do {
    // We use do while here as the first batch is fetched in ctor
    if (n_features == 0) {
      n_features = num_cols();
      rabit::Allreduce<rabit::op::Max>(&n_features, 1);
      column_sizes.resize(n_features);
      info_.num_col_ = n_features;
    } else {
      CHECK_EQ(n_features, num_cols()) << "Inconsistent number of columns.";
    }

    size_t batch_size = num_rows();
    batch_nnz.push_back(nnz_cnt());
    nnz += batch_nnz.back();
    accumulated_rows += batch_size;
    n_batches++;
  } while (iter.Next());
  iter.Reset();

  // From here on Info() has the correct data shape
  Info().num_row_ = accumulated_rows;
  Info().num_nonzero_ = nnz;
  rabit::Allreduce<rabit::op::Max>(&info_.num_col_, 1);
  CHECK(std::none_of(column_sizes.cbegin(), column_sizes.cend(), [&](auto f) {
    return f > accumulated_rows;
  })) << "Something went wrong during iteration.";

  /**
   * Generate quantiles
   */
  accumulated_rows = 0;
  if (ref) {
    GetCutsFromRef(ref, Info().num_col_, batch_param_, &cuts);
  } else {
    size_t i = 0;
    while (iter.Next()) {
      if (!p_sketch) {
        p_sketch.reset(new common::HostSketchContainer{batch_param_.max_bin,
                                                       proxy->Info().feature_types.ConstHostSpan(),
                                                       column_sizes, false, ctx_.Threads()});
      }
      HostAdapterDispatch(proxy, [&](auto const& batch) {
        proxy->Info().num_nonzero_ = batch_nnz[i];
        // We don't need base row idx here as Info is from proxy and the number of rows in
        // it is consistent with data batch.
        p_sketch->PushAdapterBatch(batch, 0, proxy->Info(), missing);
      });
      accumulated_rows += num_rows();
      ++i;
    }
    iter.Reset();
    CHECK_EQ(accumulated_rows, Info().num_row_);

    CHECK(p_sketch);
    p_sketch->MakeCuts(&cuts);
  }

  /**
   * Generate gradient index.
   */
  this->ghist_ = std::make_unique<GHistIndexMatrix>(Info(), std::move(cuts), batch_param_.max_bin);
  size_t rbegin = 0;
  size_t prev_sum = 0;
  size_t i = 0;
  while (iter.Next()) {
    HostAdapterDispatch(proxy, [&](auto const& batch) {
      proxy->Info().num_nonzero_ = batch_nnz[i];
      this->ghist_->PushAdapterBatch(&ctx_, rbegin, prev_sum, batch, missing,
                                     proxy->Info().feature_types.ConstHostSpan(),
                                     batch_param_.sparse_thresh, Info().num_row_);
    });
    if (n_batches != 1) {
      this->info_.Extend(std::move(proxy->Info()), false, true);
    }
    size_t batch_size = num_rows();
    prev_sum = this->ghist_->row_ptr[rbegin + batch_size];
    rbegin += batch_size;
    ++i;
  }
  iter.Reset();
  CHECK_EQ(rbegin, Info().num_row_);

  /**
   * Generate column matrix
   */
  accumulated_rows = 0;
  while (iter.Next()) {
    HostAdapterDispatch(proxy, [&](auto const& batch) {
      this->ghist_->PushAdapterBatchColumns(&ctx_, batch, missing, accumulated_rows);
    });
    accumulated_rows += num_rows();
  }
  iter.Reset();
  CHECK_EQ(accumulated_rows, Info().num_row_);

  if (n_batches == 1) {
    this->info_ = std::move(proxy->Info());
    this->info_.num_nonzero_ = nnz;
    CHECK_EQ(proxy->Info().labels.Size(), 0);
  }
}

BatchSet<GHistIndexMatrix> IterativeDMatrix::GetGradientIndex(BatchParam const& param) {
  CheckParam(param);
  CHECK(ghist_) << "Not initialized with CPU data";
  auto begin_iter =
      BatchIterator<GHistIndexMatrix>(new SimpleBatchIteratorImpl<GHistIndexMatrix>(ghist_));
  return BatchSet<GHistIndexMatrix>(begin_iter);
}
}  // namespace data
}  // namespace xgboost
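
A standalone sketch (not from the commit) of the reduction pattern used by the `nnz_cnt` lambda above: each thread owns one row of an `n_threads x n_features` counter matrix, so no atomics are needed, and the per-thread rows are summed serially afterwards. Data and shapes are invented for illustration:

```cpp
#include <omp.h>

#include <cstddef>
#include <iostream>
#include <vector>

int main() {
  std::size_t const n_rows = 1 << 16, n_features = 8;
  int const n_threads = omp_get_max_threads();
  // Per-thread counters, flattened: counts[t * n_features + f].
  std::vector<std::size_t> counts(static_cast<std::size_t>(n_threads) * n_features, 0);

#pragma omp parallel for schedule(static, 256)
  for (std::size_t i = 0; i < n_rows; ++i) {
    auto* local = counts.data() + omp_get_thread_num() * n_features;
    for (std::size_t f = 0; f < n_features; ++f) {
      bool is_valid = (i + f) % 3 != 0;  // stand-in for the missing-value test
      if (is_valid) {
        ++local[f];
      }
    }
  }

  // Serial reduction over threads, mirroring the loop over view(tidx, fidx).
  std::vector<std::size_t> column_sizes(n_features, 0);
  for (int t = 0; t < n_threads; ++t) {
    for (std::size_t f = 0; f < n_features; ++f) {
      column_sizes[f] += counts[t * n_features + f];
    }
  }
  for (auto c : column_sizes) std::cout << c << " ";
  std::cout << "\n";
}
```
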
@@ -1,44 +1,43 @@
 /*!
  * Copyright 2020-2022 XGBoost contributors
  */
+#include <algorithm>
 #include <memory>
 #include <type_traits>
-#include <algorithm>

 #include "../common/hist_util.cuh"
-#include "simple_batch_iterator.h"
-#include "iterative_dmatrix.h"
-#include "sparse_page_source.h"
-#include "ellpack_page.cuh"
-#include "proxy_dmatrix.h"
-#include "proxy_dmatrix.cuh"
 #include "device_adapter.cuh"
+#include "ellpack_page.cuh"
+#include "iterative_dmatrix.h"
+#include "proxy_dmatrix.cuh"
+#include "proxy_dmatrix.h"
+#include "simple_batch_iterator.h"
+#include "sparse_page_source.h"

 namespace xgboost {
 namespace data {
-void IterativeDMatrix::InitFromCUDA(DataIterHandle iter_handle, float missing) {
+void IterativeDMatrix::InitFromCUDA(DataIterHandle iter_handle, float missing,
+                                    std::shared_ptr<DMatrix> ref) {
   // A handle passed to external iterator.
   DMatrixProxy* proxy = MakeProxy(proxy_);
   CHECK(proxy);

   // The external iterator
-  auto iter = DataIterProxy<DataIterResetCallback, XGDMatrixCallbackNext>{
-      iter_handle, reset_, next_};
+  auto iter =
+      DataIterProxy<DataIterResetCallback, XGDMatrixCallbackNext>{iter_handle, reset_, next_};

   dh::XGBCachingDeviceAllocator<char> alloc;

   auto num_rows = [&]() {
-    return Dispatch(proxy, [](auto const &value) { return value.NumRows(); });
+    return Dispatch(proxy, [](auto const& value) { return value.NumRows(); });
   };
   auto num_cols = [&]() {
-    return Dispatch(proxy, [](auto const &value) { return value.NumCols(); });
+    return Dispatch(proxy, [](auto const& value) { return value.NumCols(); });
   };

   size_t row_stride = 0;
   size_t nnz = 0;
-  // Sketch for all batches.
-  iter.Reset();

   std::vector<common::SketchContainer> sketch_containers;
   size_t batches = 0;
   size_t accumulated_rows = 0;
@@ -52,69 +51,77 @@ void IterativeDMatrix::InitFromCUDA(DataIterHandle iter_handle, float missing) {
     return d;
   };

-  while (iter.Next()) {
+  /**
+   * Generate quantiles
+   */
+  common::HistogramCuts cuts;
+  do {
+    // We use do while here as the first batch is fetched in ctor
     ctx_.gpu_id = proxy->DeviceIdx();
     CHECK_LT(ctx_.gpu_id, common::AllVisibleGPUs());
     dh::safe_cuda(cudaSetDevice(get_device()));
     if (cols == 0) {
       cols = num_cols();
       rabit::Allreduce<rabit::op::Max>(&cols, 1);
       this->info_.num_col_ = cols;
     } else {
       CHECK_EQ(cols, num_cols()) << "Inconsistent number of columns.";
     }
-    sketch_containers.emplace_back(proxy->Info().feature_types,
-                                   batch_param_.max_bin, cols, num_rows(), get_device());
-    auto* p_sketch = &sketch_containers.back();
-    proxy->Info().weights_.SetDevice(get_device());
-    Dispatch(proxy, [&](auto const &value) {
-      common::AdapterDeviceSketch(value, batch_param_.max_bin,
-                                  proxy->Info(), missing, p_sketch);
-    });
+    if (!ref) {
+      sketch_containers.emplace_back(proxy->Info().feature_types, batch_param_.max_bin, cols,
+                                     num_rows(), get_device());
+      auto* p_sketch = &sketch_containers.back();
+      proxy->Info().weights_.SetDevice(get_device());
+      Dispatch(proxy, [&](auto const& value) {
+        common::AdapterDeviceSketch(value, batch_param_.max_bin, proxy->Info(), missing, p_sketch);
+      });
+    }
     auto batch_rows = num_rows();
     accumulated_rows += batch_rows;
     dh::caching_device_vector<size_t> row_counts(batch_rows + 1, 0);
-    common::Span<size_t> row_counts_span(row_counts.data().get(),
-                                         row_counts.size());
-    row_stride = std::max(row_stride, Dispatch(proxy, [=](auto const &value) {
-                            return GetRowCounts(value, row_counts_span,
-                                                get_device(), missing);
-                          }));
-    nnz += thrust::reduce(thrust::cuda::par(alloc), row_counts.begin(),
-                          row_counts.end());
+    common::Span<size_t> row_counts_span(row_counts.data().get(), row_counts.size());
+    row_stride = std::max(row_stride, Dispatch(proxy, [=](auto const& value) {
+                            return GetRowCounts(value, row_counts_span, get_device(), missing);
+                          }));
+    nnz += thrust::reduce(thrust::cuda::par(alloc), row_counts.begin(), row_counts.end());
     batches++;
-  }
+  } while (iter.Next());
   iter.Reset();

   dh::safe_cuda(cudaSetDevice(get_device()));
-  HostDeviceVector<FeatureType> ft;
-  common::SketchContainer final_sketch(
-      sketch_containers.empty() ? ft : sketch_containers.front().FeatureTypes(),
-      batch_param_.max_bin, cols, accumulated_rows, get_device());
-  for (auto const& sketch : sketch_containers) {
-    final_sketch.Merge(sketch.ColumnsPtr(), sketch.Data());
-    final_sketch.FixError();
-  }
-  sketch_containers.clear();
-  sketch_containers.shrink_to_fit();
-
-  common::HistogramCuts cuts;
-  final_sketch.MakeCuts(&cuts);
+  if (!ref) {
+    HostDeviceVector<FeatureType> ft;
+    common::SketchContainer final_sketch(
+        sketch_containers.empty() ? ft : sketch_containers.front().FeatureTypes(),
+        batch_param_.max_bin, cols, accumulated_rows, get_device());
+    for (auto const& sketch : sketch_containers) {
+      final_sketch.Merge(sketch.ColumnsPtr(), sketch.Data());
+      final_sketch.FixError();
+    }
+    sketch_containers.clear();
+    sketch_containers.shrink_to_fit();
+
+    final_sketch.MakeCuts(&cuts);
+  } else {
+    GetCutsFromRef(ref, Info().num_col_, batch_param_, &cuts);
+  }

   this->info_.num_col_ = cols;
   this->info_.num_row_ = accumulated_rows;
   this->info_.num_nonzero_ = nnz;

-  auto init_page = [this, &proxy, &cuts, row_stride, accumulated_rows,
-                    get_device]() {
-    if (!page_) {
+  auto init_page = [this, &proxy, &cuts, row_stride, accumulated_rows, get_device]() {
+    if (!ellpack_) {
       // Should be put inside the while loop to protect against empty batch. In
       // that case device id is invalid.
-      page_.reset(new EllpackPage);
-      *(page_->Impl()) = EllpackPageImpl(get_device(), cuts, this->IsDense(),
-                                         row_stride, accumulated_rows);
+      ellpack_.reset(new EllpackPage);
+      *(ellpack_->Impl()) =
+          EllpackPageImpl(get_device(), cuts, this->IsDense(), row_stride, accumulated_rows);
     }
   };

-  // Construct the final ellpack page.
+  /**
+   * Generate gradient index.
+   */
   size_t offset = 0;
   iter.Reset();
   size_t n_batches_for_verification = 0;
@@ -123,11 +130,10 @@ void IterativeDMatrix::InitFromCUDA(DataIterHandle iter_handle, float missing) {
     dh::safe_cuda(cudaSetDevice(get_device()));
     auto rows = num_rows();
     dh::caching_device_vector<size_t> row_counts(rows + 1, 0);
-    common::Span<size_t> row_counts_span(row_counts.data().get(),
-                                         row_counts.size());
+    common::Span<size_t> row_counts_span(row_counts.data().get(), row_counts.size());
     Dispatch(proxy, [=](auto const& value) {
       return GetRowCounts(value, row_counts_span, get_device(), missing);
     });
     auto is_dense = this->IsDense();

     proxy->Info().feature_types.SetDevice(get_device());
@@ -136,7 +142,7 @@ void IterativeDMatrix::InitFromCUDA(DataIterHandle iter_handle, float missing) {
       return EllpackPageImpl(value, missing, get_device(), is_dense, row_counts_span,
                              d_feature_types, row_stride, rows, cuts);
     });
-    size_t num_elements = page_->Impl()->Copy(get_device(), &new_impl, offset);
+    size_t num_elements = ellpack_->Impl()->Copy(get_device(), &new_impl, offset);
     offset += num_elements;

     proxy->Info().num_row_ = num_rows();
@@ -160,15 +166,15 @@ void IterativeDMatrix::InitFromCUDA(DataIterHandle iter_handle, float missing) {
   rabit::Allreduce<rabit::op::Max>(&info_.num_col_, 1);
 }

-BatchSet<EllpackPage> IterativeDMatrix::GetEllpackBatches(const BatchParam& param) {
-  CHECK(page_);
-  // FIXME(Jiamingy): https://github.com/dmlc/xgboost/issues/7976
-  if (param.max_bin != batch_param_.max_bin) {
-    LOG(WARNING) << "Inconsistent max_bin between Quantile DMatrix and Booster:" << param.max_bin
-                 << " vs. " << batch_param_.max_bin;
-  }
-  auto begin_iter = BatchIterator<EllpackPage>(new SimpleBatchIteratorImpl<EllpackPage>(page_));
+BatchSet<EllpackPage> IterativeDMatrix::GetEllpackBatches(BatchParam const& param) {
+  CheckParam(param);
+  CHECK(ellpack_) << "Not initialized with GPU data";
+  auto begin_iter = BatchIterator<EllpackPage>(new SimpleBatchIteratorImpl<EllpackPage>(ellpack_));
   return BatchSet<EllpackPage>(begin_iter);
 }

+void GetCutsFromEllpack(EllpackPage const& page, common::HistogramCuts* cuts) {
+  *cuts = page.Impl()->Cuts();
+}
 }  // namespace data
 }  // namespace xgboost
@@ -5,45 +5,87 @@
 #ifndef XGBOOST_DATA_ITERATIVE_DMATRIX_H_
 #define XGBOOST_DATA_ITERATIVE_DMATRIX_H_

-#include <vector>
+#include <memory>
 #include <string>
 #include <utility>
-#include <memory>
+#include <vector>

-#include "xgboost/base.h"
-#include "xgboost/data.h"
-#include "xgboost/c_api.h"
 #include "proxy_dmatrix.h"
 #include "simple_batch_iterator.h"
+#include "xgboost/base.h"
+#include "xgboost/c_api.h"
+#include "xgboost/data.h"

 namespace xgboost {
+namespace common {
+class HistogramCuts;
+}
+
 namespace data {

 class IterativeDMatrix : public DMatrix {
   MetaInfo info_;
   Context ctx_;
   BatchParam batch_param_;
-  std::shared_ptr<EllpackPage> page_;
+  std::shared_ptr<EllpackPage> ellpack_;
+  std::shared_ptr<GHistIndexMatrix> ghist_;

   DMatrixHandle proxy_;
   DataIterResetCallback *reset_;
   XGDMatrixCallbackNext *next_;

- public:
-  void InitFromCUDA(DataIterHandle iter, float missing);
+  void CheckParam(BatchParam const &param) {
+    // FIXME(Jiamingy): https://github.com/dmlc/xgboost/issues/7976
+    if (param.max_bin != batch_param_.max_bin && param.max_bin != 0) {
+      LOG(WARNING) << "Inconsistent max_bin between Quantile DMatrix and Booster:" << param.max_bin
+                   << " vs. " << batch_param_.max_bin;
+    }
+    CHECK(!param.regen) << "Only `hist` and `gpu_hist` tree method can use `QuantileDMatrix`.";
+  }
+
+  template <typename Page>
+  static auto InvalidTreeMethod() {
+    LOG(FATAL) << "Only `hist` and `gpu_hist` tree method can use `QuantileDMatrix`.";
+    return BatchSet<Page>(BatchIterator<Page>(nullptr));
+  }
+
+ public:
-  explicit IterativeDMatrix(DataIterHandle iter, DMatrixHandle proxy, DataIterResetCallback *reset,
-                            XGDMatrixCallbackNext *next, float missing, int nthread, int max_bin)
+  void InitFromCUDA(DataIterHandle iter, float missing, std::shared_ptr<DMatrix> ref);
+  void InitFromCPU(DataIterHandle iter_handle, float missing, std::shared_ptr<DMatrix> ref);
+
+ public:
+  explicit IterativeDMatrix(DataIterHandle iter_handle, DMatrixHandle proxy,
+                            std::shared_ptr<DMatrix> ref, DataIterResetCallback *reset,
+                            XGDMatrixCallbackNext *next, float missing, int nthread,
+                            bst_bin_t max_bin)
       : proxy_{proxy}, reset_{reset}, next_{next} {
-    batch_param_ = BatchParam{MakeProxy(proxy_)->DeviceIdx(), max_bin};
+    // fetch the first batch
+    auto iter =
+        DataIterProxy<DataIterResetCallback, XGDMatrixCallbackNext>{iter_handle, reset_, next_};
+    iter.Reset();
+    bool valid = iter.Next();
+    CHECK(valid) << "Iterative DMatrix must have at least 1 batch.";
+
+    auto d = MakeProxy(proxy_)->DeviceIdx();
+    if (batch_param_.gpu_id != Context::kCpuId) {
+      CHECK_EQ(d, batch_param_.gpu_id) << "All batches should be on the same device.";
+    }
+    batch_param_ = BatchParam{d, max_bin};
+    batch_param_.sparse_thresh = 0.2;  // default from TrainParam
+
     ctx_.UpdateAllowUnknown(Args{{"nthread", std::to_string(nthread)}});
-    this->InitFromCUDA(iter, missing);
+    if (d == Context::kCpuId) {
+      this->InitFromCPU(iter_handle, missing, ref);
+    } else {
+      this->InitFromCUDA(iter_handle, missing, ref);
+    }
   }
   ~IterativeDMatrix() override = default;

-  bool EllpackExists() const override { return true; }
+  bool EllpackExists() const override { return static_cast<bool>(ellpack_); }
+  bool GHistIndexExists() const override { return static_cast<bool>(ghist_); }
   bool SparsePageExists() const override { return false; }

   DMatrix *Slice(common::Span<int32_t const>) override {
     LOG(FATAL) << "Slicing DMatrix is not supported for Quantile DMatrix.";
     return nullptr;
@@ -52,20 +94,13 @@ class IterativeDMatrix : public DMatrix {
     LOG(FATAL) << "Not implemented.";
     return BatchSet<SparsePage>(BatchIterator<SparsePage>(nullptr));
   }
-  BatchSet<CSCPage> GetColumnBatches() override {
-    LOG(FATAL) << "Not implemented.";
-    return BatchSet<CSCPage>(BatchIterator<CSCPage>(nullptr));
-  }
+  BatchSet<CSCPage> GetColumnBatches() override { return InvalidTreeMethod<CSCPage>(); }
   BatchSet<SortedCSCPage> GetSortedColumnBatches() override {
-    LOG(FATAL) << "Not implemented.";
-    return BatchSet<SortedCSCPage>(BatchIterator<SortedCSCPage>(nullptr));
-  }
-  BatchSet<GHistIndexMatrix> GetGradientIndex(const BatchParam&) override {
-    LOG(FATAL) << "Not implemented.";
-    return BatchSet<GHistIndexMatrix>(BatchIterator<GHistIndexMatrix>(nullptr));
+    return InvalidTreeMethod<SortedCSCPage>();
   }
+  BatchSet<GHistIndexMatrix> GetGradientIndex(BatchParam const &param) override;

-  BatchSet<EllpackPage> GetEllpackBatches(const BatchParam& param) override;
+  BatchSet<EllpackPage> GetEllpackBatches(const BatchParam &param) override;

   bool SingleColBlock() const override { return true; }
@@ -75,20 +110,34 @@ class IterativeDMatrix : public DMatrix {
   Context const *Ctx() const override { return &ctx_; }
 };

+/**
+ * \brief Get quantile cuts from reference Quantile DMatrix.
+ */
+void GetCutsFromRef(std::shared_ptr<DMatrix> ref_, bst_feature_t n_features, BatchParam p,
+                    common::HistogramCuts *p_cuts);
+/**
+ * \brief Get quantile cuts from ellpack page.
+ */
+void GetCutsFromEllpack(EllpackPage const &page, common::HistogramCuts *cuts);
+
 #if !defined(XGBOOST_USE_CUDA)
-inline void IterativeDMatrix::InitFromCUDA(DataIterHandle iter, float missing) {
+inline void IterativeDMatrix::InitFromCUDA(DataIterHandle iter, float missing,
+                                           std::shared_ptr<DMatrix> ref) {
+  // silence the warning about unused variables.
+  (void)(proxy_);
+  (void)(reset_);
+  (void)(next_);
   common::AssertGPUSupport();
 }
-inline BatchSet<EllpackPage> IterativeDMatrix::GetEllpackBatches(const BatchParam& param) {
+inline BatchSet<EllpackPage> IterativeDMatrix::GetEllpackBatches(const BatchParam &param) {
   common::AssertGPUSupport();
-  auto begin_iter =
-      BatchIterator<EllpackPage>(new SimpleBatchIteratorImpl<EllpackPage>(page_));
+  auto begin_iter = BatchIterator<EllpackPage>(new SimpleBatchIteratorImpl<EllpackPage>(ellpack_));
   return BatchSet<EllpackPage>(BatchIterator<EllpackPage>(begin_iter));
 }
+
+inline void GetCutsFromEllpack(EllpackPage const &, common::HistogramCuts *) {
+  common::AssertGPUSupport();
+}
 #endif  // !defined(XGBOOST_USE_CUDA)
 }  // namespace data
 }  // namespace xgboost
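
A sketch of how the new `ref` parameter is meant to be used: build a training QuantileDMatrix first, then pass it as the reference so validation data is binned with exactly the same quantile cuts. `NumpyArrayIterForTest`, `Reset`, and `Next` come from the test helpers further down; treat the exact values as placeholders:

```cpp
#include <limits>
#include <memory>

using xgboost::data::IterativeDMatrix;

std::shared_ptr<IterativeDMatrix> MakeValidWithTrainCuts() {
  NumpyArrayIterForTest train_iter(0.0);
  auto train = std::make_shared<IterativeDMatrix>(
      &train_iter, train_iter.Proxy(), /*ref=*/nullptr, Reset, Next,
      std::numeric_limits<float>::quiet_NaN(), /*nthread=*/0, /*max_bin=*/256);

  NumpyArrayIterForTest valid_iter(0.5);
  // Passing `train` as `ref` skips sketching and reuses its quantile cuts,
  // so the validation data is binned exactly like the training data.
  auto valid = std::make_shared<IterativeDMatrix>(
      &valid_iter, valid_iter.Proxy(), train, Reset, Next,
      std::numeric_limits<float>::quiet_NaN(), 0, 256);
  return valid;
}
```
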
@@ -8,22 +8,22 @@
 namespace xgboost {
 namespace data {
 void DMatrixProxy::SetArrayData(char const *c_interface) {
-  std::shared_ptr<ArrayAdapter> adapter{
-      new ArrayAdapter(StringView{c_interface})};
+  std::shared_ptr<ArrayAdapter> adapter{new ArrayAdapter(StringView{c_interface})};
   this->batch_ = adapter;
   this->Info().num_col_ = adapter->NumColumns();
   this->Info().num_row_ = adapter->NumRows();
+  this->ctx_.gpu_id = Context::kCpuId;
 }

 void DMatrixProxy::SetCSRData(char const *c_indptr, char const *c_indices,
                               char const *c_values, bst_feature_t n_features, bool on_host) {
   CHECK(on_host) << "Not implemented on device.";
-  std::shared_ptr<CSRArrayAdapter> adapter{
-      new CSRArrayAdapter(StringView{c_indptr}, StringView{c_indices},
-                          StringView{c_values}, n_features)};
+  std::shared_ptr<CSRArrayAdapter> adapter{new CSRArrayAdapter(
+      StringView{c_indptr}, StringView{c_indices}, StringView{c_values}, n_features)};
   this->batch_ = adapter;
   this->Info().num_col_ = adapter->NumColumns();
   this->Info().num_row_ = adapter->NumRows();
+  this->ctx_.gpu_id = Context::kCpuId;
 }
 }  // namespace data
 }  // namespace xgboost
@@ -16,6 +16,7 @@ void DMatrixProxy::FromCudaColumnar(StringView interface_str) {
   this->Info().num_row_ = adapter->NumRows();
   if (ctx_.gpu_id < 0) {
+    CHECK_EQ(this->Info().num_row_, 0);
     ctx_.gpu_id = dh::CurrentDevice();
   }
 }
@@ -27,6 +28,7 @@ void DMatrixProxy::FromCudaArray(StringView interface_str) {
   this->Info().num_row_ = adapter->NumRows();
   if (ctx_.gpu_id < 0) {
+    CHECK_EQ(this->Info().num_row_, 0);
     ctx_.gpu_id = dh::CurrentDevice();
   }
 }
 }  // namespace data
@@ -65,9 +65,6 @@ class DMatrixProxy : public DMatrix {
   } else {
     this->FromCudaArray(interface_str);
   }
-  if (this->info_.num_row_ == 0) {
-    this->ctx_.gpu_id = Context::kCpuId;
-  }
 #endif  // defined(XGBOOST_USE_CUDA)
 }
@@ -80,9 +77,11 @@ class DMatrixProxy : public DMatrix {
   MetaInfo const& Info() const override { return info_; }
   Context const* Ctx() const override { return &ctx_; }

-  bool SingleColBlock() const override { return true; }
-  bool EllpackExists() const override { return true; }
+  bool SingleColBlock() const override { return false; }
+  bool EllpackExists() const override { return false; }
+  bool GHistIndexExists() const override { return false; }
   bool SparsePageExists() const override { return false; }

   DMatrix* Slice(common::Span<int32_t const> /*ridxs*/) override {
     LOG(FATAL) << "Slicing DMatrix is not supported for Proxy DMatrix.";
     return nullptr;
@@ -55,12 +55,9 @@ class SimpleDMatrix : public DMatrix {
   std::shared_ptr<GHistIndexMatrix> gradient_index_{nullptr};
   BatchParam batch_param_;

-  bool EllpackExists() const override {
-    return static_cast<bool>(ellpack_page_);
-  }
-  bool SparsePageExists() const override {
-    return true;
-  }
+  bool EllpackExists() const override { return static_cast<bool>(ellpack_page_); }
+  bool GHistIndexExists() const override { return static_cast<bool>(gradient_index_); }
+  bool SparsePageExists() const override { return true; }

  private:
   Context ctx_;
@@ -120,15 +120,11 @@ class SparsePageDMatrix : public DMatrix {
   std::shared_ptr<EllpackPageSource> ellpack_page_source_;
   std::shared_ptr<CSCPageSource> column_source_;
   std::shared_ptr<SortedCSCPageSource> sorted_column_source_;
-  std::shared_ptr<GHistIndexMatrix> ghist_index_page_;  // hist
   std::shared_ptr<GradientIndexPageSource> ghist_index_source_;

-  bool EllpackExists() const override {
-    return static_cast<bool>(ellpack_page_source_);
-  }
-  bool SparsePageExists() const override {
-    return static_cast<bool>(sparse_page_source_);
-  }
+  bool EllpackExists() const override { return static_cast<bool>(ellpack_page_source_); }
+  bool GHistIndexExists() const override { return static_cast<bool>(ghist_index_source_); }
+  bool SparsePageExists() const override { return static_cast<bool>(sparse_page_source_); }
 };

 inline std::string MakeId(std::string prefix, SparsePageDMatrix *ptr) {
@@ -12,6 +12,7 @@
 #include "../common/math.h"
 #include "../common/threading_utils.h"
 #include "../data/adapter.h"
+#include "../data/gradient_index.h"
 #include "../data/proxy_dmatrix.h"
 #include "../gbm/gbtree_model.h"
 #include "predict_fn.h"
@@ -125,30 +126,71 @@ void FVecDrop(const size_t block_size, const size_t batch_offset, DataView* batch,
   }
 }

-template <size_t kUnrollLen = 8>
+namespace {
+static size_t constexpr kUnroll = 8;
+}  // anonymous namespace
+
 struct SparsePageView {
   bst_row_t base_rowid;
   HostSparsePageView view;
-  static size_t constexpr kUnroll = kUnrollLen;

-  explicit SparsePageView(SparsePage const *p)
-      : base_rowid{p->base_rowid} {
-    view = p->GetView();
-  }
+  explicit SparsePageView(SparsePage const *p) : base_rowid{p->base_rowid} { view = p->GetView(); }
   SparsePage::Inst operator[](size_t i) { return view[i]; }
   size_t Size() const { return view.Size(); }
 };

-template <typename Adapter, size_t kUnrollLen = 8>
+struct GHistIndexMatrixView {
+ private:
+  GHistIndexMatrix const &page_;
+  uint64_t n_features_;
+  common::Span<FeatureType const> ft_;
+  common::Span<Entry> workspace_;
+  std::vector<size_t> current_unroll_;
+
+ public:
+  size_t base_rowid;
+
+ public:
+  GHistIndexMatrixView(GHistIndexMatrix const &_page, uint64_t n_feat,
+                       common::Span<FeatureType const> ft, common::Span<Entry> workplace,
+                       int32_t n_threads)
+      : page_{_page},
+        n_features_{n_feat},
+        ft_{ft},
+        workspace_{workplace},
+        current_unroll_(n_threads > 0 ? n_threads : 1, 0),
+        base_rowid{_page.base_rowid} {}
+
+  SparsePage::Inst operator[](size_t r) {
+    auto t = omp_get_thread_num();
+    auto const beg = (n_features_ * kUnroll * t) + (current_unroll_[t] * n_features_);
+    size_t non_missing{beg};
+
+    for (bst_feature_t c = 0; c < n_features_; ++c) {
+      float f = page_.GetFvalue(r, c, common::IsCat(ft_, c));
+      if (!common::CheckNAN(f)) {
+        workspace_[non_missing] = Entry{c, f};
+        ++non_missing;
+      }
+    }
+
+    auto ret = workspace_.subspan(beg, non_missing - beg);
+    current_unroll_[t]++;
+    if (current_unroll_[t] == kUnroll) {
+      current_unroll_[t] = 0;
+    }
+    return ret;
+  }
+  size_t Size() const { return page_.Size(); }
+};
+
+template <typename Adapter>
 class AdapterView {
   Adapter* adapter_;
   float missing_;
   common::Span<Entry> workspace_;
   std::vector<size_t> current_unroll_;

- public:
-  static size_t constexpr kUnroll = kUnrollLen;
-
  public:
   explicit AdapterView(Adapter *adapter, float missing, common::Span<Entry> workplace,
                        int32_t n_threads)
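
A self-contained sketch (not from the commit) of the slot arithmetic in `GHistIndexMatrixView::operator[]`: thread `t` owns `kUnroll` consecutive slots of `n_features` entries each in the shared workspace, and reuses them round-robin, so up to `kUnroll` returned rows per thread stay valid at a time:

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

int main() {
  std::size_t const n_features = 4, k_unroll = 8, n_threads = 2;
  std::vector<std::size_t> current_unroll(n_threads, 0);

  auto slot_begin = [&](std::size_t t) {
    std::size_t beg = n_features * k_unroll * t + current_unroll[t] * n_features;
    current_unroll[t] = (current_unroll[t] + 1) % k_unroll;  // wrap, as above
    return beg;
  };

  // Thread 0 cycles through offsets 0, 4, 8, ..., 28, then wraps back to 0.
  for (int i = 0; i < 10; ++i) std::cout << slot_begin(0) << " ";
  std::cout << "\n";
  // Thread 1's region starts at n_features * k_unroll = 32.
  std::cout << slot_begin(1) << "\n";
}
```
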
@@ -251,33 +293,59 @@ class CPUPredictor : public Predictor {
     }
   }

+  void PredictGHistIndex(DMatrix *p_fmat, gbm::GBTreeModel const &model, int32_t tree_begin,
+                         int32_t tree_end, std::vector<bst_float> *out_preds) const {
+    auto const n_threads = this->ctx_->Threads();
+
+    constexpr double kDensityThresh = .5;
+    size_t total =
+        std::max(p_fmat->Info().num_row_ * p_fmat->Info().num_col_, static_cast<uint64_t>(1));
+    double density = static_cast<double>(p_fmat->Info().num_nonzero_) / static_cast<double>(total);
+    bool blocked = density > kDensityThresh;
+
+    std::vector<RegTree::FVec> feat_vecs;
+    InitThreadTemp(n_threads * (blocked ? kBlockOfRowsSize : 1), &feat_vecs);
+    std::vector<Entry> workspace(p_fmat->Info().num_col_ * kUnroll * n_threads);
+    auto ft = p_fmat->Info().feature_types.ConstHostVector();
+    for (auto const &batch : p_fmat->GetBatches<GHistIndexMatrix>({})) {
+      if (blocked) {
+        PredictBatchByBlockOfRowsKernel<GHistIndexMatrixView, kBlockOfRowsSize>(
+            GHistIndexMatrixView{batch, p_fmat->Info().num_col_, ft, workspace, n_threads},
+            out_preds, model, tree_begin, tree_end, &feat_vecs, n_threads);
+      } else {
+        PredictBatchByBlockOfRowsKernel<GHistIndexMatrixView, 1>(
+            GHistIndexMatrixView{batch, p_fmat->Info().num_col_, ft, workspace, n_threads},
+            out_preds, model, tree_begin, tree_end, &feat_vecs, n_threads);
+      }
+    }
+  }
+
   void PredictDMatrix(DMatrix *p_fmat, std::vector<bst_float> *out_preds,
-                      gbm::GBTreeModel const &model, int32_t tree_begin,
-                      int32_t tree_end) const {
+                      gbm::GBTreeModel const &model, int32_t tree_begin, int32_t tree_end) const {
+    if (!p_fmat->PageExists<SparsePage>()) {
+      this->PredictGHistIndex(p_fmat, model, tree_begin, tree_end, out_preds);
+      return;
+    }
+
     auto const n_threads = this->ctx_->Threads();
     constexpr double kDensityThresh = .5;
-    size_t total = std::max(p_fmat->Info().num_row_ * p_fmat->Info().num_col_,
-                            static_cast<uint64_t>(1));
-    double density = static_cast<double>(p_fmat->Info().num_nonzero_) /
-                     static_cast<double>(total);
+    size_t total =
+        std::max(p_fmat->Info().num_row_ * p_fmat->Info().num_col_, static_cast<uint64_t>(1));
+    double density = static_cast<double>(p_fmat->Info().num_nonzero_) / static_cast<double>(total);
     bool blocked = density > kDensityThresh;

     std::vector<RegTree::FVec> feat_vecs;
     InitThreadTemp(n_threads * (blocked ? kBlockOfRowsSize : 1), &feat_vecs);
     for (auto const &batch : p_fmat->GetBatches<SparsePage>()) {
       CHECK_EQ(out_preds->size(),
-               p_fmat->Info().num_row_ *
-                   model.learner_model_param->num_output_group);
-      size_t constexpr kUnroll = 8;
+               p_fmat->Info().num_row_ * model.learner_model_param->num_output_group);
       if (blocked) {
-        PredictBatchByBlockOfRowsKernel<SparsePageView<kUnroll>, kBlockOfRowsSize>(
-            SparsePageView<kUnroll>{&batch}, out_preds, model, tree_begin, tree_end, &feat_vecs,
-            n_threads);
+        PredictBatchByBlockOfRowsKernel<SparsePageView, kBlockOfRowsSize>(
+            SparsePageView{&batch}, out_preds, model, tree_begin, tree_end, &feat_vecs, n_threads);
       } else {
-        PredictBatchByBlockOfRowsKernel<SparsePageView<kUnroll>, 1>(
-            SparsePageView<kUnroll>{&batch}, out_preds, model, tree_begin, tree_end, &feat_vecs,
-            n_threads);
+        PredictBatchByBlockOfRowsKernel<SparsePageView, 1>(
+            SparsePageView{&batch}, out_preds, model, tree_begin, tree_end, &feat_vecs, n_threads);
       }
     }
   }
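
The dense/sparse routing above hinges on a simple density estimate; a toy calculation (the shapes are assumed for illustration) shows how the threshold flips the kernel choice:

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
  // Assumed example shapes; the predictor derives these from MetaInfo.
  std::uint64_t n_rows = 128, n_cols = 16, nnz = 512;
  double constexpr kDensityThresh = .5;  // same threshold as above
  std::uint64_t total = std::max(n_rows * n_cols, std::uint64_t{1});
  double density = static_cast<double>(nnz) / static_cast<double>(total);
  bool blocked = density > kDensityThresh;  // 0.25 -> not blocked, block size 1
  std::cout << "density=" << density << " blocked=" << blocked << "\n";
}
```
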
@@ -316,7 +384,7 @@ class CPUPredictor : public Predictor {
       info.num_row_ = m->NumRows();
       this->InitOutPredictions(info, &(out_preds->predictions), model);
     }
-    std::vector<Entry> workspace(m->NumColumns() * 8 * n_threads);
+    std::vector<Entry> workspace(m->NumColumns() * kUnroll * n_threads);
     auto &predictions = out_preds->predictions.HostVector();
     std::vector<RegTree::FVec> thread_temp;
     InitThreadTemp(n_threads * kBlockSize, &thread_temp);
@@ -149,10 +149,10 @@ struct SparsePageLoader {

 struct EllpackLoader {
   EllpackDeviceAccessor const& matrix;
-  XGBOOST_DEVICE EllpackLoader(EllpackDeviceAccessor const& m, bool,
-                               bst_feature_t, bst_row_t, size_t, float)
+  XGBOOST_DEVICE EllpackLoader(EllpackDeviceAccessor const& m, bool, bst_feature_t, bst_row_t,
+                               size_t, float)
       : matrix{m} {}
   __device__ __forceinline__ float GetElement(size_t ridx, size_t fidx) const {
     auto gidx = matrix.GetBinIndex(ridx, fidx);
     if (gidx == -1) {
       return nan("");
tests/cpp/data/test_iterative_dmatrix.cc (new file, 36 lines)
@@ -0,0 +1,36 @@
/*!
 * Copyright 2022 XGBoost contributors
 */
#include "test_iterative_dmatrix.h"

#include <gtest/gtest.h>

#include "../../../src/data/gradient_index.h"
#include "../../../src/data/iterative_dmatrix.h"
#include "../helpers.h"

namespace xgboost {
namespace data {
TEST(IterativeDMatrix, Ref) {
  TestRefDMatrix<GHistIndexMatrix, NumpyArrayIterForTest>(
      [&](GHistIndexMatrix const& page) { return page.cut; });
}

TEST(IterativeDMatrix, IsDense) {
  int n_bins = 16;
  auto test = [n_bins](float sparsity) {
    NumpyArrayIterForTest iter(sparsity);
    IterativeDMatrix m(&iter, iter.Proxy(), nullptr, Reset, Next,
                       std::numeric_limits<float>::quiet_NaN(), 0, n_bins);
    if (sparsity == 0.0) {
      ASSERT_TRUE(m.IsDense());
    } else {
      ASSERT_FALSE(m.IsDense());
    }
  };
  test(0.0);
  test(0.1);
  test(1.0);
}
}  // namespace data
}  // namespace xgboost
@@ -3,19 +3,19 @@
  */
 #include <gtest/gtest.h>

-#include "../helpers.h"
-#include "../../../src/data/iterative_dmatrix.h"
-#include "../../../src/data/ellpack_page.cuh"
 #include "../../../src/data/device_adapter.cuh"
+#include "../../../src/data/ellpack_page.cuh"
+#include "../../../src/data/iterative_dmatrix.h"
+#include "../helpers.h"
+#include "test_iterative_dmatrix.h"

 namespace xgboost {
 namespace data {

 void TestEquivalent(float sparsity) {
   CudaArrayIterForTest iter{sparsity};
-  IterativeDMatrix m(
-      &iter, iter.Proxy(), Reset, Next, std::numeric_limits<float>::quiet_NaN(),
-      0, 256);
+  IterativeDMatrix m(&iter, iter.Proxy(), nullptr, Reset, Next,
+                     std::numeric_limits<float>::quiet_NaN(), 0, 256);
   size_t offset = 0;
   auto first = (*m.GetEllpackBatches({}).begin()).Impl();
   std::unique_ptr<EllpackPageImpl> page_concatenated {
@@ -88,9 +88,8 @@ TEST(IterativeDeviceDMatrix, Basic) {

 TEST(IterativeDeviceDMatrix, RowMajor) {
   CudaArrayIterForTest iter(0.0f);
-  IterativeDMatrix m(
-      &iter, iter.Proxy(), Reset, Next, std::numeric_limits<float>::quiet_NaN(),
-      0, 256);
+  IterativeDMatrix m(&iter, iter.Proxy(), nullptr, Reset, Next,
+                     std::numeric_limits<float>::quiet_NaN(), 0, 256);
   size_t n_batches = 0;
   std::string interface_str = iter.AsArray();
   for (auto& ellpack : m.GetBatches<EllpackPage>({})) {
@@ -139,9 +138,8 @@ TEST(IterativeDeviceDMatrix, RowMajorMissing) {
       reinterpret_cast<float *>(get<Integer>(j_interface["data"][0])));
   thrust::copy(h_data.cbegin(), h_data.cend(), ptr);

-  IterativeDMatrix m(
-      &iter, iter.Proxy(), Reset, Next, std::numeric_limits<float>::quiet_NaN(),
-      0, 256);
+  IterativeDMatrix m(&iter, iter.Proxy(), nullptr, Reset, Next,
+                     std::numeric_limits<float>::quiet_NaN(), 0, 256);
   auto &ellpack = *m.GetBatches<EllpackPage>({0, 256}).begin();
   auto impl = ellpack.Impl();
   common::CompressedIterator<uint32_t> iterator(
@@ -157,11 +155,10 @@ TEST(IterativeDeviceDMatrix, RowMajorMissing) {

 TEST(IterativeDeviceDMatrix, IsDense) {
   int num_bins = 16;
-  auto test = [num_bins] (float sparsity) {
+  auto test = [num_bins](float sparsity) {
     CudaArrayIterForTest iter(sparsity);
-    IterativeDMatrix m(
-        &iter, iter.Proxy(), Reset, Next, std::numeric_limits<float>::quiet_NaN(),
-        0, 256);
+    IterativeDMatrix m(&iter, iter.Proxy(), nullptr, Reset, Next,
+                       std::numeric_limits<float>::quiet_NaN(), 0, num_bins);
     if (sparsity == 0.0) {
       ASSERT_TRUE(m.IsDense());
     } else {
@@ -170,6 +167,12 @@ TEST(IterativeDeviceDMatrix, IsDense) {
   };
   test(0.0);
   test(0.1);
   test(1.0);
 }
+
+TEST(IterativeDeviceDMatrix, Ref) {
+  TestRefDMatrix<EllpackPage, CudaArrayIterForTest>(
+      [](EllpackPage const& page) { return page.Impl()->Cuts(); });
+}
 }  // namespace data
 }  // namespace xgboost
tests/cpp/data/test_iterative_dmatrix.h (new file, 59 lines)
@@ -0,0 +1,59 @@
/*!
 * Copyright 2022 XGBoost contributors
 */
#pragma once
#include <memory>  // std::make_shared

#include "../../../src/data/iterative_dmatrix.h"
#include "../helpers.h"

namespace xgboost {
namespace data {
template <typename Page, typename Iter, typename Cuts>
void TestRefDMatrix(Cuts&& get_cuts) {
  int n_bins = 256;
  Iter iter(0.3, 2048);
  auto m = std::make_shared<IterativeDMatrix>(&iter, iter.Proxy(), nullptr, Reset, Next,
                                              std::numeric_limits<float>::quiet_NaN(), 0, n_bins);

  Iter iter_1(0.8, 32, Iter::Cols(), 13);
  auto m_1 = std::make_shared<IterativeDMatrix>(&iter_1, iter_1.Proxy(), m, Reset, Next,
                                                std::numeric_limits<float>::quiet_NaN(), 0, n_bins);

  for (auto const& page_0 : m->template GetBatches<Page>({})) {
    for (auto const& page_1 : m_1->template GetBatches<Page>({})) {
      auto const& cuts_0 = get_cuts(page_0);
      auto const& cuts_1 = get_cuts(page_1);
      ASSERT_EQ(cuts_0.Values(), cuts_1.Values());
      ASSERT_EQ(cuts_0.Ptrs(), cuts_1.Ptrs());
      ASSERT_EQ(cuts_0.MinValues(), cuts_1.MinValues());
    }
  }

  m_1 = std::make_shared<IterativeDMatrix>(&iter_1, iter_1.Proxy(), nullptr, Reset, Next,
                                           std::numeric_limits<float>::quiet_NaN(), 0, n_bins);
  for (auto const& page_0 : m->template GetBatches<Page>({})) {
    for (auto const& page_1 : m_1->template GetBatches<Page>({})) {
      auto const& cuts_0 = get_cuts(page_0);
      auto const& cuts_1 = get_cuts(page_1);
      ASSERT_NE(cuts_0.Values(), cuts_1.Values());
      ASSERT_NE(cuts_0.Ptrs(), cuts_1.Ptrs());
    }
  }

  // Use DMatrix as ref
  auto dm = RandomDataGenerator(2048, Iter::Cols(), 0.5).GenerateDMatrix(true);
  auto dqm = std::make_shared<IterativeDMatrix>(&iter_1, iter_1.Proxy(), dm, Reset, Next,
                                                std::numeric_limits<float>::quiet_NaN(), 0, n_bins);
  for (auto const& page_0 : dm->template GetBatches<Page>({})) {
    for (auto const& page_1 : dqm->template GetBatches<Page>({})) {
      auto const& cuts_0 = get_cuts(page_0);
      auto const& cuts_1 = get_cuts(page_1);
      ASSERT_EQ(cuts_0.Values(), cuts_1.Values());
      ASSERT_EQ(cuts_0.Ptrs(), cuts_1.Ptrs());
      ASSERT_EQ(cuts_0.MinValues(), cuts_1.MinValues());
    }
  }
}
}  // namespace data
}  // namespace xgboost
@@ -384,7 +384,7 @@ RandomDataGenerator::GenerateDMatrix(bool with_label, bool float_label,
 std::shared_ptr<DMatrix> RandomDataGenerator::GenerateQuantileDMatrix() {
   NumpyArrayIterForTest iter{this->sparsity_, this->rows_, this->cols_, 1};
   auto m = std::make_shared<data::IterativeDMatrix>(
-      &iter, iter.Proxy(), Reset, Next, std::numeric_limits<float>::quiet_NaN(), 0, bins_);
+      &iter, iter.Proxy(), nullptr, Reset, Next, std::numeric_limits<float>::quiet_NaN(), 0, bins_);
   return m;
 }
@@ -569,7 +569,7 @@ std::unique_ptr<GradientBooster> CreateTrainedGBM(
   auto& h_gpair = gpair.HostVector();
   h_gpair.resize(kRows);
   for (size_t i = 0; i < kRows; ++i) {
-    h_gpair[i] = {static_cast<float>(i), 1};
+    h_gpair[i] = GradientPair{static_cast<float>(i), 1};
   }

   PredictionCacheEntry predts;
@@ -27,7 +27,7 @@ int CudaArrayIterForTest::Next() {
 std::shared_ptr<DMatrix> RandomDataGenerator::GenerateDeviceDMatrix() {
   CudaArrayIterForTest iter{this->sparsity_, this->rows_, this->cols_, 1};
   auto m = std::make_shared<data::IterativeDMatrix>(
-      &iter, iter.Proxy(), Reset, Next, std::numeric_limits<float>::quiet_NaN(), 0, bins_);
+      &iter, iter.Proxy(), nullptr, Reset, Next, std::numeric_limits<float>::quiet_NaN(), 0, bins_);
   return m;
 }
 }  // namespace xgboost
@@ -245,6 +245,17 @@ void TestUpdatePredictionCache(bool use_subsampling) {
   }
 }

+TEST(CPUPredictor, GHistIndex) {
+  size_t constexpr kRows{128}, kCols{16}, kBins{64};
+  auto p_hist = RandomDataGenerator{kRows, kCols, 0.0}.Bins(kBins).GenerateQuantileDMatrix();
+  HostDeviceVector<float> storage(kRows * kCols);
+  auto columnar = RandomDataGenerator{kRows, kCols, 0.0}.GenerateArrayInterface(&storage);
+  auto adapter = data::ArrayAdapter(columnar.c_str());
+  std::shared_ptr<DMatrix> p_full{
+      DMatrix::Create(&adapter, std::numeric_limits<float>::quiet_NaN(), 1)};
+  TestTrainingPrediction(kRows, kBins, "hist", p_full, p_hist);
+}
+
 TEST(CPUPredictor, CategoricalPrediction) {
   TestCategoricalPrediction("cpu_predictor");
 }