diff --git a/include/xgboost/c_api.h b/include/xgboost/c_api.h index 5b335ede1..557e000c6 100644 --- a/include/xgboost/c_api.h +++ b/include/xgboost/c_api.h @@ -26,39 +26,10 @@ // manually define unsigned long typedef uint64_t bst_ulong; // NOLINT(*) - /*! \brief handle to DMatrix */ typedef void *DMatrixHandle; // NOLINT(*) /*! \brief handle to Booster */ typedef void *BoosterHandle; // NOLINT(*) -/*! \brief handle to a data iterator */ -typedef void *DataIterHandle; // NOLINT(*) -/*! \brief handle to a internal data holder. */ -typedef void *DataHolderHandle; // NOLINT(*) - -/*! \brief Mini batch used in XGBoost Data Iteration */ -typedef struct { // NOLINT(*) - /*! \brief number of rows in the minibatch */ - size_t size; - /* \brief number of columns in the minibatch. */ - size_t columns; - /*! \brief row pointer to the rows in the data */ -#ifdef __APPLE__ - /* Necessary as Java on MacOS defines jlong as long int - * and gcc defines int64_t as long long int. */ - long* offset; // NOLINT(*) -#else - int64_t* offset; // NOLINT(*) -#endif // __APPLE__ - /*! \brief labels of each instance */ - float* label; - /*! \brief weight of each instance, can be NULL */ - float* weight; - /*! \brief feature index */ - int* index; - /*! \brief feature values */ - float* value; -} XGBoostBatchCSR; /*! * \brief Return the version of the XGBoost library being currently used. @@ -71,29 +42,6 @@ typedef struct { // NOLINT(*) */ XGB_DLL void XGBoostVersion(int* major, int* minor, int* patch); -/*! - * \brief Callback to set the data to handle, - * \param handle The handle to the callback. - * \param batch The data content to be set. - */ -XGB_EXTERN_C typedef int XGBCallbackSetData( // NOLINT(*) - DataHolderHandle handle, XGBoostBatchCSR batch); - -/*! - * \brief The data reading callback function. - * The iterator will be able to give subset of batch in the data. - * - * If there is data, the function will call set_function to set the data. 
- * - * \param data_handle The handle to the callback. - * \param set_function The batch returned by the iterator - * \param set_function_handle The handle to be passed to set function. - * \return 0 if we are reaching the end and batch is not returned. - */ -XGB_EXTERN_C typedef int XGBCallbackDataIterNext( // NOLINT(*) - DataIterHandle data_handle, XGBCallbackSetData *set_function, - DataHolderHandle set_function_handle); - /*! * \brief get string message of the last error * @@ -126,20 +74,6 @@ XGB_DLL int XGDMatrixCreateFromFile(const char *fname, int silent, DMatrixHandle *out); -/*! - * \brief Create a DMatrix from a data iterator. - * \param data_handle The handle to the data. - * \param callback The callback to get the data. - * \param cache_info Additional information about cache file, can be null. - * \param out The created DMatrix - * \return 0 when success, -1 when failure happens. - */ -XGB_DLL int XGDMatrixCreateFromDataIter( - DataIterHandle data_handle, - XGBCallbackDataIterNext* callback, - const char* cache_info, - DMatrixHandle *out); - /*! * \brief create a matrix content from CSR format * \param indptr pointer to row headers @@ -221,6 +155,189 @@ XGB_DLL int XGDMatrixCreateFromDT(void** data, bst_ulong ncol, DMatrixHandle* out, int nthread); + +/* + * ========================== Begin data callback APIs ========================= + * + * Short notes for data callback + * + * There are 2 sets of data callbacks for DMatrix. The first one is currently exclusively + * used by JVM packages. It uses `XGBoostBatchCSR` to accept batches for CSR formated + * input, and concatenate them into 1 final big CSR. The related functions are: + * + * - XGBCallbackSetData + * - XGBCallbackDataIterNext + * - XGDMatrixCreateFromDataIter + * + * Another set is used by Quantile based DMatrix (used by hist algorithm) for reducing + * memory usage. Currently only GPU implementation is available. 
It accepts foreign data + * iterators as callbacks and works similarly to external memory. For GPU Hist, the data is + * first compressed by quantile sketching then merged. This is particularly useful in + * distributed settings as it eliminates 2 copies of data. 1 by a `concat` from external + * library to make the data into a blob for normal DMatrix initialization, another by the + * internal CSR copy of DMatrix. Related functions are: + * + * - XGProxyDMatrixCreate + * - XGDMatrixCallbackNext + * - DataIterResetCallback + * - XGDeviceQuantileDMatrixSetDataCudaArrayInterface + * - XGDeviceQuantileDMatrixSetDataCudaColumnar + * - ... (data setters) + */ + +/* ==== First set of callback functions, used exclusively by JVM packages. ==== */ + +/*! \brief handle to an external data iterator */ +typedef void *DataIterHandle; // NOLINT(*) +/*! \brief handle to an internal data holder. */ +typedef void *DataHolderHandle; // NOLINT(*) + + +/*! \brief Mini batch used in XGBoost Data Iteration */ +typedef struct { // NOLINT(*) + /*! \brief number of rows in the minibatch */ + size_t size; + /*! \brief number of columns in the minibatch. */ + size_t columns; + /*! \brief row pointer to the rows in the data */ +#ifdef __APPLE__ + /* Necessary as Java on MacOS defines jlong as long int + * and gcc defines int64_t as long long int. */ + long* offset; // NOLINT(*) +#else + int64_t* offset; // NOLINT(*) +#endif // __APPLE__ + /*! \brief labels of each instance */ + float* label; + /*! \brief weight of each instance, can be NULL */ + float* weight; + /*! \brief feature index */ + int* index; + /*! \brief feature values */ + float* value; +} XGBoostBatchCSR; + +/*! + * \brief Callback to set the data to handle, + * \param handle The handle to the callback. + * \param batch The data content to be set. + */ +XGB_EXTERN_C typedef int XGBCallbackSetData( // NOLINT(*) + DataHolderHandle handle, XGBoostBatchCSR batch); + +/*! + * \brief The data reading callback function. 
+ * The iterator will be able to give subset of batch in the data. + * + * If there is data, the function will call set_function to set the data. + * + * \param data_handle The handle to the callback. + * \param set_function The batch returned by the iterator + * \param set_function_handle The handle to be passed to set function. + * \return 0 if we are reaching the end and batch is not returned. + */ +XGB_EXTERN_C typedef int XGBCallbackDataIterNext( // NOLINT(*) + DataIterHandle data_handle, XGBCallbackSetData *set_function, + DataHolderHandle set_function_handle); + +/*! + * \brief Create a DMatrix from a data iterator. + * \param data_handle The handle to the data. + * \param callback The callback to get the data. + * \param cache_info Additional information about cache file, can be null. + * \param out The created DMatrix + * \return 0 when success, -1 when failure happens. + */ +XGB_DLL int XGDMatrixCreateFromDataIter( + DataIterHandle data_handle, + XGBCallbackDataIterNext* callback, + const char* cache_info, + DMatrixHandle *out); + +/* == Second set of callback functions, used by constructing Quantile based DMatrix. === + * + * Short note for how to use the second set of callback for GPU Hist tree method. + * + * Step 0: Define a data iterator with 2 methods `reset`, and `next`. + * Step 1: Create a DMatrix proxy by `XGProxyDMatrixCreate` and hold the handle. + * Step 2: Pass the iterator handle, proxy handle and 2 methods into + * `XGDeviceQuantileDMatrixCreateFromCallback`. + * Step 3: Call appropriate data setters in `next` functions. + * + * See test_iterative_device_dmatrix.cu or Python interface for examples. + */ + +/*! + * \brief Create a DMatrix proxy for setting data, can be free by XGDMatrixFree. + * + * \param out The created Device Quantile DMatrix + * + * \return 0 when success, -1 when failure happens + */ +XGB_DLL int XGProxyDMatrixCreate(DMatrixHandle* out); + +/*! + * \brief Callback function prototype for getting next batch of data. 
+ * + * \param iter A handle to the user defined iterator. + * + * \return 0 when success, -1 when failure happens + */ +XGB_EXTERN_C typedef int XGDMatrixCallbackNext(DataIterHandle iter); // NOLINT(*) + +/*! + * \brief Callback function prototype for resetting external iterator + */ +XGB_EXTERN_C typedef void DataIterResetCallback(DataIterHandle handle); // NOLINT(*) + +/*! + * \brief Create a device DMatrix with data iterator. + * + * \param iter A handle to external data iterator. + * \param proxy A DMatrix proxy handle created by `XGProxyDMatrixCreate`. + * \param reset Callback function resetting the iterator state. + * \param next Callback function yielding the next batch of data. + * \param missing Which value to represent missing value + * \param nthread Number of threads to use, 0 for default. + * \param max_bin Maximum number of bins for building histogram. + * \param out The created Device Quantile DMatrix + * + * \return 0 when success, -1 when failure happens + */ +XGB_DLL int XGDeviceQuantileDMatrixCreateFromCallback( + DataIterHandle iter, DMatrixHandle proxy, DataIterResetCallback *reset, + XGDMatrixCallbackNext *next, float missing, int nthread, int max_bin, + DMatrixHandle *out); +/*! + * \brief Set data on a DMatrix proxy. + * + * \param handle A DMatrix proxy created by XGProxyDMatrixCreate + * \param c_interface_str Null terminated JSON document string representation of CUDA + * array interface. + * + * \return 0 when success, -1 when failure happens + */ +XGB_DLL int XGDeviceQuantileDMatrixSetDataCudaArrayInterface( + DMatrixHandle handle, + const char* c_interface_str); +/*! + * \brief Set data on a DMatrix proxy. + * + * \param handle A DMatrix proxy created by XGProxyDMatrixCreate + * \param c_interface_str Null terminated JSON document string representation of CUDA + * array interface, with an array of columns. 
+ * + * \return 0 when success, -1 when failure happens + */ +XGB_DLL int XGDeviceQuantileDMatrixSetDataCudaColumnar( + DMatrixHandle handle, + const char* c_interface_str); +/* + * ==========================- End data callback APIs ========================== + */ + + + /*! * \brief create a new dmatrix from sliced content of existing matrix * \param handle instance of data matrix to be sliced @@ -261,6 +378,18 @@ XGB_DLL int XGDMatrixFree(DMatrixHandle handle); */ XGB_DLL int XGDMatrixSaveBinary(DMatrixHandle handle, const char *fname, int silent); + +/*! + * \brief Set content in array interface to a content in info. + * \param handle a instance of data matrix + * \param field field name. + * \param c_interface_str JSON string representation of array interface. + * \return 0 when success, -1 when failure happens + */ +XGB_DLL int XGDMatrixSetInfoFromInterface(DMatrixHandle handle, + char const* field, + char const* c_interface_str); + /*! * \brief set float vector to a content in info * \param handle a instance of data matrix @@ -437,6 +566,10 @@ XGB_DLL int XGBoosterPredict(BoosterHandle handle, int training, bst_ulong *out_len, const float **out_result); + +/* + * ========================== Begin Serialization APIs ========================= + */ /* * Short note for serialization APIs. There are 3 different sets of serialization API. * @@ -559,6 +692,10 @@ XGB_DLL int XGBoosterSaveJsonConfig(BoosterHandle handle, bst_ulong *out_len, */ XGB_DLL int XGBoosterLoadJsonConfig(BoosterHandle handle, char const *json_parameters); +/* + * =========================== End Serialization APIs ========================== + */ + /*! 
* \brief dump model, return array of strings representing model dump diff --git a/include/xgboost/data.h b/include/xgboost/data.h index 57babfafe..1e9e429d5 100644 --- a/include/xgboost/data.h +++ b/include/xgboost/data.h @@ -502,7 +502,33 @@ class DMatrix { const std::string& cache_prefix = "", size_t page_size = kPageSize); - virtual DMatrix* Slice(common::Span ridxs) = 0; + /** + * \brief Create a new Quantile based DMatrix used for histogram based algorithm. + * + * \tparam DataIterHandle External iterator type, defined in C API. + * \tparam DMatrixHandle DMatrix handle, defined in C API. + * \tparam DataIterResetCallback Callback for reset, prototype defined in C API. + * \tparam XGDMatrixCallbackNext Callback for next, prototype defined in C API. + * + * \param iter External data iterator + * \param proxy A handle to ProxyDMatrix + * \param reset Callback for reset + * \param next Callback for next + * \param missing Value that should be treated as missing. + * \param nthread number of threads used for initialization. + * \param max_bin Maximum number of bins. + * + * \return A created quantile based DMatrix. + */ + template + static DMatrix *Create(DataIterHandle iter, DMatrixHandle proxy, + DataIterResetCallback *reset, + XGDMatrixCallbackNext *next, float missing, + int nthread, + int max_bin); + + virtual DMatrix *Slice(common::Span ridxs) = 0; /*! 
\brief page size 32 MB */ static const size_t kPageSize = 32UL << 20UL; diff --git a/src/c_api/c_api.cc b/src/c_api/c_api.cc index 8431e4243..adf51f780 100644 --- a/src/c_api/c_api.cc +++ b/src/c_api/c_api.cc @@ -23,6 +23,7 @@ #include "../common/io.h" #include "../data/adapter.h" #include "../data/simple_dmatrix.h" +#include "../data/proxy_dmatrix.h" using namespace xgboost; // NOLINT(*); @@ -101,6 +102,50 @@ XGB_DLL int XGDMatrixCreateFromArrayInterface(char const* c_json_strs, #endif +// Create from data iterator +XGB_DLL int XGProxyDMatrixCreate(DMatrixHandle* out) { + API_BEGIN(); + *out = new std::shared_ptr(new xgboost::data::DMatrixProxy);; + API_END(); +} + +XGB_DLL int +XGDeviceQuantileDMatrixSetDataCudaArrayInterface(DMatrixHandle handle, + char const *c_interface_str) { + API_BEGIN(); + CHECK_HANDLE(); + auto p_m = static_cast *>(handle); + CHECK(p_m); + auto m = static_cast(p_m->get()); + CHECK(m) << "Current DMatrix type does not support set data."; + m->SetData(c_interface_str); + API_END(); +} + +XGB_DLL int +XGDeviceQuantileDMatrixSetDataCudaColumnar(DMatrixHandle handle, + char const *c_interface_str) { + API_BEGIN(); + CHECK_HANDLE(); + auto p_m = static_cast *>(handle); + CHECK(p_m); + auto m = static_cast(p_m->get()); + CHECK(m) << "Current DMatrix type does not support set data."; + m->SetData(c_interface_str); + API_END(); +} + +XGB_DLL int XGDeviceQuantileDMatrixCreateFromCallback( + DataIterHandle iter, DMatrixHandle proxy, DataIterResetCallback *reset, + XGDMatrixCallbackNext *next, float missing, int nthread, + int max_bin, DMatrixHandle *out) { + API_BEGIN(); + *out = new std::shared_ptr{ + xgboost::DMatrix::Create(iter, proxy, reset, next, missing, nthread, max_bin)}; + API_END(); +} +// End Create from data iterator + XGB_DLL int XGDMatrixCreateFromCSREx(const size_t* indptr, const unsigned* indices, const bst_float* data, diff --git a/src/common/hist_util.cuh b/src/common/hist_util.cuh index 6f8d1e522..fe720c530 100644 --- 
a/src/common/hist_util.cuh +++ b/src/common/hist_util.cuh @@ -68,9 +68,11 @@ struct SketchContainer { // Prevent copying/assigning/moving this as its internals can't be // assigned/copied/moved SketchContainer(const SketchContainer&) = delete; - SketchContainer(const SketchContainer&&) = delete; + SketchContainer(SketchContainer&& that) { + std::swap(sketches_, that.sketches_); + } SketchContainer& operator=(const SketchContainer&) = delete; - SketchContainer& operator=(const SketchContainer&&) = delete; + SketchContainer& operator=(SketchContainer&&) = delete; }; struct EntryCompareOp { diff --git a/src/data/data.cc b/src/data/data.cc index f24753e31..8d36d8278 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -19,6 +19,7 @@ #include "../common/version.h" #include "../common/group_data.h" #include "../data/adapter.h" +#include "../data/iterative_device_dmatrix.h" #if DMLC_ENABLE_STD_THREAD #include "./sparse_page_source.h" @@ -569,6 +570,26 @@ DMatrix* DMatrix::Load(const std::string& uri, } return dmat; } +template +DMatrix *DMatrix::Create(DataIterHandle iter, DMatrixHandle proxy, + DataIterResetCallback *reset, + XGDMatrixCallbackNext *next, float missing, + int nthread, + int max_bin) { +#if defined(XGBOOST_USE_CUDA) + return new data::IterativeDeviceDMatrix(iter, proxy, reset, next, missing, nthread, max_bin); +#else + common::AssertGPUSupport(); + return nullptr; +#endif +} + +template DMatrix *DMatrix::Create( + DataIterHandle iter, DMatrixHandle proxy, DataIterResetCallback *reset, + XGDMatrixCallbackNext *next, float missing, int nthread, + int max_bin); template DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread, diff --git a/src/data/ellpack_page.cu b/src/data/ellpack_page.cu index f2a0a2ea9..4a91c9545 100644 --- a/src/data/ellpack_page.cu +++ b/src/data/ellpack_page.cu @@ -252,6 +252,31 @@ EllpackPageImpl::EllpackPageImpl(AdapterT* adapter, float missing, bool is_dense ELLPACK_SPECIALIZATION(data::CudfAdapter) 
ELLPACK_SPECIALIZATION(data::CupyAdapter) + +template +EllpackPageImpl::EllpackPageImpl(AdapterBatch batch, float missing, int device, + bool is_dense, int nthread, + common::Span row_counts_span, + size_t row_stride, size_t n_rows, size_t n_cols, + common::HistogramCuts const& cuts) { + dh::safe_cuda(cudaSetDevice(device)); + + *this = EllpackPageImpl(device, cuts, is_dense, row_stride, n_rows); + CopyDataToEllpack(batch, this, device, missing); + WriteNullValues(this, device, row_counts_span); +} + +#define ELLPACK_BATCH_SPECIALIZE(__BATCH_T) \ + template EllpackPageImpl::EllpackPageImpl( \ + __BATCH_T batch, float missing, int device, \ + bool is_dense, int nthread, \ + common::Span row_counts_span, \ + size_t row_stride, size_t n_rows, size_t n_cols, \ + common::HistogramCuts const& cuts); + +ELLPACK_BATCH_SPECIALIZE(data::CudfAdapterBatch) +ELLPACK_BATCH_SPECIALIZE(data::CupyAdapterBatch) + // A functor that copies the data from one EllpackPage to another. struct CopyPage { common::CompressedBufferWriter cbw; @@ -279,6 +304,10 @@ size_t EllpackPageImpl::Copy(int device, EllpackPageImpl* page, size_t offset) { CHECK_EQ(row_stride, page->row_stride); CHECK_EQ(NumSymbols(), page->NumSymbols()); CHECK_GE(n_rows * row_stride, offset + num_elements); + if (page == this) { + LOG(FATAL) << "Concatenating the same Ellpack."; + return this->n_rows * this->row_stride; + } gidx_buffer.SetDevice(device); page->gidx_buffer.SetDevice(device); dh::LaunchN(device, num_elements, CopyPage(this, page, offset)); diff --git a/src/data/ellpack_page.cuh b/src/data/ellpack_page.cuh index 011190a07..fb54a0c65 100644 --- a/src/data/ellpack_page.cuh +++ b/src/data/ellpack_page.cuh @@ -149,7 +149,7 @@ class EllpackPageImpl { EllpackPageImpl(int device, common::HistogramCuts cuts, const SparsePage& page, - bool is_dense,size_t row_stride); + bool is_dense, size_t row_stride); /*! * \brief Constructor from an existing DMatrix. 
@@ -161,8 +161,16 @@ class EllpackPageImpl { template explicit EllpackPageImpl(AdapterT* adapter, float missing, bool is_dense, int nthread, - int max_bin, common::Span row_counts_span, + int max_bin, + common::Span row_counts_span, size_t row_stride); + + template + explicit EllpackPageImpl(AdapterBatch batch, float missing, int device, bool is_dense, int nthread, + common::Span row_counts_span, + size_t row_stride, size_t n_rows, size_t n_cols, + common::HistogramCuts const& cuts); + /*! \brief Copy the elements of the given ELLPACK page into this page. * * @param device The GPU device to use. diff --git a/src/data/iterative_device_dmatrix.cu b/src/data/iterative_device_dmatrix.cu new file mode 100644 index 000000000..6a4e06e02 --- /dev/null +++ b/src/data/iterative_device_dmatrix.cu @@ -0,0 +1,188 @@ +/*! + * Copyright 2020 XGBoost contributors + */ +#include +#include +#include + +#include "../common/hist_util.cuh" +#include "simple_batch_iterator.h" +#include "iterative_device_dmatrix.h" +#include "sparse_page_source.h" +#include "ellpack_page.cuh" +#include "proxy_dmatrix.h" +#include "device_adapter.cuh" + +namespace xgboost { +namespace data { + +template +decltype(auto) Dispatch(DMatrixProxy const* proxy, Fn fn) { + if (proxy->Adapter().type() == typeid(std::shared_ptr)) { + auto value = dmlc::get>( + proxy->Adapter())->Value(); + return fn(value); + } else if (proxy->Adapter().type() == typeid(std::shared_ptr)) { + auto value = dmlc::get>( + proxy->Adapter())->Value(); + return fn(value); + } else { + LOG(FATAL) << "Unknown type: " << proxy->Adapter().type().name(); + auto value = dmlc::get>( + proxy->Adapter())->Value(); + return fn(value); + } +} + +void IterativeDeviceDMatrix::Initialize(DataIterHandle iter_handle, float missing, int nthread) { + // A handle passed to external iterator. 
+ auto handle = static_cast*>(proxy_); + CHECK(handle); + DMatrixProxy* proxy = static_cast(handle->get()); + CHECK(proxy); + // The external iterator + auto iter = DataIterProxy{ + iter_handle, reset_, next_}; + + dh::XGBCachingDeviceAllocator alloc; + + auto num_rows = [&]() { + return Dispatch(proxy, [](auto const &value) { return value.NumRows(); }); + }; + auto num_cols = [&]() { + return Dispatch(proxy, [](auto const &value) { return value.NumCols(); }); + }; + + size_t row_stride = 0; + size_t nnz = 0; + // Sketch for all batches. + iter.Reset(); + common::HistogramCuts cuts; + common::DenseCuts dense_cuts(&cuts); + + std::vector sketch_containers; + size_t batches = 0; + size_t accumulated_rows = 0; + bst_feature_t cols = 0; + while (iter.Next()) { + auto device = proxy->DeviceIdx(); + dh::safe_cuda(cudaSetDevice(device)); + if (cols == 0) { + cols = num_cols(); + } else { + CHECK_EQ(cols, num_cols()) << "Inconsistent number of columns."; + } + sketch_containers.emplace_back(batch_param_.max_bin, num_cols(), num_rows()); + auto* p_sketch = &sketch_containers.back(); + if (proxy->Info().weights_.Size() != 0) { + proxy->Info().weights_.SetDevice(device); + Dispatch(proxy, [&](auto const &value) { + common::AdapterDeviceSketchWeighted(value, batch_param_.max_bin, + proxy->Info(), + missing, device, p_sketch); + }); + } else { + Dispatch(proxy, [&](auto const &value) { + common::AdapterDeviceSketch(value, batch_param_.max_bin, missing, + device, p_sketch); + }); + } + + auto batch_rows = num_rows(); + accumulated_rows += batch_rows; + dh::caching_device_vector row_counts(batch_rows + 1, 0); + common::Span row_counts_span(row_counts.data().get(), + row_counts.size()); + row_stride = + std::max(row_stride, Dispatch(proxy, [=](auto const& value) { + return GetRowCounts(value, row_counts_span, device, missing); + })); + nnz += thrust::reduce(thrust::cuda::par(alloc), + row_counts.begin(), row_counts.end()); + batches++; + } + + // Merging multiple batches for each 
column + std::vector summary_array(cols); + size_t intermediate_num_cuts = std::min( + accumulated_rows, static_cast(batch_param_.max_bin * + common::SketchContainer::kFactor)); + size_t nbytes = + common::WQSketch::SummaryContainer::CalcMemCost(intermediate_num_cuts); +#pragma omp parallel for num_threads(nthread) if (nthread > 0) + for (omp_ulong c = 0; c < cols; ++c) { + for (auto& sketch_batch : sketch_containers) { + common::WQSketch::SummaryContainer summary; + sketch_batch.sketches_.at(c).GetSummary(&summary); + sketch_batch.sketches_.at(c).Init(0, 1); + summary_array.at(c).Reduce(summary, nbytes); + } + } + sketch_containers.clear(); + + // Build the final summary. + std::vector sketches(cols); +#pragma omp parallel for num_threads(nthread) if (nthread > 0) + for (omp_ulong c = 0; c < cols; ++c) { + sketches.at(c).Init( + accumulated_rows, + 1.0 / (common::SketchContainer::kFactor * batch_param_.max_bin)); + sketches.at(c).PushSummary(summary_array.at(c)); + } + dense_cuts.Init(&sketches, batch_param_.max_bin, accumulated_rows); + summary_array.clear(); + + this->info_.num_col_ = cols; + this->info_.num_row_ = accumulated_rows; + this->info_.num_nonzero_ = nnz; + + // Construct the final ellpack page. 
+ page_.reset(new EllpackPage); + *(page_->Impl()) = EllpackPageImpl(proxy->DeviceIdx(), cuts, this->IsDense(), + row_stride, accumulated_rows); + + size_t offset = 0; + iter.Reset(); + while (iter.Next()) { + auto device = proxy->DeviceIdx(); + dh::safe_cuda(cudaSetDevice(device)); + auto rows = num_rows(); + dh::caching_device_vector row_counts(rows + 1, 0); + common::Span row_counts_span(row_counts.data().get(), + row_counts.size()); + Dispatch(proxy, [=](auto const& value) { + return GetRowCounts(value, row_counts_span, device, missing); + }); + auto is_dense = this->IsDense(); + auto new_impl = Dispatch(proxy, [&](auto const &value) { + return EllpackPageImpl(value, missing, device, is_dense, nthread, + row_counts_span, row_stride, rows, cols, cuts); + }); + size_t num_elements = page_->Impl()->Copy(device, &new_impl, offset); + offset += num_elements; + + proxy->Info().num_row_ = num_rows(); + proxy->Info().num_col_ = cols; + if (batches != 1) { + this->info_.Extend(std::move(proxy->Info()), false); + } + } + + if (batches == 1) { + this->info_ = std::move(proxy->Info()); + CHECK_EQ(proxy->Info().labels_.Size(), 0); + } + + iter.Reset(); + // Synchronise worker columns + rabit::Allreduce(&info_.num_col_, 1); +} + +BatchSet IterativeDeviceDMatrix::GetEllpackBatches(const BatchParam& param) { + CHECK(page_); + auto begin_iter = + BatchIterator(new SimpleBatchIteratorImpl(page_.get())); + return BatchSet(begin_iter); +} +} // namespace data +} // namespace xgboost diff --git a/src/data/iterative_device_dmatrix.h b/src/data/iterative_device_dmatrix.h new file mode 100644 index 000000000..bc73d7c85 --- /dev/null +++ b/src/data/iterative_device_dmatrix.h @@ -0,0 +1,76 @@ +/*! 
+ * Copyright 2020 by Contributors + * \file iterative_device_dmatrix.h + */ +#ifndef XGBOOST_DATA_ITERATIVE_DEVICE_DMATRIX_H_ +#define XGBOOST_DATA_ITERATIVE_DEVICE_DMATRIX_H_ + +#include +#include +#include +#include + +#include "xgboost/base.h" +#include "xgboost/data.h" +#include "xgboost/c_api.h" +#include "proxy_dmatrix.h" + +namespace xgboost { +namespace data { + +class IterativeDeviceDMatrix : public DMatrix { + MetaInfo info_; + BatchParam batch_param_; + std::shared_ptr page_; + + DMatrixHandle proxy_; + DataIterResetCallback *reset_; + XGDMatrixCallbackNext *next_; + + public: + void Initialize(DataIterHandle iter, float missing, int nthread); + + public: + explicit IterativeDeviceDMatrix(DataIterHandle iter, DMatrixHandle proxy, + DataIterResetCallback *reset, + XGDMatrixCallbackNext *next, float missing, + int nthread, int max_bin) + : proxy_{proxy}, reset_{reset}, next_{next} { + batch_param_ = BatchParam{0, max_bin, 0}; + this->Initialize(iter, missing, nthread); + } + + bool EllpackExists() const override { return true; } + bool SparsePageExists() const override { return false; } + DMatrix *Slice(common::Span ridxs) override { + LOG(FATAL) << "Slicing DMatrix is not supported for Device DMatrix."; + return nullptr; + } + BatchSet GetRowBatches() override { + LOG(FATAL) << "Not implemented."; + return BatchSet(BatchIterator(nullptr)); + } + BatchSet GetColumnBatches() override { + LOG(FATAL) << "Not implemented."; + return BatchSet(BatchIterator(nullptr)); + } + BatchSet GetSortedColumnBatches() override { + LOG(FATAL) << "Not implemented."; + return BatchSet(BatchIterator(nullptr)); + } + + BatchSet GetEllpackBatches(const BatchParam& param) override; + + bool SingleColBlock() const override { return false; } + + MetaInfo& Info() override { + return info_; + } + MetaInfo const& Info() const override { + return info_; + } +}; +} // namespace data +} // namespace xgboost + +#endif // XGBOOST_DATA_ITERATIVE_DEVICE_DMATRIX_H_ diff --git 
a/tests/cpp/data/test_iterative_device_dmatrix.cu b/tests/cpp/data/test_iterative_device_dmatrix.cu new file mode 100644 index 000000000..e44010336 --- /dev/null +++ b/tests/cpp/data/test_iterative_device_dmatrix.cu @@ -0,0 +1,166 @@ +/*! + * Copyright 2020 XGBoost contributors + */ +#include + +#include "../helpers.h" +#include "../../../src/data/iterative_device_dmatrix.h" +#include "../../../src/data/ellpack_page.cuh" +#include "../../../src/data/device_adapter.cuh" + +namespace xgboost { +namespace data { + +void TestEquivalent(float sparsity) { + CudaArrayIterForTest iter{sparsity}; + IterativeDeviceDMatrix m( + &iter, iter.Proxy(), Reset, Next, std::numeric_limits::quiet_NaN(), + 0, 256); + size_t offset = 0; + auto first = (*m.GetEllpackBatches({}).begin()).Impl(); + std::unique_ptr page_concatenated { + new EllpackPageImpl(0, first->Cuts(), first->is_dense, + first->row_stride, 1000 * 100)}; + for (auto& batch : m.GetBatches()) { + auto page = batch.Impl(); + size_t num_elements = page_concatenated->Copy(0, page, offset); + offset += num_elements; + } + auto from_iter = page_concatenated->GetDeviceAccessor(0); + ASSERT_EQ(m.Info().num_col_, CudaArrayIterForTest::kCols); + ASSERT_EQ(m.Info().num_row_, CudaArrayIterForTest::kRows); + + std::string interface_str = iter.AsArray(); + auto adapter = CupyAdapter(interface_str); + std::unique_ptr dm{ + DMatrix::Create(&adapter, std::numeric_limits::quiet_NaN(), 0)}; + BatchParam bp {0, 256}; + for (auto& ellpack : dm->GetBatches(bp)) { + auto from_data = ellpack.Impl()->GetDeviceAccessor(0); + + std::vector cuts_from_iter(from_iter.gidx_fvalue_map.size()); + std::vector min_fvalues_iter(from_iter.min_fvalue.size()); + std::vector cut_ptrs_iter(from_iter.feature_segments.size()); + dh::CopyDeviceSpanToVector(&cuts_from_iter, from_iter.gidx_fvalue_map); + dh::CopyDeviceSpanToVector(&min_fvalues_iter, from_iter.min_fvalue); + dh::CopyDeviceSpanToVector(&cut_ptrs_iter, from_iter.feature_segments); + + std::vector 
cuts_from_data(from_data.gidx_fvalue_map.size()); + std::vector min_fvalues_data(from_data.min_fvalue.size()); + std::vector cut_ptrs_data(from_data.feature_segments.size()); + dh::CopyDeviceSpanToVector(&cuts_from_data, from_data.gidx_fvalue_map); + dh::CopyDeviceSpanToVector(&min_fvalues_data, from_data.min_fvalue); + dh::CopyDeviceSpanToVector(&cut_ptrs_data, from_data.feature_segments); + + ASSERT_EQ(cuts_from_iter.size(), cuts_from_data.size()); + for (size_t i = 0; i < cuts_from_iter.size(); ++i) { + EXPECT_NEAR(cuts_from_iter[i], cuts_from_data[i], kRtEps); + } + ASSERT_EQ(min_fvalues_iter.size(), min_fvalues_data.size()); + for (size_t i = 0; i < min_fvalues_iter.size(); ++i) { + ASSERT_NEAR(min_fvalues_iter[i], min_fvalues_data[i], kRtEps); + } + ASSERT_EQ(cut_ptrs_iter.size(), cut_ptrs_data.size()); + for (size_t i = 0; i < cut_ptrs_iter.size(); ++i) { + ASSERT_EQ(cut_ptrs_iter[i], cut_ptrs_data[i]); + } + + auto const& buffer_from_iter = page_concatenated->gidx_buffer; + auto const& buffer_from_data = ellpack.Impl()->gidx_buffer; + ASSERT_NE(buffer_from_data.Size(), 0); + ASSERT_EQ(buffer_from_data.ConstHostVector(), buffer_from_data.ConstHostVector()); + } +} + +TEST(IterativeDeviceDMatrix, Basic) { + TestEquivalent(0.0); + TestEquivalent(0.5); +} + +TEST(IterativeDeviceDMatrix, RowMajor) { + CudaArrayIterForTest iter(0.0f); + IterativeDeviceDMatrix m( + &iter, iter.Proxy(), Reset, Next, std::numeric_limits::quiet_NaN(), + 0, 256); + size_t n_batches = 0; + std::string interface_str = iter.AsArray(); + for (auto& ellpack : m.GetBatches()) { + n_batches ++; + auto impl = ellpack.Impl(); + common::CompressedIterator iterator( + impl->gidx_buffer.HostVector().data(), impl->NumSymbols()); + auto cols = CudaArrayIterForTest::kCols; + auto rows = CudaArrayIterForTest::kRows; + + auto j_interface = + Json::Load({interface_str.c_str(), interface_str.size()}); + ArrayInterface loaded {get(j_interface)}; + std::vector h_data(cols * rows); + common::Span 
s_data{static_cast(loaded.data), cols * rows}; + dh::CopyDeviceSpanToVector(&h_data, s_data); + + for(auto i = 0ull; i < rows * cols; i++) { + int column_idx = i % cols; + EXPECT_EQ(impl->Cuts().SearchBin(h_data[i], column_idx), iterator[i]); + } + EXPECT_EQ(m.Info().num_col_, cols); + EXPECT_EQ(m.Info().num_row_, rows); + EXPECT_EQ(m.Info().num_nonzero_, rows * cols); + } + // All batches are concatenated. + ASSERT_EQ(n_batches, 1); +} + +TEST(IterativeDeviceDMatrix, RowMajorMissing) { + const float kMissing = std::numeric_limits::quiet_NaN(); + size_t rows = 10; + size_t cols = 2; + CudaArrayIterForTest iter(0.0f, rows, cols, 2); + std::string interface_str = iter.AsArray(); + auto j_interface = + Json::Load({interface_str.c_str(), interface_str.size()}); + ArrayInterface loaded {get(j_interface)}; + std::vector h_data(cols * rows); + common::Span s_data{static_cast(loaded.data), cols * rows}; + dh::CopyDeviceSpanToVector(&h_data, s_data); + h_data[1] = kMissing; + h_data[5] = kMissing; + h_data[6] = kMissing; + auto ptr = thrust::device_ptr( + reinterpret_cast(get(j_interface["data"][0]))); + thrust::copy(h_data.cbegin(), h_data.cend(), ptr); + + IterativeDeviceDMatrix m( + &iter, iter.Proxy(), Reset, Next, std::numeric_limits::quiet_NaN(), + 0, 256); + auto &ellpack = *m.GetBatches({0, 256, 0}).begin(); + auto impl = ellpack.Impl(); + common::CompressedIterator iterator( + impl->gidx_buffer.HostVector().data(), impl->NumSymbols()); + EXPECT_EQ(iterator[1], impl->GetDeviceAccessor(0).NullValue()); + EXPECT_EQ(iterator[5], impl->GetDeviceAccessor(0).NullValue()); + // null values get placed after valid values in a row + EXPECT_EQ(iterator[7], impl->GetDeviceAccessor(0).NullValue()); + EXPECT_EQ(m.Info().num_col_, cols); + EXPECT_EQ(m.Info().num_row_, rows); + EXPECT_EQ(m.Info().num_nonzero_, rows* cols - 3); +} + +TEST(IterativeDeviceDMatrix, IsDense) { + int num_bins = 16; + auto test = [num_bins] (float sparsity) { + CudaArrayIterForTest iter(sparsity); + 
IterativeDeviceDMatrix m( + &iter, iter.Proxy(), Reset, Next, std::numeric_limits::quiet_NaN(), + 0, 256); + if (sparsity == 0.0) { + ASSERT_TRUE(m.IsDense()); + } else { + ASSERT_FALSE(m.IsDense()); + } + }; + test(0.0); + test(0.1); +} +} // namespace data +} // namespace xgboost diff --git a/tests/cpp/helpers.cu b/tests/cpp/helpers.cu index cd53ebf18..9b70ea543 100644 --- a/tests/cpp/helpers.cu +++ b/tests/cpp/helpers.cu @@ -1,17 +1,43 @@ +#include + #include "helpers.h" #include "../../src/data/device_adapter.cuh" -#include "../../src/data/device_dmatrix.h" +#include "../../src/data/iterative_device_dmatrix.h" namespace xgboost { + +CudaArrayIterForTest::CudaArrayIterForTest(float sparsity, size_t rows, + size_t cols, size_t batches) + : rows_{rows}, cols_{cols}, n_batches_{batches} { + XGProxyDMatrixCreate(&proxy_); + rng_.reset(new RandomDataGenerator{rows_, cols_, sparsity}); + rng_->Device(0); + std::tie(batches_, interface_) = + rng_->GenerateArrayInterfaceBatch(&data_, n_batches_); + this->Reset(); +} + +CudaArrayIterForTest::~CudaArrayIterForTest() { XGDMatrixFree(proxy_); } + +int CudaArrayIterForTest::Next() { + if (iter_ == n_batches_) { + return 0; + } + XGDeviceQuantileDMatrixSetDataCudaArrayInterface(proxy_, batches_[iter_].c_str()); + iter_++; + return 1; +} + +size_t constexpr CudaArrayIterForTest::kRows; +size_t constexpr CudaArrayIterForTest::kCols; + std::shared_ptr RandomDataGenerator::GenerateDeviceDMatrix(bool with_label, bool float_label, size_t classes) { - std::vector> storage(cols_); - std::string arr = this->GenerateColumnarArrayInterface(&storage); - auto adapter = data::CudfAdapter(arr); - std::shared_ptr m { - new data::DeviceDMatrix{&adapter, - std::numeric_limits::quiet_NaN(), 1, 256}}; + CudaArrayIterForTest iter{this->sparsity_, this->rows_, this->cols_, 1}; + auto m = std::make_shared( + &iter, iter.Proxy(), Reset, Next, std::numeric_limits::quiet_NaN(), + 0, bins_); return m; } } // namespace xgboost diff --git 
a/tests/cpp/helpers.h b/tests/cpp/helpers.h index 7d5907718..e0c322a46 100644 --- a/tests/cpp/helpers.h +++ b/tests/cpp/helpers.h @@ -304,5 +304,51 @@ inline HostDeviceVector GenerateRandomGradients(const size_t n_row HostDeviceVector gpair(h_gpair); return gpair; } + +typedef void *DMatrixHandle; // NOLINT(*); + +class CudaArrayIterForTest { + HostDeviceVector data_; + size_t iter_ {0}; + DMatrixHandle proxy_; + std::unique_ptr rng_; + + std::vector batches_; + std::string interface_; + size_t rows_; + size_t cols_; + size_t n_batches_; + + public: + size_t static constexpr kRows { 1000 }; + size_t static constexpr kBatches { 100 }; + size_t static constexpr kCols { 13 }; + + explicit CudaArrayIterForTest(float sparsity, size_t rows = kRows, + size_t cols = kCols, size_t batches = kBatches); + ~CudaArrayIterForTest(); + + std::string AsArray() const { + return interface_; + } + + int Next(); + void Reset() { + iter_ = 0; + } + size_t Iter() const { return iter_; } + auto Proxy() -> decltype(proxy_) { return proxy_; } +}; + +typedef void *DataIterHandle; // NOLINT(*) + +inline void Reset(DataIterHandle self) { + static_cast(self)->Reset(); +} + +inline int Next(DataIterHandle self) { + return static_cast(self)->Next(); +} + } // namespace xgboost #endif diff --git a/tests/cpp/predictor/test_gpu_predictor.cu b/tests/cpp/predictor/test_gpu_predictor.cu index aee33ef76..fc40d1778 100644 --- a/tests/cpp/predictor/test_gpu_predictor.cu +++ b/tests/cpp/predictor/test_gpu_predictor.cu @@ -76,15 +76,15 @@ TEST(GPUPredictor, EllpackTraining) { .Bins(kBins) .Device(0) .GenerateDeviceDMatrix(true); - std::vector> storage(kCols); + HostDeviceVector storage(kRows * kCols); auto columnar = RandomDataGenerator{kRows, kCols, 0.0} .Device(0) - .GenerateColumnarArrayInterface(&storage); - auto adapter = data::CudfAdapter(columnar); + .GenerateArrayInterface(&storage); + auto adapter = data::CupyAdapter(columnar); std::shared_ptr p_full { DMatrix::Create(&adapter, 
std::numeric_limits::quiet_NaN(), 1) }; - TestTrainingPrediction(kRows, "gpu_hist", p_full, p_ellpack); + TestTrainingPrediction(kRows, kBins, "gpu_hist", p_full, p_ellpack); } TEST(GPUPredictor, ExternalMemoryTest) { diff --git a/tests/cpp/predictor/test_predictor.cc b/tests/cpp/predictor/test_predictor.cc index b00814485..77c5a1634 100644 --- a/tests/cpp/predictor/test_predictor.cc +++ b/tests/cpp/predictor/test_predictor.cc @@ -32,7 +32,8 @@ TEST(Predictor, PredictionCache) { EXPECT_ANY_THROW(container.Entry(m)); } -void TestTrainingPrediction(size_t rows, std::string tree_method, +void TestTrainingPrediction(size_t rows, size_t bins, + std::string tree_method, std::shared_ptr p_full, std::shared_ptr p_hist) { size_t constexpr kCols = 16; diff --git a/tests/cpp/predictor/test_predictor.h b/tests/cpp/predictor/test_predictor.h index 61cd349dc..43c9c950d 100644 --- a/tests/cpp/predictor/test_predictor.h +++ b/tests/cpp/predictor/test_predictor.h @@ -52,7 +52,7 @@ void TestPredictionFromGradientIndex(std::string name, size_t rows, size_t cols, } // p_full and p_hist should come from the same data set. -void TestTrainingPrediction(size_t rows, std::string tree_method, +void TestTrainingPrediction(size_t rows, size_t bins, std::string tree_method, std::shared_ptr p_full, std::shared_ptr p_hist);