Rewrite sparse dmatrix using callbacks. (#7092)

- Reduce dependency on dmlc parsers and provide an interface for users to load data by themselves.
- Remove use of threaded iterator and IO queue.
- Remove `page_size`.
- Make sure the number of pages in memory is bounded.
- Make sure the cache cannot be violated.
- Provide an interface for internal algorithms to process data asynchronously.
This commit is contained in:
Jiaming Yuan
2021-07-16 12:33:31 +08:00
committed by GitHub
parent 2f524e9f41
commit bd1f3a38f0
51 changed files with 1445 additions and 1391 deletions

View File

@@ -22,11 +22,10 @@
#include "../common/threading_utils.h"
#include "../data/adapter.h"
#include "../data/iterative_device_dmatrix.h"
#include "file_iterator.h"
#if DMLC_ENABLE_STD_THREAD
#include "./sparse_page_source.h"
#include "./sparse_page_dmatrix.h"
#endif // DMLC_ENABLE_STD_THREAD
namespace dmlc {
DMLC_REGISTRY_ENABLE(::xgboost::data::SparsePageFormatReg<::xgboost::SparsePage>);
@@ -500,13 +499,17 @@ void MetaInfo::GetFeatureInfo(const char *field,
}
}
void MetaInfo::Extend(MetaInfo const& that, bool accumulate_rows) {
void MetaInfo::Extend(MetaInfo const& that, bool accumulate_rows, bool check_column) {
if (accumulate_rows) {
this->num_row_ += that.num_row_;
}
if (this->num_col_ != 0) {
CHECK_EQ(this->num_col_, that.num_col_)
<< "Number of columns must be consistent across batches.";
if (check_column) {
CHECK_EQ(this->num_col_, that.num_col_)
<< "Number of columns must be consistent across batches.";
} else {
this->num_col_ = std::max(this->num_col_, that.num_col_);
}
}
this->num_col_ = that.num_col_;
@@ -630,11 +633,34 @@ DMatrix::~DMatrix() {
}
}
// Attempt to interpret `fname` as an XGBoost binary DMatrix file.
//
// Returns an owning pointer to the loaded SimpleDMatrix on success, or nullptr when
// the file cannot be opened or does not start with the binary magic number (callers
// then fall back to the text parsers).
DMatrix *TryLoadBinary(std::string fname, bool silent) {
  std::unique_ptr<dmlc::Stream> fi{dmlc::Stream::Create(fname.c_str(), "r", true)};
  if (fi == nullptr) {
    return nullptr;
  }
  common::PeekableInStream is(fi.get());
  // Peek at the leading magic number without consuming it, so the stream can still be
  // handed to SimpleDMatrix positioned at the beginning.
  int magic{0};
  if (is.PeekRead(&magic, sizeof(magic)) != sizeof(magic)) {
    return nullptr;
  }
  if (!DMLC_IO_NO_ENDIAN_SWAP) {
    dmlc::ByteSwap(&magic, sizeof(magic), 1);
  }
  if (magic != data::SimpleDMatrix::kMagic) {
    return nullptr;
  }
  auto *dmat = new data::SimpleDMatrix(&is);
  if (!silent) {
    LOG(CONSOLE) << dmat->Info().num_row_ << 'x' << dmat->Info().num_col_
                 << " matrix with " << dmat->Info().num_nonzero_
                 << " entries loaded from " << fname;
  }
  return dmat;
}
DMatrix* DMatrix::Load(const std::string& uri,
bool silent,
bool load_row_split,
const std::string& file_format,
const size_t page_size) {
const std::string& file_format) {
std::string fname, cache_file;
size_t dlm_pos = uri.find('#');
if (dlm_pos != std::string::npos) {
@@ -682,35 +708,34 @@ DMatrix* DMatrix::Load(const std::string& uri,
// legacy handling of binary data loading
if (file_format == "auto" && npart == 1) {
int magic;
std::unique_ptr<dmlc::Stream> fi(dmlc::Stream::Create(fname.c_str(), "r", true));
if (fi != nullptr) {
common::PeekableInStream is(fi.get());
if (is.PeekRead(&magic, sizeof(magic)) == sizeof(magic)) {
if (!DMLC_IO_NO_ENDIAN_SWAP) {
dmlc::ByteSwap(&magic, sizeof(magic), 1);
}
if (magic == data::SimpleDMatrix::kMagic) {
DMatrix* dmat = new data::SimpleDMatrix(&is);
if (!silent) {
LOG(CONSOLE) << dmat->Info().num_row_ << 'x' << dmat->Info().num_col_ << " matrix with "
<< dmat->Info().num_nonzero_ << " entries loaded from " << uri;
}
return dmat;
}
}
DMatrix *loaded = TryLoadBinary(fname, silent);
if (loaded) {
return loaded;
}
}
std::unique_ptr<dmlc::Parser<uint32_t> > parser(
dmlc::Parser<uint32_t>::Create(fname.c_str(), partid, npart, file_format.c_str()));
data::FileAdapter adapter(parser.get());
DMatrix* dmat {nullptr};
try {
dmat = DMatrix::Create(&adapter, std::numeric_limits<float>::quiet_NaN(), 1,
cache_file, page_size);
} catch (dmlc::Error& e) {
if (cache_file.empty()) {
std::unique_ptr<dmlc::Parser<uint32_t>> parser(
dmlc::Parser<uint32_t>::Create(fname.c_str(), partid, npart,
file_format.c_str()));
data::FileAdapter adapter(parser.get());
dmat = DMatrix::Create(&adapter, std::numeric_limits<float>::quiet_NaN(),
1, cache_file);
} else {
data::FileIterator iter{fname, uint32_t(partid), uint32_t(npart),
file_format};
dmat = new data::SparsePageDMatrix{
&iter,
iter.Proxy(),
data::fileiter::Reset,
data::fileiter::Next,
std::numeric_limits<float>::quiet_NaN(),
1,
cache_file};
}
} catch (dmlc::Error &e) {
std::vector<std::string> splited = common::Split(fname, '#');
std::vector<std::string> args = common::Split(splited.front(), '?');
std::string format {file_format};
@@ -734,10 +759,6 @@ DMatrix* DMatrix::Load(const std::string& uri,
LOG(FATAL) << "Encountered parser error:\n" << e.what();
}
if (!silent) {
LOG(CONSOLE) << dmat->Info().num_row_ << 'x' << dmat->Info().num_col_ << " matrix with "
<< dmat->Info().num_nonzero_ << " entries loaded from " << uri;
}
/* sync up number of features after matrix loaded.
* partitioned data will fail the train/val validation check
* since partitioned data not knowing the real number of features. */
@@ -769,12 +790,19 @@ DMatrix *DMatrix::Create(DataIterHandle iter, DMatrixHandle proxy,
XGDMatrixCallbackNext *next, float missing,
int nthread,
int max_bin) {
#if defined(XGBOOST_USE_CUDA)
return new data::IterativeDeviceDMatrix(iter, proxy, reset, next, missing, nthread, max_bin);
#else
common::AssertGPUSupport();
return nullptr;
#endif
return new data::IterativeDeviceDMatrix(iter, proxy, reset, next, missing,
nthread, max_bin);
}
template <typename DataIterHandle, typename DMatrixHandle,
typename DataIterResetCallback, typename XGDMatrixCallbackNext>
DMatrix *DMatrix::Create(DataIterHandle iter, DMatrixHandle proxy,
DataIterResetCallback *reset,
XGDMatrixCallbackNext *next, float missing,
int32_t n_threads,
std::string cache) {
return new data::SparsePageDMatrix(iter, proxy, reset, next, missing, n_threads,
cache);
}
template DMatrix *DMatrix::Create<DataIterHandle, DMatrixHandle,
@@ -783,49 +811,42 @@ template DMatrix *DMatrix::Create<DataIterHandle, DMatrixHandle,
XGDMatrixCallbackNext *next, float missing, int nthread,
int max_bin);
template DMatrix *DMatrix::Create<DataIterHandle, DMatrixHandle,
DataIterResetCallback, XGDMatrixCallbackNext>(
DataIterHandle iter, DMatrixHandle proxy, DataIterResetCallback *reset,
XGDMatrixCallbackNext *next, float missing, int32_t n_threads, std::string);
template <typename AdapterT>
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size) {
if (cache_prefix.length() == 0) {
// Data split mode is fixed to be row right now.
return new data::SimpleDMatrix(adapter, missing, nthread);
} else {
#if DMLC_ENABLE_STD_THREAD
return new data::SparsePageDMatrix(adapter, missing, nthread, cache_prefix,
page_size);
#else
LOG(FATAL) << "External memory is not enabled in mingw";
return nullptr;
#endif // DMLC_ENABLE_STD_THREAD
}
const std::string& cache_prefix) {
return new data::SimpleDMatrix(adapter, missing, nthread);
}
template DMatrix* DMatrix::Create<data::DenseAdapter>(
data::DenseAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size);
const std::string& cache_prefix);
template DMatrix* DMatrix::Create<data::ArrayAdapter>(
data::ArrayAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size);
const std::string& cache_prefix);
template DMatrix* DMatrix::Create<data::CSRAdapter>(
data::CSRAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size);
const std::string& cache_prefix);
template DMatrix* DMatrix::Create<data::CSCAdapter>(
data::CSCAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size);
const std::string& cache_prefix);
template DMatrix* DMatrix::Create<data::DataTableAdapter>(
data::DataTableAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size);
const std::string& cache_prefix);
template DMatrix* DMatrix::Create<data::FileAdapter>(
data::FileAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size);
const std::string& cache_prefix);
template DMatrix* DMatrix::Create<data::CSRArrayAdapter>(
data::CSRArrayAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size);
const std::string& cache_prefix);
template DMatrix *
DMatrix::Create(data::IteratorAdapter<DataIterHandle, XGBCallbackDataIterNext,
XGBoostBatchCSR> *adapter,
float missing, int nthread, const std::string &cache_prefix,
size_t page_size);
float missing, int nthread, const std::string &cache_prefix);
SparsePage SparsePage::GetTranspose(int num_columns) const {
SparsePage transpose;
@@ -1044,6 +1065,8 @@ SparsePage::Push(const data::ArrayAdapterBatch& batch, float missing, int nthrea
template uint64_t
SparsePage::Push(const data::CSRAdapterBatch& batch, float missing, int nthread);
template uint64_t
SparsePage::Push(const data::CSRArrayAdapterBatch& batch, float missing, int nthread);
template uint64_t
SparsePage::Push(const data::CSCAdapterBatch& batch, float missing, int nthread);
template uint64_t
SparsePage::Push(const data::DataTableAdapterBatch& batch, float missing, int nthread);

View File

@@ -167,7 +167,7 @@ void MetaInfo::SetInfo(const char * c_key, std::string const& interface_str) {
template <typename AdapterT>
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size) {
const std::string& cache_prefix) {
CHECK_EQ(cache_prefix.size(), 0)
<< "Device memory construction is not currently supported with external "
"memory.";
@@ -176,8 +176,8 @@ DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread,
template DMatrix* DMatrix::Create<data::CudfAdapter>(
data::CudfAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size);
const std::string& cache_prefix);
template DMatrix* DMatrix::Create<data::CupyAdapter>(
data::CupyAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size);
const std::string& cache_prefix);
} // namespace xgboost

View File

@@ -122,6 +122,7 @@ EllpackPageImpl::EllpackPageImpl(DMatrix* dmat, const BatchParam& param)
dmat->Info().feature_types.SetDevice(param.gpu_id);
auto ft = dmat->Info().feature_types.ConstDeviceSpan();
monitor_.Start("BinningCompression");
CHECK(dmat->SingleColBlock());
for (const auto& batch : dmat->GetBatches<SparsePage>()) {
CreateHistIndices(param.gpu_id, batch, ft);
}
@@ -301,9 +302,8 @@ struct CopyPage {
// The number of elements to skip.
size_t offset;
CopyPage(EllpackPageImpl* dst, EllpackPageImpl* src, size_t offset)
: cbw{dst->NumSymbols()},
dst_data_d{dst->gidx_buffer.DevicePointer()},
CopyPage(EllpackPageImpl *dst, EllpackPageImpl const *src, size_t offset)
: cbw{dst->NumSymbols()}, dst_data_d{dst->gidx_buffer.DevicePointer()},
src_iterator_d{src->gidx_buffer.DevicePointer(), src->NumSymbols()},
offset(offset) {}
@@ -314,7 +314,8 @@ struct CopyPage {
};
// Copy the data from the given EllpackPage to the current page.
size_t EllpackPageImpl::Copy(int device, EllpackPageImpl* page, size_t offset) {
size_t EllpackPageImpl::Copy(int device, EllpackPageImpl const *page,
size_t offset) {
monitor_.Start("Copy");
size_t num_elements = page->n_rows * page->row_stride;
CHECK_EQ(row_stride, page->row_stride);
@@ -351,7 +352,7 @@ struct CompactPage {
size_t base_rowid;
size_t row_stride;
CompactPage(EllpackPageImpl* dst, EllpackPageImpl* src,
CompactPage(EllpackPageImpl* dst, EllpackPageImpl const* src,
common::Span<size_t> row_indexes)
: cbw{dst->NumSymbols()},
dst_data_d{dst->gidx_buffer.DevicePointer()},
@@ -374,7 +375,7 @@ struct CompactPage {
};
// Compacts the data from the given EllpackPage into the current page.
void EllpackPageImpl::Compact(int device, EllpackPageImpl* page,
void EllpackPageImpl::Compact(int device, EllpackPageImpl const* page,
common::Span<size_t> row_indexes) {
monitor_.Start("Compact");
CHECK_EQ(row_stride, page->row_stride);
@@ -459,7 +460,7 @@ void EllpackPageImpl::CreateHistIndices(int device,
gidx_buffer.DevicePointer(), row_ptrs.data().get(),
entries_d.data().get(), device_accessor.gidx_fvalue_map.data(),
device_accessor.feature_segments.data(), feature_types,
row_batch.base_rowid + batch_row_begin, batch_nrows, row_stride,
batch_row_begin, batch_nrows, row_stride,
null_gidx_value);
}
}

View File

@@ -164,7 +164,7 @@ class EllpackPageImpl {
* @param offset The number of elements to skip before copying.
* @returns The number of elements copied.
*/
size_t Copy(int device, EllpackPageImpl* page, size_t offset);
size_t Copy(int device, EllpackPageImpl const *page, size_t offset);
/*! \brief Compact the given ELLPACK page into the current page.
*
@@ -172,7 +172,7 @@ class EllpackPageImpl {
* @param page The ELLPACK page to compact from.
* @param row_indexes Row indexes for the compacted page.
*/
void Compact(int device, EllpackPageImpl* page, common::Span<size_t> row_indexes);
void Compact(int device, EllpackPageImpl const* page, common::Span<size_t> row_indexes);
/*! \return Number of instances in the page. */

View File

@@ -1,24 +0,0 @@
/*!
* Copyright 2019 XGBoost contributors
*/
#ifndef XGBOOST_USE_CUDA
#include <dmlc/base.h>
#if DMLC_ENABLE_STD_THREAD
#include "ellpack_page_source.h"
#include <xgboost/data.h>
namespace xgboost {
namespace data {
EllpackPageSource::EllpackPageSource(DMatrix* dmat,
const std::string& cache_info,
const BatchParam& param) noexcept(false) {
LOG(FATAL)
<< "Internal Error: "
"XGBoost is not compiled with CUDA but EllpackPageSource is required";
}
} // namespace data
} // namespace xgboost
#endif // DMLC_ENABLE_STD_THREAD
#endif // XGBOOST_USE_CUDA

View File

@@ -1,89 +1,24 @@
/*!
* Copyright 2019 XGBoost contributors
* Copyright 2019-2021 XGBoost contributors
*/
#include <memory>
#include <utility>
#include "../common/hist_util.cuh"
#include "ellpack_page.cuh"
#include "ellpack_page_source.h"
#include "sparse_page_source.h"
namespace xgboost {
namespace data {
// Build the quantile sketch across the whole input data, then use the histogram cuts to compress
// each CSR page, and write the accumulated ELLPACK pages to disk.
EllpackPageSource::EllpackPageSource(DMatrix* dmat,
const std::string& cache_info,
const BatchParam& param) noexcept(false) {
cache_info_ = ParseCacheInfo(cache_info, kPageType_);
for (auto file : cache_info_.name_shards) {
CheckCacheFileExists(file);
}
if (param.gpu_page_size > 0) {
page_size_ = param.gpu_page_size;
}
monitor_.Init("ellpack_page_source");
dh::safe_cuda(cudaSetDevice(param.gpu_id));
monitor_.Start("Quantiles");
size_t row_stride = GetRowStride(dmat);
auto cuts = common::DeviceSketch(param.gpu_id, dmat, param.max_bin);
monitor_.Stop("Quantiles");
monitor_.Start("WriteEllpackPages");
WriteEllpackPages(param.gpu_id, dmat, cuts, cache_info, row_stride);
monitor_.Stop("WriteEllpackPages");
external_prefetcher_.reset(
new ExternalMemoryPrefetcher<EllpackPage>(cache_info_));
}
// Compress each CSR page to ELLPACK, and write the accumulated pages to disk.
void EllpackPageSource::WriteEllpackPages(int device, DMatrix* dmat,
const common::HistogramCuts& cuts,
const std::string& cache_info,
size_t row_stride) const {
auto cinfo = ParseCacheInfo(cache_info, kPageType_);
const size_t extra_buffer_capacity = 6;
SparsePageWriter<EllpackPage> writer(cinfo.name_shards, cinfo.format_shards,
extra_buffer_capacity);
std::shared_ptr<EllpackPage> page;
SparsePage temp_host_page;
writer.Alloc(&page);
auto* impl = page->Impl();
auto ft = dmat->Info().feature_types.ConstDeviceSpan();
size_t bytes_write = 0;
double tstart = dmlc::GetTime();
for (const auto& batch : dmat->GetBatches<SparsePage>()) {
temp_host_page.Push(batch);
size_t mem_cost_bytes =
EllpackPageImpl::MemCostBytes(temp_host_page.Size(), row_stride, cuts);
if (mem_cost_bytes >= page_size_) {
bytes_write += mem_cost_bytes;
*impl = EllpackPageImpl(device, cuts, temp_host_page, dmat->IsDense(),
row_stride, ft);
writer.PushWrite(std::move(page));
writer.Alloc(&page);
impl = page->Impl();
temp_host_page.Clear();
double tdiff = dmlc::GetTime() - tstart;
LOG(INFO) << "Writing " << kPageType_ << " to " << cache_info << " in "
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " written";
}
}
if (temp_host_page.Size() != 0) {
*impl = EllpackPageImpl(device, cuts, temp_host_page, dmat->IsDense(),
row_stride, ft);
writer.PushWrite(std::move(page));
// Materialize the current ELLPACK page: either read it back from the external-memory
// cache, or build it by compressing the CSR page from the upstream source and then
// persist it for later passes.
void EllpackPageSource::Fetch() {
// NOTE(review): ReadCache() presumably returns true when the page was written in a
// previous pass and could be loaded from disk -- confirm against PageSourceIncMixIn.
if (!this->ReadCache()) {
auto const &csr = source_->Page();
this->page_.reset(new EllpackPage{});
auto *impl = this->page_->Impl();
// Compress the CSR page into ELLPACK format using the pre-computed histogram cuts.
*impl = EllpackPageImpl(param_.gpu_id, *cuts_, *csr, is_dense_, row_stride_,
feature_types_);
// Keep the row offset consistent with the source page across batches.
page_->SetBaseRowId(csr->base_rowid);
// Persist the freshly built page so subsequent iterations hit the cache branch.
this->WriteCache();
}
}
} // namespace data
} // namespace xgboost

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2019 by XGBoost Contributors
* Copyright 2019-2021 by XGBoost Contributors
*/
#ifndef XGBOOST_DATA_ELLPACK_PAGE_SOURCE_H_
@@ -8,57 +8,44 @@
#include <xgboost/data.h>
#include <memory>
#include <string>
#include <utility>
#include "../common/timer.h"
#include "../common/common.h"
#include "../common/hist_util.h"
#include "sparse_page_source.h"
namespace xgboost {
namespace data {
/*!
* \brief External memory data source for ELLPACK format.
*
*/
class EllpackPageSource {
class EllpackPageSource : public PageSourceIncMixIn<EllpackPage> {
bool is_dense_;
size_t row_stride_;
BatchParam param_;
common::Span<FeatureType const> feature_types_;
std::unique_ptr<common::HistogramCuts> cuts_;
public:
/*!
* \brief Create source from cache files the cache_prefix.
* \param cache_prefix The prefix of cache we want to solve.
*/
explicit EllpackPageSource(DMatrix* dmat,
const std::string& cache_info,
const BatchParam& param) noexcept(false);
BatchSet<EllpackPage> GetBatchSet() {
auto begin_iter = BatchIterator<EllpackPage>(
new SparseBatchIteratorImpl<ExternalMemoryPrefetcher<EllpackPage>,
EllpackPage>(external_prefetcher_.get()));
return BatchSet<EllpackPage>(begin_iter);
EllpackPageSource(
float missing, int nthreads, bst_feature_t n_features, size_t n_batches,
std::shared_ptr<Cache> cache, BatchParam param,
std::unique_ptr<common::HistogramCuts> cuts, bool is_dense,
size_t row_stride, common::Span<FeatureType const> feature_types,
std::shared_ptr<SparsePageSource> source)
: PageSourceIncMixIn(missing, nthreads, n_features, n_batches, cache),
is_dense_{is_dense}, row_stride_{row_stride}, param_{param},
feature_types_{feature_types}, cuts_{std::move(cuts)} {
this->source_ = source;
this->Fetch();
}
~EllpackPageSource() {
external_prefetcher_.reset();
for (auto file : cache_info_.name_shards) {
TryDeleteCacheFile(file);
}
}
private:
void WriteEllpackPages(int device, DMatrix* dmat,
const common::HistogramCuts& cuts,
const std::string& cache_info,
size_t row_stride) const;
/*! \brief The page type string for ELLPACK. */
const std::string kPageType_{".ellpack.page"};
size_t page_size_{DMatrix::kPageSize};
common::Monitor monitor_;
std::unique_ptr<ExternalMemoryPrefetcher<EllpackPage>> external_prefetcher_;
CacheInfo cache_info_;
void Fetch() final;
};
#if !defined(XGBOOST_USE_CUDA)
inline void EllpackPageSource::Fetch() {
common::AssertGPUSupport();
}
#endif // !defined(XGBOOST_USE_CUDA)
} // namespace data
} // namespace xgboost

115
src/data/file_iterator.h Normal file
View File

@@ -0,0 +1,115 @@
/*!
* Copyright 2021 XGBoost contributors
*/
#ifndef XGBOOST_DATA_FILE_ITERATOR_H_
#define XGBOOST_DATA_FILE_ITERATOR_H_
#include <algorithm>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "dmlc/data.h"
#include "xgboost/c_api.h"
#include "xgboost/json.h"
#include "array_interface.h"
namespace xgboost {
namespace data {
/**
 * An iterator for implementing external memory support with file inputs.  Users of
 * external memory are encouraged to define their own file parsers/loaders, so this one
 * is just here for compatibility with old versions of XGBoost and the CLI interface.
 */
class FileIterator {
  // URI of the input file; encodes parameters like whether indexing is 1-based.  The
  // dmlc parser decodes this information.
  std::string uri_;
  // Equals rank_id in distributed training; used to split the file into parts for
  // each worker.
  uint32_t part_idx_;
  // Total number of workers.
  uint32_t n_parts_;
  // Format of the input file, like "libsvm".
  std::string type_;
  // Proxy DMatrix handle used to stage each parsed block for the consumer.  Owned by
  // this iterator and released in the destructor.
  DMatrixHandle proxy_;
  std::unique_ptr<dmlc::Parser<uint32_t>> parser_;
  // Temporary reference to stage the data; valid only until the next call to
  // parser_->Next().
  dmlc::RowBlock<uint32_t, float> row_block_;
  // Storage for the array interface strings; must outlive the proxy's view of them.
  std::string indptr_;
  std::string values_;
  std::string indices_;

 public:
  /**
   * \param uri        Input file URI (may carry parser parameters).
   * \param part_index Which partition of the file this worker reads.
   * \param num_parts  Total number of partitions (workers).
   * \param type       dmlc parser format name, e.g. "libsvm".
   */
  FileIterator(std::string uri, unsigned part_index, unsigned num_parts,
               std::string type)
      : uri_{std::move(uri)}, part_idx_{part_index}, n_parts_{num_parts},
        type_{std::move(type)} {
    XGProxyDMatrixCreate(&proxy_);
  }
  // Non-copyable: this class owns the proxy handle; copying it would lead to a
  // double free in the destructor.
  FileIterator(FileIterator const&) = delete;
  FileIterator& operator=(FileIterator const&) = delete;

  ~FileIterator() {
    XGDMatrixFree(proxy_);
  }

  /**
   * Advance to the next block of the input file, staging it into the proxy DMatrix.
   * Returns true (continue) or false (end of data) as an int, matching the
   * XGDMatrixCallbackNext convention.
   */
  int Next() {
    CHECK(parser_);
    if (!parser_->Next()) {
      // Stop iteration
      return false;
    }
    row_block_ = parser_->Value();
    // Number of stored entries in this block; offset has size + 1 elements.
    size_t n_entries = row_block_.offset[row_block_.size];
    indptr_ = MakeArrayInterface(row_block_.offset, row_block_.size + 1);
    values_ = MakeArrayInterface(row_block_.value, n_entries);
    indices_ = MakeArrayInterface(row_block_.index, n_entries);

    // The dmlc parser converts 1-based indexing back to 0-based indexing, so the
    // number of columns is 1 past the largest column index seen in this block.
    // Guard against an empty block: std::max_element on an empty range would
    // dereference the end iterator (undefined behaviour).
    size_t n_columns = 0;
    if (n_entries != 0) {
      n_columns =
          1 + *std::max_element(row_block_.index, row_block_.index + n_entries);
    }

    XGProxyDMatrixSetDataCSR(proxy_, indptr_.c_str(), indices_.c_str(),
                             values_.c_str(), n_columns);

    // Forward per-row meta information when the parser provides it.
    if (row_block_.label) {
      XGDMatrixSetDenseInfo(proxy_, "label", row_block_.label, row_block_.size, 1);
    }
    if (row_block_.qid) {
      XGDMatrixSetDenseInfo(proxy_, "qid", row_block_.qid, row_block_.size, 1);
    }
    if (row_block_.weight) {
      XGDMatrixSetDenseInfo(proxy_, "weight", row_block_.weight, row_block_.size, 1);
    }
    // Continue iteration
    return true;
  }

  auto Proxy() -> decltype(proxy_) { return proxy_; }

  /** Restart iteration from the beginning of the file by recreating the parser. */
  void Reset() {
    CHECK(!type_.empty());
    parser_.reset(dmlc::Parser<uint32_t>::Create(uri_.c_str(), part_idx_,
                                                 n_parts_, type_.c_str()));
  }
};
namespace fileiter {
// C-style callback adaptors so a FileIterator can be driven through the
// DataIterResetCallback / XGDMatrixCallbackNext interface used by SparsePageDMatrix.
// `self` is an opaque pointer to a FileIterator instance.
inline void Reset(DataIterHandle self) {
static_cast<FileIterator*>(self)->Reset();
}
// Returns non-zero while more batches remain, 0 at end of data.
inline int Next(DataIterHandle self) {
return static_cast<FileIterator*>(self)->Next();
}
}  // namespace fileiter
} // namespace data
} // namespace xgboost
#endif // XGBOOST_DATA_FILE_ITERATOR_H_

View File

@@ -143,7 +143,7 @@ void IterativeDeviceDMatrix::Initialize(DataIterHandle iter_handle, float missin
proxy->Info().num_row_ = num_rows();
proxy->Info().num_col_ = cols;
if (batches != 1) {
this->info_.Extend(std::move(proxy->Info()), false);
this->info_.Extend(std::move(proxy->Info()), false, true);
}
n_batches_for_verification++;
}
@@ -163,7 +163,7 @@ void IterativeDeviceDMatrix::Initialize(DataIterHandle iter_handle, float missin
BatchSet<EllpackPage> IterativeDeviceDMatrix::GetEllpackBatches(const BatchParam& param) {
CHECK(page_);
auto begin_iter =
BatchIterator<EllpackPage>(new SimpleBatchIteratorImpl<EllpackPage>(page_.get()));
BatchIterator<EllpackPage>(new SimpleBatchIteratorImpl<EllpackPage>(page_));
return BatchSet<EllpackPage>(begin_iter);
}
} // namespace data

View File

@@ -14,6 +14,7 @@
#include "xgboost/data.h"
#include "xgboost/c_api.h"
#include "proxy_dmatrix.h"
#include "simple_batch_iterator.h"
namespace xgboost {
namespace data {
@@ -36,9 +37,10 @@ class IterativeDeviceDMatrix : public DMatrix {
XGDMatrixCallbackNext *next, float missing,
int nthread, int max_bin)
: proxy_{proxy}, reset_{reset}, next_{next} {
batch_param_ = BatchParam{0, max_bin, 0};
batch_param_ = BatchParam{0, max_bin};
this->Initialize(iter, missing, nthread);
}
~IterativeDeviceDMatrix() override = default;
bool EllpackExists() const override { return true; }
bool SparsePageExists() const override { return false; }
@@ -74,6 +76,18 @@ class IterativeDeviceDMatrix : public DMatrix {
return info_;
}
};
#if !defined(XGBOOST_USE_CUDA)
inline void IterativeDeviceDMatrix::Initialize(DataIterHandle iter, float missing, int nthread) {
common::AssertGPUSupport();
}
inline BatchSet<EllpackPage> IterativeDeviceDMatrix::GetEllpackBatches(const BatchParam& param) {
common::AssertGPUSupport();
auto begin_iter =
BatchIterator<EllpackPage>(new SimpleBatchIteratorImpl<EllpackPage>(page_));
return BatchSet<EllpackPage>(BatchIterator<EllpackPage>(begin_iter));
}
#endif // !defined(XGBOOST_USE_CUDA)
} // namespace data
} // namespace xgboost

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2020 XGBoost contributors
* Copyright 2020-2021 XGBoost contributors
*/
#ifndef XGBOOST_DATA_PROXY_DMATRIX_H_
#define XGBOOST_DATA_PROXY_DMATRIX_H_

View File

@@ -1,10 +1,13 @@
/*!
* Copyright 2019 XGBoost contributors
* Copyright 2019-2021 XGBoost contributors
*/
#ifndef XGBOOST_DATA_SIMPLE_BATCH_ITERATOR_H_
#define XGBOOST_DATA_SIMPLE_BATCH_ITERATOR_H_
#include <xgboost/data.h>
#include <memory>
#include <utility>
#include "xgboost/data.h"
namespace xgboost {
namespace data {
@@ -12,20 +15,21 @@ namespace data {
template<typename T>
class SimpleBatchIteratorImpl : public BatchIteratorImpl<T> {
public:
explicit SimpleBatchIteratorImpl(T* page) : page_(page) {}
T& operator*() override {
CHECK(page_ != nullptr);
return *page_;
}
explicit SimpleBatchIteratorImpl(std::shared_ptr<T const> page) : page_(std::move(page)) {}
const T& operator*() const override {
CHECK(page_ != nullptr);
return *page_;
}
void operator++() override { page_ = nullptr; }
SimpleBatchIteratorImpl &operator++() override {
page_ = nullptr;
return *this;
}
bool AtEnd() const override { return page_ == nullptr; }
std::shared_ptr<T const> Page() const override { return page_; }
private:
T* page_{nullptr};
std::shared_ptr<T const> page_{nullptr};
};
} // namespace data

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2014~2020 by Contributors
* Copyright 2014~2021 by Contributors
* \file simple_dmatrix.cc
* \brief the input data structure for gradient boosting
* \author Tianqi Chen
@@ -27,7 +27,7 @@ const MetaInfo& SimpleDMatrix::Info() const { return info_; }
DMatrix* SimpleDMatrix::Slice(common::Span<int32_t const> ridxs) {
auto out = new SimpleDMatrix;
SparsePage& out_page = out->sparse_page_;
SparsePage& out_page = *out->sparse_page_;
for (auto const &page : this->GetBatches<SparsePage>()) {
auto batch = page.GetView();
auto& h_data = out_page.data.HostVector();
@@ -48,17 +48,17 @@ DMatrix* SimpleDMatrix::Slice(common::Span<int32_t const> ridxs) {
BatchSet<SparsePage> SimpleDMatrix::GetRowBatches() {
// since csr is the default data structure so `source_` is always available.
auto begin_iter = BatchIterator<SparsePage>(
new SimpleBatchIteratorImpl<SparsePage>(&sparse_page_));
new SimpleBatchIteratorImpl<SparsePage>(sparse_page_));
return BatchSet<SparsePage>(begin_iter);
}
BatchSet<CSCPage> SimpleDMatrix::GetColumnBatches() {
// column page doesn't exist, generate it
if (!column_page_) {
column_page_.reset(new CSCPage(sparse_page_.GetTranspose(info_.num_col_)));
column_page_.reset(new CSCPage(sparse_page_->GetTranspose(info_.num_col_)));
}
auto begin_iter =
BatchIterator<CSCPage>(new SimpleBatchIteratorImpl<CSCPage>(column_page_.get()));
BatchIterator<CSCPage>(new SimpleBatchIteratorImpl<CSCPage>(column_page_));
return BatchSet<CSCPage>(begin_iter);
}
@@ -66,11 +66,11 @@ BatchSet<SortedCSCPage> SimpleDMatrix::GetSortedColumnBatches() {
// Sorted column page doesn't exist, generate it
if (!sorted_column_page_) {
sorted_column_page_.reset(
new SortedCSCPage(sparse_page_.GetTranspose(info_.num_col_)));
new SortedCSCPage(sparse_page_->GetTranspose(info_.num_col_)));
sorted_column_page_->SortRows();
}
auto begin_iter = BatchIterator<SortedCSCPage>(
new SimpleBatchIteratorImpl<SortedCSCPage>(sorted_column_page_.get()));
new SimpleBatchIteratorImpl<SortedCSCPage>(sorted_column_page_));
return BatchSet<SortedCSCPage>(begin_iter);
}
@@ -86,7 +86,7 @@ BatchSet<EllpackPage> SimpleDMatrix::GetEllpackBatches(const BatchParam& param)
batch_param_ = param;
}
auto begin_iter =
BatchIterator<EllpackPage>(new SimpleBatchIteratorImpl<EllpackPage>(ellpack_page_.get()));
BatchIterator<EllpackPage>(new SimpleBatchIteratorImpl<EllpackPage>(ellpack_page_));
return BatchSet<EllpackPage>(begin_iter);
}
@@ -100,7 +100,7 @@ BatchSet<GHistIndexMatrix> SimpleDMatrix::GetGradientIndex(const BatchParam& par
batch_param_ = param;
}
auto begin_iter = BatchIterator<GHistIndexMatrix>(
new SimpleBatchIteratorImpl<GHistIndexMatrix>(gradient_index_.get()));
new SimpleBatchIteratorImpl<GHistIndexMatrix>(gradient_index_));
return BatchSet<GHistIndexMatrix>(begin_iter);
}
@@ -110,8 +110,8 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
uint64_t default_max = std::numeric_limits<uint64_t>::max();
uint64_t last_group_id = default_max;
bst_uint group_size = 0;
auto& offset_vec = sparse_page_.offset.HostVector();
auto& data_vec = sparse_page_.data.HostVector();
auto& offset_vec = sparse_page_->offset.HostVector();
auto& data_vec = sparse_page_->data.HostVector();
uint64_t inferred_num_columns = 0;
uint64_t total_batch_size = 0;
// batch_size is either number of rows or cols, depending on data layout
@@ -120,7 +120,7 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
// Iterate over batches of input data
while (adapter->Next()) {
auto& batch = adapter->Value();
auto batch_max_columns = sparse_page_.Push(batch, missing, nthread);
auto batch_max_columns = sparse_page_->Push(batch, missing, nthread);
inferred_num_columns = std::max(batch_max_columns, inferred_num_columns);
total_batch_size += batch.Size();
// Append meta information if available
@@ -203,8 +203,8 @@ SimpleDMatrix::SimpleDMatrix(dmlc::Stream* in_stream) {
CHECK(in_stream->Read(&tmagic)) << "invalid input file format";
CHECK_EQ(tmagic, kMagic) << "invalid format, magic number mismatch";
info_.LoadBinary(in_stream);
in_stream->Read(&sparse_page_.offset.HostVector());
in_stream->Read(&sparse_page_.data.HostVector());
in_stream->Read(&sparse_page_->offset.HostVector());
in_stream->Read(&sparse_page_->data.HostVector());
}
void SimpleDMatrix::SaveToLocalFile(const std::string& fname) {
@@ -212,8 +212,8 @@ void SimpleDMatrix::SaveToLocalFile(const std::string& fname) {
int tmagic = kMagic;
fo->Write(tmagic);
info_.SaveBinary(fo.get());
fo->Write(sparse_page_.offset.HostVector());
fo->Write(sparse_page_.data.HostVector());
fo->Write(sparse_page_->offset.HostVector());
fo->Write(sparse_page_->data.HostVector());
}
template SimpleDMatrix::SimpleDMatrix(DenseAdapter* adapter, float missing,

View File

@@ -28,7 +28,7 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
CHECK(!adapter->Next());
info_.num_nonzero_ = CopyToSparsePage(adapter->Value(), adapter->DeviceIdx(),
missing, &sparse_page_);
missing, sparse_page_.get());
info_.num_col_ = adapter->NumColumns();
info_.num_row_ = adapter->NumRows();
// Synchronise worker columns

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2015 by Contributors
* Copyright 2015-2021 by Contributors
* \file simple_dmatrix.h
* \brief In-memory version of DMatrix.
* \author Tianqi Chen
@@ -47,11 +47,12 @@ class SimpleDMatrix : public DMatrix {
BatchSet<GHistIndexMatrix> GetGradientIndex(const BatchParam& param) override;
MetaInfo info_;
SparsePage sparse_page_; // Primary storage type
std::unique_ptr<CSCPage> column_page_;
std::unique_ptr<SortedCSCPage> sorted_column_page_;
std::unique_ptr<EllpackPage> ellpack_page_;
std::unique_ptr<GHistIndexMatrix> gradient_index_;
// Primary storage type
std::shared_ptr<SparsePage> sparse_page_ = std::make_shared<SparsePage>();
std::shared_ptr<CSCPage> column_page_;
std::shared_ptr<SortedCSCPage> sorted_column_page_;
std::shared_ptr<EllpackPage> ellpack_page_;
std::shared_ptr<GHistIndexMatrix> gradient_index_;
BatchParam batch_param_;
bool EllpackExists() const override {

View File

@@ -1,59 +1,147 @@
/*!
* Copyright 2014-2020 by Contributors
* Copyright 2014-2021 by Contributors
* \file sparse_page_dmatrix.cc
* \brief The external memory version of Page Iterator.
* \author Tianqi Chen
*/
#include <dmlc/base.h>
#include <dmlc/timer.h>
#if DMLC_ENABLE_STD_THREAD
#include "./sparse_page_dmatrix.h"
#include "./simple_batch_iterator.h"
#include "gradient_index.h"
namespace xgboost {
namespace data {
// Metadata is owned directly by the DMatrix; the old row-source indirection
// (`row_source_->info`) is gone after the callback-based rewrite.
MetaInfo &SparsePageDMatrix::Info() { return info_; }
// Read-only access to the metadata gathered during construction.
const MetaInfo &SparsePageDMatrix::Info() const { return info_; }
/**
 * \brief Construct an external-memory DMatrix from user-supplied data callbacks.
 *
 * Iterates the input once to (a) write the sparse-page cache and (b) accumulate
 * shape information (rows, columns, nnz, number of batches) and per-batch meta
 * info in a single pass.
 *
 * \param iter_handle   Opaque handle of the user data iterator.
 * \param proxy_handle  Proxy DMatrix the callbacks fill per batch.
 * \param reset         Callback rewinding the user iterator.
 * \param next          Callback advancing the user iterator.
 * \param missing       Value treated as missing.
 * \param nthreads      Number of threads for page construction.
 * \param cache_prefix  Prefix for on-disk cache files ("DMatrix" if empty).
 */
SparsePageDMatrix::SparsePageDMatrix(DataIterHandle iter_handle, DMatrixHandle proxy_handle,
                                     DataIterResetCallback *reset,
                                     XGDMatrixCallbackNext *next, float missing,
                                     int32_t nthreads, std::string cache_prefix)
    : proxy_{proxy_handle}, iter_{iter_handle}, reset_{reset}, next_{next}, missing_{missing},
      nthreads_{nthreads}, cache_prefix_{std::move(cache_prefix)} {
  cache_prefix_ = cache_prefix_.empty() ? "DMatrix" : cache_prefix_;
  if (rabit::IsDistributed()) {
    // Make the cache file name unique per worker to avoid collisions on a
    // shared file system.
    cache_prefix_ += ("-r" + std::to_string(rabit::GetRank()));
  }
  DMatrixProxy *proxy = MakeProxy(proxy_);
  auto iter = DataIterProxy<DataIterResetCallback, XGDMatrixCallbackNext>{
      iter_, reset_, next_};

  uint32_t n_batches = 0;
  size_t n_features = 0;
  size_t n_samples = 0;
  size_t nnz = 0;
  auto num_rows = [&]() {
    return HostAdapterDispatch(
        proxy, [](auto const &value) { return value.NumRows(); });
  };
  auto num_cols = [&]() {
    return HostAdapterDispatch(
        proxy, [](auto const &value) { return value.NumCols(); });
  };
  // the proxy is iterated together with the sparse page source so we can obtain all
  // information in 1 pass.
  for (auto const &page : this->GetRowBatchesImpl()) {
    // Extend takes a const reference, so the previous std::move here was a
    // misleading no-op; pass the info directly.
    this->info_.Extend(proxy->Info(), false, false);
    n_features = std::max(n_features, num_cols());
    n_samples += num_rows();
    nnz += page.data.Size();
    n_batches++;
  }
  iter.Reset();

  this->n_batches_ = n_batches;
  this->info_.num_row_ = n_samples;
  this->info_.num_col_ = n_features;
  this->info_.num_nonzero_ = nnz;

  // Synchronise the number of columns across distributed workers.
  rabit::Allreduce<rabit::op::Max>(&info_.num_col_, 1);
  CHECK_NE(info_.num_col_, 0);
}
const MetaInfo& SparsePageDMatrix::Info() const {
return row_source_->info;
void SparsePageDMatrix::InitializeSparsePage() {
auto id = MakeCache(this, ".row.page", cache_prefix_, &cache_info_);
// Don't use proxy DMatrix once this is already initialized, this allows users to
// release the iterator and data.
if (cache_info_.at(id)->written) {
CHECK(sparse_page_source_);
sparse_page_source_->Reset();
return;
}
auto iter = DataIterProxy<DataIterResetCallback, XGDMatrixCallbackNext>{
iter_, reset_, next_};
DMatrixProxy *proxy = MakeProxy(proxy_);
sparse_page_source_.reset(); // clear before creating new one to prevent conflicts.
sparse_page_source_ = std::make_shared<SparsePageSource>(
iter, proxy, this->missing_, this->nthreads_, this->info_.num_col_,
this->n_batches_, cache_info_.at(id));
}
// Non-virtual implementation, callable from the constructor before the vtable
// is relevant; ensures the sparse page source exists (or is rewound) and then
// exposes it as a batch range.
BatchSet<SparsePage> SparsePageDMatrix::GetRowBatchesImpl() {
  this->InitializeSparsePage();
  BatchIterator<SparsePage> it{sparse_page_source_};
  return BatchSet<SparsePage>(it);
}
// Public entry point; delegates to the non-virtual implementation.  The
// unreachable duplicate `return row_source_->GetBatchSet();` left over from
// the pre-refactor code (row_source_ no longer exists) has been removed.
BatchSet<SparsePage> SparsePageDMatrix::GetRowBatches() {
  return this->GetRowBatchesImpl();
}
/**
 * \brief Lazily build (or reset) the CSC page source backed by its own cache.
 *
 * Removes interleaved pre-refactor residue: the old
 * `column_source_.reset(new CSCPageSource(this, cache_info_));` and the old
 * `return column_source_->GetBatchSet();` conflicted with the new
 * construction and return path.
 */
BatchSet<CSCPage> SparsePageDMatrix::GetColumnBatches() {
  auto id = MakeCache(this, ".col.page", cache_prefix_, &cache_info_);
  CHECK_NE(this->Info().num_col_, 0);
  this->InitializeSparsePage();
  if (!column_source_) {
    column_source_ = std::make_shared<CSCPageSource>(
        this->missing_, this->nthreads_, this->Info().num_col_,
        this->n_batches_, cache_info_.at(id), sparse_page_source_);
  } else {
    column_source_->Reset();
  }
  auto begin_iter = BatchIterator<CSCPage>(column_source_);
  return BatchSet<CSCPage>(BatchIterator<CSCPage>(begin_iter));
}
/**
 * \brief Lazily build (or reset) the sorted CSC page source.
 *
 * Removes interleaved pre-refactor residue (the old
 * `sorted_column_source_.reset(new SortedCSCPageSource(this, cache_info_));`
 * and `return sorted_column_source_->GetBatchSet();`).
 */
BatchSet<SortedCSCPage> SparsePageDMatrix::GetSortedColumnBatches() {
  auto id = MakeCache(this, ".sorted.col.page", cache_prefix_, &cache_info_);
  CHECK_NE(this->Info().num_col_, 0);
  this->InitializeSparsePage();
  if (!sorted_column_source_) {
    sorted_column_source_ = std::make_shared<SortedCSCPageSource>(
        this->missing_, this->nthreads_, this->Info().num_col_,
        this->n_batches_, cache_info_.at(id), sparse_page_source_);
  } else {
    sorted_column_source_->Reset();
  }
  auto begin_iter = BatchIterator<SortedCSCPage>(sorted_column_source_);
  return BatchSet<SortedCSCPage>(BatchIterator<SortedCSCPage>(begin_iter));
}
/**
 * \brief Build (or reuse) the gradient index for `hist`.
 *
 * The span previously contained remnants of the old CPU `GetEllpackBatches`
 * (its signature, `ellpack_source_.reset(...)` and its return) interleaved
 * with this function; those conflicting lines have been removed.  The index is
 * rebuilt when no source exists yet or when a new, non-default BatchParam is
 * requested.
 */
BatchSet<GHistIndexMatrix> SparsePageDMatrix::GetGradientIndex(const BatchParam& param) {
  CHECK_GE(param.max_bin, 2);
  // The gradient index itself is kept fully in-core; external memory is not
  // supported for this page type yet.
  if (!ghist_index_source_ || (param != batch_param_ && param != BatchParam{})) {
    this->InitializeSparsePage();
    ghist_index_source_.reset(new GHistIndexMatrix{this, param.max_bin});
    batch_param_ = param;
  }
  // Reset the sparse source so later consumers start from the first batch.
  this->InitializeSparsePage();
  auto begin_iter = BatchIterator<GHistIndexMatrix>(
      new SimpleBatchIteratorImpl<GHistIndexMatrix>(ghist_index_source_));
  return BatchSet<GHistIndexMatrix>(begin_iter);
}
#if !defined(XGBOOST_USE_CUDA)
// CPU-only build stub: Ellpack pages require CUDA.  The real implementation
// lives in the .cu translation unit.
BatchSet<EllpackPage> SparsePageDMatrix::GetEllpackBatches(const BatchParam& param) {
  // Logs a fatal error when GPU support is absent, so the lines below are not
  // reached at runtime; they only keep the function well-formed.
  common::AssertGPUSupport();
  auto begin_iter = BatchIterator<EllpackPage>(ellpack_page_source_);
  return BatchSet<EllpackPage>(BatchIterator<EllpackPage>(begin_iter));
}
#endif  // !defined(XGBOOST_USE_CUDA)
} // namespace data
} // namespace xgboost
#endif // DMLC_ENABLE_STD_THREAD

View File

@@ -0,0 +1,46 @@
/*!
* Copyright 2021 XGBoost contributors
*/
#include "sparse_page_source.h"
#include "../common/hist_util.cuh"
#include "ellpack_page.cuh"
#include "sparse_page_dmatrix.h"
namespace xgboost {
namespace data {
/**
 * \brief Build (or reuse) the Ellpack page source on the GPU.
 *
 * Rebuilds the Ellpack cache when it has not been fully written yet, or when a
 * new, non-default BatchParam is requested; otherwise the existing source is
 * only rewound.
 */
BatchSet<EllpackPage> SparsePageDMatrix::GetEllpackBatches(const BatchParam& param) {
  CHECK_GE(param.gpu_id, 0);
  CHECK_GE(param.max_bin, 2);
  auto id = MakeCache(this, ".ellpack.page", cache_prefix_, &cache_info_);
  size_t row_stride = 0;
  this->InitializeSparsePage();
  if (!cache_info_.at(id)->written || (batch_param_ != param && param != BatchParam{})) {
    // reinitialize the cache
    cache_info_.erase(id);
    MakeCache(this, ".ellpack.page", cache_prefix_, &cache_info_);
    // Sketching iterates the sparse pages, so the source must be reset after
    // each full pass below.
    std::unique_ptr<common::HistogramCuts> cuts;
    cuts.reset(new common::HistogramCuts{
        common::DeviceSketch(param.gpu_id, this, param.max_bin, 0)});
    this->InitializeSparsePage();  // reset after use.

    row_stride = GetRowStride(this);
    this->InitializeSparsePage();  // reset after use.

    CHECK_NE(row_stride, 0);
    batch_param_ = param;

    auto ft = this->info_.feature_types.ConstDeviceSpan();
    ellpack_page_source_.reset();  // release resources.
    ellpack_page_source_.reset(new EllpackPageSource(
        this->missing_, this->nthreads_, this->Info().num_col_,
        this->n_batches_, cache_info_.at(id), param, std::move(cuts),
        this->IsDense(), row_stride, ft, sparse_page_source_));
  } else {
    CHECK(sparse_page_source_);
    ellpack_page_source_->Reset();
  }

  auto begin_iter = BatchIterator<EllpackPage>(ellpack_page_source_);
  return BatchSet<EllpackPage>(BatchIterator<EllpackPage>(begin_iter));
}
} // namespace data
} // namespace xgboost

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2015 by Contributors
* Copyright 2015-2021 by Contributors
* \file sparse_page_dmatrix.h
* \brief External-memory version of DMatrix.
* \author Tianqi Chen
@@ -13,24 +13,88 @@
#include <string>
#include <utility>
#include <vector>
#include <map>
#include "ellpack_page_source.h"
#include "sparse_page_source.h"
namespace xgboost {
namespace data {
// Used for external memory.
/**
* \brief DMatrix used for external memory.
*
* The external memory is created for controlling memory usage by splitting up data into
* multiple batches. However that doesn't mean we will actually process exact 1 batch at
* a time, which would be terribly slow considering that we have to loop through the
* whole dataset for every tree split. So we use async pre-fetch and let caller to decide
* how many batches it wants to process by returning data as shared pointer. The caller
* can use async function to process the data or just stage those batches, making the
* decision is out of the scope for sparse page dmatrix. These 2 optimizations might
* defeat the purpose of splitting up dataset since if you load all the batches then the
* memory usage is even worse than using a single batch. Essentially we need to control
* how many batches can be in memory at the same time.
*
 * Right now the write to the cache is a sequential operation and is blocking; reading from
 * the cache is async but with a hard-coded limit of 4 pages as a heuristic. So by sparse
* dmatrix itself there can be only 9 pages in main memory (might be of different types)
* at the same time: 1 page pending for write, 4 pre-fetched sparse pages, 4 pre-fetched
* dependent pages. If the caller stops iteration at the middle and start again, then the
* number of pages in memory can hit 16 due to pre-fetching, but this should be a bug in
 * caller's code (XGBoost doesn't discard a large portion of data at the end; there's no
 * sampling algo that samples only the first portion of data).
*
* Of course if the caller decides to retain some batches to perform parallel processing,
* then we might load all pages in memory, which is also considered as a bug in caller's
 * code. So if the algo supports external memory, it must be careful that the queue for
 * async calls has an upper limit.
*
* Another assumption we make is that the data must be immutable so caller should never
* change the data. Sparse page source returns const page to make sure of that. If you
* want to change the generated page like Ellpack, pass parameter into `GetBatches` to
* re-generate them instead of trying to modify the pages in-place.
*
* A possible optimization is dropping the sparse page once dependent pages like ellpack
* are constructed and cached.
*/
class SparsePageDMatrix : public DMatrix {
MetaInfo info_;
BatchParam batch_param_;
std::map<std::string, std::shared_ptr<Cache>> cache_info_;
DMatrixHandle proxy_;
DataIterHandle iter_;
DataIterResetCallback *reset_;
XGDMatrixCallbackNext *next_;
float missing_;
int nthreads_;
std::string cache_prefix_;
uint32_t n_batches_ {0};
// sparse page is the source to other page types, we make a special member function.
void InitializeSparsePage();
// Non-virtual version that can be used in constructor
BatchSet<SparsePage> GetRowBatchesImpl();
public:
template <typename AdapterT>
explicit SparsePageDMatrix(AdapterT* adapter, float missing, int nthread,
const std::string& cache_prefix,
size_t page_size = kPageSize)
: cache_info_(std::move(cache_prefix)) {
row_source_.reset(new data::SparsePageSource(adapter, missing, nthread,
cache_prefix, page_size));
explicit SparsePageDMatrix(DataIterHandle iter, DMatrixHandle proxy,
DataIterResetCallback *reset,
XGDMatrixCallbackNext *next, float missing,
int32_t nthreads, std::string cache_prefix);
~SparsePageDMatrix() override {
// Clear out all resources before deleting the cache file.
sparse_page_source_.reset();
ellpack_page_source_.reset();
column_source_.reset();
sorted_column_source_.reset();
ghist_index_source_.reset();
for (auto const &kv : cache_info_) {
CHECK(kv.second);
auto n = kv.second->ShardName();
TryDeleteCacheFile(n);
}
}
~SparsePageDMatrix() override = default;
MetaInfo& Info() override;
@@ -47,30 +111,41 @@ class SparsePageDMatrix : public DMatrix {
BatchSet<CSCPage> GetColumnBatches() override;
BatchSet<SortedCSCPage> GetSortedColumnBatches() override;
BatchSet<EllpackPage> GetEllpackBatches(const BatchParam& param) override;
BatchSet<GHistIndexMatrix> GetGradientIndex(const BatchParam&) override {
LOG(FATAL) << "Not implemented.";
return BatchSet<GHistIndexMatrix>(BatchIterator<GHistIndexMatrix>(nullptr));
}
BatchSet<GHistIndexMatrix> GetGradientIndex(const BatchParam&) override;
// source data pointers.
std::unique_ptr<SparsePageSource> row_source_;
std::unique_ptr<CSCPageSource> column_source_;
std::unique_ptr<SortedCSCPageSource> sorted_column_source_;
std::unique_ptr<EllpackPageSource> ellpack_source_;
// saved batch param
BatchParam batch_param_;
// the cache prefix
std::string cache_info_;
// Store column densities to avoid recalculating
std::vector<float> col_density_;
std::shared_ptr<SparsePageSource> sparse_page_source_;
std::shared_ptr<EllpackPageSource> ellpack_page_source_;
std::shared_ptr<CSCPageSource> column_source_;
std::shared_ptr<SortedCSCPageSource> sorted_column_source_;
std::shared_ptr<GHistIndexMatrix> ghist_index_source_;
bool EllpackExists() const override {
return static_cast<bool>(ellpack_source_);
return static_cast<bool>(ellpack_page_source_);
}
bool SparsePageExists() const override {
return static_cast<bool>(row_source_);
return static_cast<bool>(sparse_page_source_);
}
};
// Derive a process-unique cache id by combining the user prefix with the
// address of the owning DMatrix.
inline std::string MakeId(std::string prefix, SparsePageDMatrix *ptr) {
  std::ostringstream formatter;
  formatter << ptr;
  return prefix + "-" + formatter.str();
}
/**
 * \brief Ensure a cache entry with the given page format exists for this
 *        DMatrix and return its id.  Idempotent: an existing entry is kept.
 *
 * \param ptr    Owning DMatrix (its address keys the cache).
 * \param format Page-type suffix, e.g. ".row.page".
 * \param prefix User-supplied cache prefix.
 * \param out    Map of all cache entries, updated in place.
 */
inline std::string
MakeCache(SparsePageDMatrix *ptr, std::string format, std::string prefix,
          std::map<std::string, std::shared_ptr<Cache>> *out) {
  auto &cache_info = *out;
  auto name = MakeId(prefix, ptr);
  auto id = name + format;
  auto it = cache_info.find(id);
  if (it == cache_info.cend()) {
    // make_shared over reset(new ...): one allocation and exception safe.
    cache_info.emplace(id, std::make_shared<Cache>(false, name, format));
  }
  return id;
}
} // namespace data
} // namespace xgboost
#endif // XGBOOST_DATA_SPARSE_PAGE_DMATRIX_H_

View File

@@ -1,77 +0,0 @@
/*!
* Copyright (c) 2020 by XGBoost Contributors
*/
#include "sparse_page_source.h"
namespace xgboost {
namespace data {
// Copy rows [offset, offset + n_rows) of the staged pool into `out`,
// rebasing the CSR row pointers by `entry_offset` so the slice starts at 0.
void DataPool::Slice(std::shared_ptr<SparsePage> out, size_t offset,
                     size_t n_rows, size_t entry_offset) const {
  auto const &src_offset = pool_.offset.HostVector();
  auto const &src_data = pool_.data.HostVector();
  CHECK_LE(offset + n_rows + 1, src_offset.size());

  // Rebase the n_rows + 1 row pointers of the slice.
  auto &dst_offset = out->offset.HostVector();
  dst_offset.resize(n_rows + 1, 0);
  for (size_t i = 0; i <= n_rows; ++i) {
    dst_offset[i] = src_offset[offset + i] - entry_offset;
  }

  CHECK_GT(dst_offset.size(), 0);
  size_t const n_entries = dst_offset.back();
  auto &dst_data = out->data.HostVector();
  dst_data.resize(n_entries);
  CHECK_EQ(n_entries, src_offset.at(offset + n_rows) - src_offset.at(offset));
  std::copy_n(src_data.cbegin() + src_offset.at(offset), n_entries,
              dst_data.begin());
}
// Split the staged pool into pages of at most `page_size_` rows and hand them
// to the writer.  A remainder smaller than `page_size_` is staged back into
// the pool for the next round instead of being written.
void DataPool::SplitWritePage() {
  size_t total = pool_.Size();
  size_t offset = 0;        // rows consumed from the pool so far
  size_t entry_offset = 0;  // entries consumed, used to rebase CSR pointers
  do {
    size_t n_rows = std::min(page_size_, total - offset);
    std::shared_ptr<SparsePage> out;
    writer_->Alloc(&out);
    out->Clear();
    // Base row id continues the global row count across pages.
    out->SetBaseRowId(inferred_num_rows_);
    this->Slice(out, offset, n_rows, entry_offset);
    inferred_num_rows_ += out->Size();
    offset += n_rows;
    entry_offset += out->data.Size();
    CHECK_NE(out->Size(), 0);
    writer_->PushWrite(std::move(out));
  } while (total - offset >= page_size_);

  if (total - offset != 0) {
    // Keep the undersized tail staged in the pool rather than writing it.
    auto out = std::make_shared<SparsePage>();
    this->Slice(out, offset, total - offset, entry_offset);
    CHECK_NE(out->Size(), 0);
    pool_.Clear();
    pool_.Push(*out);
  } else {
    pool_.Clear();
  }
}
// Flush any rows still staged in the pool and guarantee that at least one
// (possibly empty) page is written, then return the total number of rows seen.
size_t DataPool::Finalize() {
  inferred_num_rows_ += pool_.Size();
  if (pool_.Size() != 0) {
    std::shared_ptr<SparsePage> last_page;
    this->writer_->Alloc(&last_page);
    last_page->Clear();
    last_page->Push(pool_);
    this->writer_->PushWrite(std::move(last_page));
  }
  if (inferred_num_rows_ == 0) {
    // Empty input: emit a single empty page so readers always find one.
    std::shared_ptr<SparsePage> empty_page;
    this->writer_->Alloc(&empty_page);
    empty_page->Clear();
    this->writer_->PushWrite(std::move(empty_page));
  }
  return inferred_num_rows_;
}
} // namespace data
} // namespace xgboost

View File

@@ -0,0 +1,17 @@
/*!
* Copyright 2021 XGBoost contributors
*/
#include "sparse_page_source.h"
#include "proxy_dmatrix.cuh"
#include "simple_dmatrix.cuh"
namespace xgboost {
namespace data {
// Dispatch on the concrete device adapter held by the proxy and copy its
// contents into `page` on the proxy's device, treating `missing` as absent.
void DevicePush(DMatrixProxy* proxy, float missing, SparsePage* page) {
  auto const ordinal = proxy->DeviceIdx();
  Dispatch(proxy, [ordinal, missing, page](auto const& batch) {
    CopyToSparsePage(batch, ordinal, missing, page);
  });
}
} // namespace data
} // namespace xgboost

View File

@@ -1,54 +1,18 @@
/*!
* Copyright (c) 2014-2019 by Contributors
* \file page_csr_source.h
* External memory data source, saved with sparse_batch_page binary format.
* \author Tianqi Chen
*
* -------------------------------------------------
* Random notes on implementation of external memory
* -------------------------------------------------
*
* As of XGBoost 1.3, the general pipeline is:
*
* dmlc text file parser --> file adapter --> sparse page source -> data pool -->
* write to binary cache --> load it back ~~> [ other pages (csc, ellpack, sorted csc) -->
* write to binary cache ] --> use it in various algorithms.
*
* ~~> means optional
*
* The dmlc text file parser returns number of blocks based on available threads, which
* can make the data partitioning non-deterministic, so here we set up an extra data pool
* to stage parsed data. As a result, the number of blocks returned by text parser does
* not equal to number of blocks in binary cache.
*
* Binary cache loading is async by the dmlc threaded iterator, which helps performance,
* but as this iterator itself is not thread safe, so calling
* `dmatrix->GetBatches<SparsePage>` is also not thread safe. Please note that, the
* threaded iterator is also used inside dmlc text file parser.
*
* Memory consumption is difficult to control due to various reasons. Firstly the text
* parsing doesn't have a batch size, only a hard coded buffer size is available.
* Secondly, everything is loaded/written with async queue, with multiple queues running
* the memory consumption is difficult to measure.
*
* The threaded iterator relies heavily on C++ memory model and threading primitive. The
* concurrent writer for binary cache is an old copy of moody queue. We should try to
* replace them with something more robust.
* Copyright (c) 2014-2021 by Contributors
* \file sparse_page_source.h
*/
#ifndef XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_
#define XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_
#include <dmlc/threadediter.h>
#include <dmlc/timer.h>
#include <algorithm>
#include <limits>
#include <locale>
#include <memory>
#include <algorithm> // std::min
#include <string>
#include <utility>
#include <vector>
#include <fstream>
#include <future>
#include <thread>
#include <map>
#include <memory>
#include "rabit/rabit.h"
#include "xgboost/base.h"
@@ -56,93 +20,12 @@
#include "adapter.h"
#include "sparse_page_writer.h"
#include "proxy_dmatrix.h"
#include "../common/common.h"
#include <xgboost/data.h>
namespace detail {

// Split a cache info string with delimiter ':'.
// If the cache info string starts with a Windows drive letter (e.g. "C:"),
// the drive prefix is excluded before splitting so its ':' is not treated as
// a shard delimiter, then re-attached to the first shard.
inline std::vector<std::string>
GetCacheShards(const std::string& cache_info) {
#if (defined _WIN32) || (defined __CYGWIN__)
  if (cache_info.length() >= 2
      && std::isalpha(cache_info[0], std::locale::classic())
      && cache_info[1] == ':') {
    std::vector<std::string> cache_shards
        = xgboost::common::Split(cache_info.substr(2), ':');
    // Re-attach the drive letter to the first shard.
    cache_shards[0] = cache_info.substr(0, 2) + cache_shards[0];
    return cache_shards;
  }
#endif  // (defined _WIN32) || (defined __CYGWIN__)
  return xgboost::common::Split(cache_info, ':');
}
}  // namespace detail
namespace xgboost {
namespace data {
template<typename S, typename T>
class SparseBatchIteratorImpl : public BatchIteratorImpl<T> {
public:
explicit SparseBatchIteratorImpl(S* source) : source_(source) {
CHECK(source_ != nullptr);
source_->BeforeFirst();
source_->Next();
}
T& operator*() override { return source_->Value(); }
const T& operator*() const override { return source_->Value(); }
void operator++() override { at_end_ = !source_->Next(); }
bool AtEnd() const override { return at_end_; }
private:
S* source_{nullptr};
bool at_end_{ false };
};
/*! \brief magic number used to identify Page */
static const int kMagic = 0xffffab02;
/*!
 * \brief Decide the page format from a cache prefix.
 *
 * A prefix ending in ".fmt-<row>-<col>" selects distinct row/column formats;
 * ".fmt-<fmt>" selects the same format for both; no tag means "raw".
 * \return pair of (row format, column format).
 */
inline std::pair<std::string, std::string> DecideFormat(const std::string& cache_prefix) {
  auto const tag_pos = cache_prefix.rfind(".fmt-");
  if (tag_pos == std::string::npos) {
    // No explicit format tag: fall back to the default raw format.
    return std::make_pair(std::string{"raw"}, std::string{"raw"});
  }
  std::string fmt = cache_prefix.substr(tag_pos + 5);
  auto const sep = fmt.rfind('-');
  if (sep == std::string::npos) {
    // A single format applies to both row and column pages.
    return std::make_pair(fmt, fmt);
  }
  return std::make_pair(fmt.substr(0, sep), fmt.substr(sep + 1));
}
// Parsed form of a ':'-separated cache spec: one meta-info file name plus, for
// each shard, its page file name and on-disk format.
struct CacheInfo {
  std::string name_info;                   // file holding the serialized MetaInfo
  std::vector<std::string> format_shards;  // row format of each shard
  std::vector<std::string> name_shards;    // page file name of each shard
};
// Expand a ':'-separated cache spec into per-shard page file names and
// formats for the given page type (e.g. ".row.page").
inline CacheInfo ParseCacheInfo(const std::string& cache_info, const std::string& page_type) {
  auto const shards = ::detail::GetCacheShards(cache_info);
  CHECK_NE(shards.size(), 0U);
  CacheInfo result;
  // The first shard prefix doubles as the meta-info file name.
  result.name_info = shards[0];
  for (auto const& prefix : shards) {
    result.name_shards.push_back(prefix + page_type);
    result.format_shards.push_back(DecideFormat(prefix).first);
  }
  return result;
}
inline void TryDeleteCacheFile(const std::string& file) {
if (std::remove(file.c_str()) != 0) {
LOG(WARNING) << "Couldn't remove external memory cache file " << file
@@ -150,415 +33,327 @@ inline void TryDeleteCacheFile(const std::string& file) {
}
}
// Abort with a helpful message when a cache file already exists at `file`;
// a leftover file usually means a stale or concurrently-used DMatrix cache.
inline void CheckCacheFileExists(const std::string& file) {
  std::ifstream probe(file.c_str());
  if (!probe.good()) {
    return;
  }
  LOG(FATAL)
      << "Cache file " << file << " exists already; "
      << "Is there another DMatrix with the same "
         "cache prefix? It can be caused by previously used DMatrix that "
         "hasn't been collected by language environment garbage collector. "
         "Otherwise please remove it manually.";
}
/**
 * \brief Book-keeping for one on-disk cache shard: file name, page format and
 *        the byte offset of every page in the shard.
 */
struct Cache {
  // whether the write to the cache is complete
  bool written;
  std::string name;
  std::string format;
  // offset into binary cache file.
  std::vector<size_t> offset;

  Cache(bool w, std::string n, std::string fmt)
      : written{w}, name{std::move(n)}, format{std::move(fmt)} {
    // Sentinel so per-page byte counts can be appended then prefix-summed.
    offset.push_back(0);
  }

  static std::string ShardName(std::string name, std::string format) {
    CHECK_EQ(format.front(), '.');
    return name + format;
  }
  std::string ShardName() { return ShardName(this->name, this->format); }

  // The write is completed: turn per-page sizes into absolute file offsets.
  void Commit() {
    if (written) {
      return;
    }
    std::partial_sum(offset.begin(), offset.end(), offset.begin());
    written = true;
  }
};
// Prevents multi-threaded call.
class TryLockGuard {
std::mutex& lock_;
/**
* \brief Given a set of cache files and page type, this object iterates over batches
* using prefetching for improved performance. Not thread safe.
*
* \tparam PageT Type of the page t.
*/
template <typename PageT>
class ExternalMemoryPrefetcher : dmlc::DataIter<PageT> {
public:
explicit ExternalMemoryPrefetcher(const CacheInfo& info) noexcept(false)
: base_rowid_(0), page_(nullptr), clock_ptr_(0) {
// read in the info files
CHECK_NE(info.name_shards.size(), 0U);
{
std::unique_ptr<dmlc::Stream> finfo(
dmlc::Stream::Create(info.name_info.c_str(), "r"));
int tmagic;
CHECK(finfo->Read(&tmagic));
CHECK_EQ(tmagic, kMagic) << "invalid format, magic number mismatch";
}
files_.resize(info.name_shards.size());
formats_.resize(info.name_shards.size());
prefetchers_.resize(info.name_shards.size());
// read in the cache files.
for (size_t i = 0; i < info.name_shards.size(); ++i) {
std::string name_row = info.name_shards.at(i);
files_[i].reset(dmlc::SeekStream::CreateForRead(name_row.c_str()));
std::unique_ptr<dmlc::SeekStream>& fi = files_[i];
std::string format;
CHECK(fi->Read(&format)) << "Invalid page format";
formats_[i].reset(CreatePageFormat<PageT>(format));
std::unique_ptr<SparsePageFormat<PageT>>& fmt = formats_[i];
size_t fbegin = fi->Tell();
prefetchers_[i].reset(new dmlc::ThreadedIter<PageT>(4));
prefetchers_[i]->Init(
[&fi, &fmt](PageT** dptr) {
if (*dptr == nullptr) {
*dptr = new PageT();
}
return fmt->Read(*dptr, fi.get());
},
[&fi, fbegin]() { fi->Seek(fbegin); });
}
explicit TryLockGuard(std::mutex& lock) : lock_{lock} { // NOLINT
CHECK(lock_.try_lock()) << "Multiple threads attempting to use Sparse DMatrix.";
}
/*! \brief destructor */
~ExternalMemoryPrefetcher() override {
delete page_;
~TryLockGuard() {
lock_.unlock();
}
};
// implement Next
bool Next() override {
CHECK(mutex_.try_lock()) << "Multiple threads attempting to use prefetcher";
// doing clock rotation over shards.
if (page_ != nullptr) {
size_t n = prefetchers_.size();
prefetchers_[(clock_ptr_ + n - 1) % n]->Recycle(&page_);
}
template <typename S>
class SparsePageSourceImpl : public BatchIteratorImpl<S> {
protected:
// Prevents calling this iterator from multiple places(or threads).
std::mutex single_threaded_;
if (prefetchers_[clock_ptr_]->Next(&page_)) {
page_->SetBaseRowId(base_rowid_);
base_rowid_ += page_->Size();
// advance clock
clock_ptr_ = (clock_ptr_ + 1) % prefetchers_.size();
mutex_.unlock();
return true;
} else {
mutex_.unlock();
std::shared_ptr<S> page_;
bool at_end_ {false};
float missing_;
int nthreads_;
bst_feature_t n_features_;
uint32_t count_{0};
uint32_t n_batches_ {0};
std::shared_ptr<Cache> cache_info_;
std::unique_ptr<dmlc::Stream> fo_;
using Ring = std::vector<std::future<std::shared_ptr<S>>>;
// A ring storing futures to data. Since the DMatrix iterator is forward only, so we
// can pre-fetch data in a ring.
std::unique_ptr<Ring> ring_{new Ring};
bool ReadCache() {
CHECK(!at_end_);
if (!cache_info_->written) {
return false;
}
}
// implement BeforeFirst
void BeforeFirst() override {
CHECK(mutex_.try_lock()) << "Multiple threads attempting to use prefetcher";
base_rowid_ = 0;
clock_ptr_ = 0;
for (auto& p : prefetchers_) {
p->BeforeFirst();
if (fo_) {
fo_.reset(); // flush the data to disk.
ring_->resize(n_batches_);
}
mutex_.unlock();
// An heuristic for number of pre-fetched batches. We can make it part of BatchParam
// to let user adjust number of pre-fetched batches when needed.
uint32_t constexpr kPreFetch = 4;
size_t n_prefetch_batches = std::min(kPreFetch, n_batches_);
CHECK_GT(n_prefetch_batches, 0) << "total batches:" << n_batches_;
size_t fetch_it = count_;
for (size_t i = 0; i < n_prefetch_batches; ++i, ++fetch_it) {
fetch_it %= n_batches_; // ring
if (ring_->at(fetch_it).valid()) { continue; }
auto const *self = this; // make sure it's const
CHECK_LT(fetch_it, cache_info_->offset.size());
ring_->at(fetch_it) = std::async(std::launch::async, [fetch_it, self]() {
std::unique_ptr<SparsePageFormat<S>> fmt{CreatePageFormat<S>("raw")};
auto n = self->cache_info_->ShardName();
size_t offset = self->cache_info_->offset.at(fetch_it);
std::unique_ptr<dmlc::SeekStream> fi{
dmlc::SeekStream::CreateForRead(n.c_str())};
fi->Seek(offset);
CHECK_EQ(fi->Tell(), offset);
auto page = std::make_shared<S>();
CHECK(fmt->Read(page.get(), fi.get()));
return page;
});
}
CHECK_EQ(std::count_if(ring_->cbegin(), ring_->cend(),
[](auto const &f) { return f.valid(); }),
n_prefetch_batches)
<< "Sparse DMatrix assumes forward iteration.";
page_ = (*ring_)[count_].get();
return true;
}
// implement Value
PageT& Value() { return *page_; }
void WriteCache() {
CHECK(!cache_info_->written);
std::unique_ptr<SparsePageFormat<S>> fmt{CreatePageFormat<S>("raw")};
if (!fo_) {
auto n = cache_info_->ShardName();
fo_.reset(dmlc::Stream::Create(n.c_str(), "w"));
}
auto bytes = fmt->Write(*page_, fo_.get());
cache_info_->offset.push_back(bytes);
}
const PageT& Value() const override { return *page_; }
private:
std::mutex mutex_;
/*! \brief number of rows */
size_t base_rowid_;
/*! \brief page currently on hold. */
PageT* page_;
/*! \brief internal clock ptr */
size_t clock_ptr_;
/*! \brief file pointer to the row blob file. */
std::vector<std::unique_ptr<dmlc::SeekStream>> files_;
/*! \brief Sparse page format file. */
std::vector<std::unique_ptr<SparsePageFormat<PageT>>> formats_;
/*! \brief internal prefetcher. */
std::vector<std::unique_ptr<dmlc::ThreadedIter<PageT>>> prefetchers_;
};
// A data pool to keep the size of each page balanced and data partitioning to be
// deterministic.
class DataPool {
size_t inferred_num_rows_;
MetaInfo* info_;
SparsePage pool_;
size_t page_size_;
SparsePageWriter<SparsePage> *writer_;
void Slice(std::shared_ptr<SparsePage> out, size_t offset, size_t n_rows,
size_t entry_offset) const;
void SplitWritePage();
virtual void Fetch() = 0;
public:
DataPool(MetaInfo *info, size_t page_size,
SparsePageWriter<SparsePage> *writer)
: inferred_num_rows_{0}, info_{info},
page_size_{page_size}, writer_{writer} {}
SparsePageSourceImpl(float missing, int nthreads, bst_feature_t n_features,
uint32_t n_batches, std::shared_ptr<Cache> cache)
: missing_{missing}, nthreads_{nthreads}, n_features_{n_features},
n_batches_{n_batches}, cache_info_{std::move(cache)} {}
void Push(std::shared_ptr<SparsePage> page) {
info_->num_nonzero_ += page->data.Size();
pool_.Push(*page);
if (pool_.Size() > page_size_) {
this->SplitWritePage();
SparsePageSourceImpl(SparsePageSourceImpl const &that) = delete;
~SparsePageSourceImpl() override {
for (auto& fu : *ring_) {
if (fu.valid()) {
fu.get();
}
}
page->Clear();
}
size_t Finalize();
uint32_t Iter() const { return count_; }
const S &operator*() const override {
CHECK(page_);
return *page_;
}
std::shared_ptr<S const> Page() const override {
return page_;
}
bool AtEnd() const override {
return at_end_;
}
virtual void Reset() {
TryLockGuard guard{single_threaded_};
at_end_ = false;
count_ = 0;
this->Fetch();
}
};
class SparsePageSource {
#if defined(XGBOOST_USE_CUDA)
void DevicePush(DMatrixProxy* proxy, float missing, SparsePage* page);
#else
inline void DevicePush(DMatrixProxy* proxy, float missing, SparsePage* page) {
common::AssertGPUSupport();
}
#endif
class SparsePageSource : public SparsePageSourceImpl<SparsePage> {
DataIterProxy<DataIterResetCallback, XGDMatrixCallbackNext> iter_;
DMatrixProxy* proxy_;
size_t base_row_id_ {0};
void Fetch() final {
page_ = std::make_shared<SparsePage>();
if (!this->ReadCache()) {
bool type_error { false };
CHECK(proxy_);
HostAdapterDispatch(proxy_, [&](auto const &adapter_batch) {
page_->Push(adapter_batch, this->missing_, this->nthreads_);
}, &type_error);
if (type_error) {
DevicePush(proxy_, missing_, page_.get());
}
page_->SetBaseRowId(base_row_id_);
base_row_id_ += page_->Size();
n_batches_++;
this->WriteCache();
}
}
public:
template <typename AdapterT>
SparsePageSource(AdapterT* adapter, float missing, int nthread,
const std::string& cache_info,
const size_t page_size = DMatrix::kPageSize) {
const std::string page_type = ".row.page";
cache_info_ = ParseCacheInfo(cache_info, page_type);
// Warn user if old cache files
CheckCacheFileExists(cache_info_.name_info);
for (auto file : cache_info_.name_shards) {
CheckCacheFileExists(file);
SparsePageSource(
DataIterProxy<DataIterResetCallback, XGDMatrixCallbackNext> iter,
DMatrixProxy *proxy, float missing, int nthreads,
bst_feature_t n_features, uint32_t n_batches, std::shared_ptr<Cache> cache)
: SparsePageSourceImpl(missing, nthreads, n_features, n_batches, cache),
iter_{iter}, proxy_{proxy} {
if (!cache_info_->written) {
iter_.Reset();
iter_.Next();
}
{
SparsePageWriter<SparsePage> writer(cache_info_.name_shards,
cache_info_.format_shards, 6);
DataPool pool(&info, page_size, &writer);
std::shared_ptr<SparsePage> page { new SparsePage };
uint64_t inferred_num_columns = 0;
uint64_t inferred_num_rows = 0;
const uint64_t default_max = std::numeric_limits<uint64_t>::max();
uint64_t last_group_id = default_max;
bst_uint group_size = 0;
std::vector<uint64_t> qids;
adapter->BeforeFirst();
while (adapter->Next()) {
auto& batch = adapter->Value();
if (batch.Labels() != nullptr) {
auto& labels = info.labels_.HostVector();
labels.insert(labels.end(), batch.Labels(),
batch.Labels() + batch.Size());
}
if (batch.Weights() != nullptr) {
auto& weights = info.weights_.HostVector();
weights.insert(weights.end(), batch.Weights(),
batch.Weights() + batch.Size());
}
if (batch.BaseMargin() != nullptr) {
auto& base_margin = info.base_margin_.HostVector();
base_margin.insert(base_margin.end(), batch.BaseMargin(),
batch.BaseMargin() + batch.Size());
}
if (batch.Qid() != nullptr) {
qids.insert(qids.end(), batch.Qid(), batch.Qid() + batch.Size());
// get group
for (size_t i = 0; i < batch.Size(); ++i) {
const uint64_t cur_group_id = batch.Qid()[i];
if (last_group_id == default_max ||
last_group_id != cur_group_id) {
info.group_ptr_.push_back(group_size);
}
last_group_id = cur_group_id;
++group_size;
}
}
CHECK_EQ(page->Size(), 0);
auto batch_max_columns = page->Push(batch, missing, nthread);
inferred_num_columns =
std::max(batch_max_columns, inferred_num_columns);
inferred_num_rows += page->Size();
pool.Push(page);
page->SetBaseRowId(inferred_num_rows);
}
if (last_group_id != default_max) {
if (group_size > info.group_ptr_.back()) {
info.group_ptr_.push_back(group_size);
}
}
// Deal with empty rows/columns if necessary
if (adapter->NumColumns() == kAdapterUnknownSize) {
info.num_col_ = inferred_num_columns;
} else {
info.num_col_ = adapter->NumColumns();
}
// Synchronise worker columns
rabit::Allreduce<rabit::op::Max>(&info.num_col_, 1);
if (adapter->NumRows() == kAdapterUnknownSize) {
info.num_row_ = inferred_num_rows;
} else {
if (page->offset.HostVector().empty()) {
page->offset.HostVector().emplace_back(0);
}
while (inferred_num_rows < adapter->NumRows()) {
page->offset.HostVector().emplace_back(
page->offset.HostVector().back());
inferred_num_rows++;
}
info.num_row_ = adapter->NumRows();
}
pool.Push(page);
pool.Finalize();
std::unique_ptr<dmlc::Stream> fo(
dmlc::Stream::Create(cache_info_.name_info.c_str(), "w"));
int tmagic = kMagic;
fo->Write(tmagic);
// Either every row has query ID or none at all
CHECK(qids.empty() || qids.size() == info.num_row_);
info.SaveBinary(fo.get());
}
LOG(INFO) << "SparsePageSource Finished writing to "
<< cache_info_.name_info;
external_prefetcher_.reset(
new ExternalMemoryPrefetcher<SparsePage>(cache_info_));
this->Fetch();
}
~SparsePageSource() {
external_prefetcher_.reset();
TryDeleteCacheFile(cache_info_.name_info);
for (auto file : cache_info_.name_shards) {
TryDeleteCacheFile(file);
/*!
 * \brief Advance to the next batch.
 *
 * Until the cache has been fully written the external data iterator drives
 * the advance; once the cache is committed the recorded batch count does.
 */
SparsePageSource& operator++() final {
  TryLockGuard guard{single_threaded_};
  ++count_;
  at_end_ = cache_info_->written ? (count_ == n_batches_) : !iter_.Next();
  if (!at_end_) {
    this->Fetch();
    return *this;
  }
  // Reached the last batch: seal the cache and drop the proxy reference.
  cache_info_->Commit();
  if (n_batches_ != 0) {
    CHECK_EQ(count_, n_batches_);
  }
  CHECK_GE(count_, 1);
  proxy_ = nullptr;
  return *this;
}
BatchSet<SparsePage> GetBatchSet() {
auto begin_iter = BatchIterator<SparsePage>(
new SparseBatchIteratorImpl<ExternalMemoryPrefetcher<SparsePage>,
SparsePage>(external_prefetcher_.get()));
return BatchSet<SparsePage>(begin_iter);
}
MetaInfo info;
void Reset() override {
if (proxy_) {
TryLockGuard guard{single_threaded_};
iter_.Reset();
}
SparsePageSourceImpl::Reset();
private:
std::unique_ptr<ExternalMemoryPrefetcher<SparsePage>> external_prefetcher_;
CacheInfo cache_info_;
TryLockGuard guard{single_threaded_};
base_row_id_ = 0;
}
};
class CSCPageSource {
// A mixin for advancing the iterator.
template <typename S>
class PageSourceIncMixIn : public SparsePageSourceImpl<S> {
protected:
std::shared_ptr<SparsePageSource> source_;
public:
CSCPageSource(DMatrix* src, const std::string& cache_info,
const size_t page_size = DMatrix::kPageSize) {
std::string page_type = ".col.page";
cache_info_ = ParseCacheInfo(cache_info, page_type);
for (auto file : cache_info_.name_shards) {
CheckCacheFileExists(file);
}
{
SparsePageWriter<SparsePage> writer(cache_info_.name_shards,
cache_info_.format_shards, 6);
std::shared_ptr<SparsePage> page;
writer.Alloc(&page);
page->Clear();
using SparsePageSourceImpl<S>::SparsePageSourceImpl;
PageSourceIncMixIn& operator++() final {
TryLockGuard guard{this->single_threaded_};
++(*source_);
size_t bytes_write = 0;
double tstart = dmlc::GetTime();
for (auto& batch : src->GetBatches<SparsePage>()) {
page->PushCSC(batch.GetTranspose(src->Info().num_col_));
++this->count_;
this->at_end_ = source_->AtEnd();
if (page->MemCostBytes() >= page_size) {
bytes_write += page->MemCostBytes();
writer.PushWrite(std::move(page));
writer.Alloc(&page);
page->Clear();
double tdiff = dmlc::GetTime() - tstart;
LOG(INFO) << "Writing to " << cache_info << " in "
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " written";
}
if (this->at_end_) {
this->cache_info_->Commit();
if (this->n_batches_ != 0) {
CHECK_EQ(this->count_, this->n_batches_);
}
if (page->data.Size() != 0) {
writer.PushWrite(std::move(page));
}
LOG(INFO) << "CSCPageSource: Finished writing to "
<< cache_info_.name_info;
CHECK_GE(this->count_, 1);
} else {
this->Fetch();
}
external_prefetcher_.reset(
new ExternalMemoryPrefetcher<CSCPage>(cache_info_));
CHECK_EQ(source_->Iter(), this->count_);
return *this;
}
~CSCPageSource() {
external_prefetcher_.reset();
for (auto file : cache_info_.name_shards) {
TryDeleteCacheFile(file);
}
}
BatchSet<CSCPage> GetBatchSet() {
auto begin_iter = BatchIterator<CSCPage>(
new SparseBatchIteratorImpl<ExternalMemoryPrefetcher<CSCPage>, CSCPage>(
external_prefetcher_.get()));
return BatchSet<CSCPage>(begin_iter);
}
private:
std::unique_ptr<ExternalMemoryPrefetcher<CSCPage>> external_prefetcher_;
CacheInfo cache_info_;
};
class SortedCSCPageSource {
class CSCPageSource : public PageSourceIncMixIn<CSCPage> {
protected:
/*!
 * \brief Materialize the current CSC page, either from the on-disk cache or
 *        by transposing the CSR page of the upstream source.
 */
void Fetch() final {
  if (this->ReadCache()) {
    return;  // page was served from cache
  }
  auto const &row_page = source_->Page();
  this->page_.reset(new CSCPage{});
  // we might be able to optimize this by merging transpose and pushcsc
  this->page_->PushCSC(row_page->GetTranspose(n_features_));
  this->page_->SetBaseRowId(row_page->base_rowid);
  this->WriteCache();
}
public:
SortedCSCPageSource(DMatrix* src, const std::string& cache_info,
const size_t page_size = DMatrix::kPageSize) {
std::string page_type = ".sorted.col.page";
cache_info_ = ParseCacheInfo(cache_info, page_type);
for (auto file : cache_info_.name_shards) {
CheckCacheFileExists(file);
}
{
SparsePageWriter<SparsePage> writer(cache_info_.name_shards,
cache_info_.format_shards, 6);
std::shared_ptr<SparsePage> page;
writer.Alloc(&page);
page->Clear();
size_t bytes_write = 0;
double tstart = dmlc::GetTime();
for (auto& batch : src->GetBatches<SparsePage>()) {
SparsePage tmp = batch.GetTranspose(src->Info().num_col_);
page->PushCSC(tmp);
page->SortRows();
if (page->MemCostBytes() >= page_size) {
bytes_write += page->MemCostBytes();
writer.PushWrite(std::move(page));
writer.Alloc(&page);
page->Clear();
double tdiff = dmlc::GetTime() - tstart;
LOG(INFO) << "Writing to " << cache_info << " in "
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " written";
}
}
if (page->data.Size() != 0) {
writer.PushWrite(std::move(page));
}
LOG(INFO) << "SortedCSCPageSource: Finished writing to "
<< cache_info_.name_info;
}
external_prefetcher_.reset(
new ExternalMemoryPrefetcher<SortedCSCPage>(cache_info_));
/*!
 * \brief Construct a CSC page source on top of a row (CSR) page source.
 *        The first page is fetched eagerly.
 */
CSCPageSource(float missing, int nthreads, bst_feature_t n_features,
              uint32_t n_batches, std::shared_ptr<Cache> cache,
              std::shared_ptr<SparsePageSource> source)
    : PageSourceIncMixIn(missing, nthreads, n_features, n_batches, cache) {
  this->source_ = source;
  this->Fetch();
}
~SortedCSCPageSource() {
external_prefetcher_.reset();
for (auto file : cache_info_.name_shards) {
TryDeleteCacheFile(file);
}
}
BatchSet<SortedCSCPage> GetBatchSet() {
auto begin_iter = BatchIterator<SortedCSCPage>(
new SparseBatchIteratorImpl<ExternalMemoryPrefetcher<SortedCSCPage>,
SortedCSCPage>(external_prefetcher_.get()));
return BatchSet<SortedCSCPage>(begin_iter);
}
private:
std::unique_ptr<ExternalMemoryPrefetcher<SortedCSCPage>> external_prefetcher_;
CacheInfo cache_info_;
};
class SortedCSCPageSource : public PageSourceIncMixIn<SortedCSCPage> {
 protected:
  /*!
   * \brief Materialize the current sorted-CSC page, either from the on-disk
   *        cache or by transposing and row-sorting the upstream CSR page.
   */
  void Fetch() final {
    if (this->ReadCache()) {
      return;  // page was served from cache
    }
    auto const &row_page = this->source_->Page();
    this->page_.reset(new SortedCSCPage{});
    // we might be able to optimize this by merging transpose and pushcsc
    this->page_->PushCSC(row_page->GetTranspose(n_features_));
    CHECK_EQ(this->page_->Size(), n_features_);
    CHECK_EQ(this->page_->data.Size(), row_page->data.Size());
    this->page_->SortRows();
    this->page_->SetBaseRowId(row_page->base_rowid);
    this->WriteCache();
  }

 public:
  /*!
   * \brief Construct from a row (CSR) page source; the first page is
   *        fetched eagerly.
   */
  SortedCSCPageSource(float missing, int nthreads, bst_feature_t n_features,
                      uint32_t n_batches, std::shared_ptr<Cache> cache,
                      std::shared_ptr<SparsePageSource> source)
      : PageSourceIncMixIn(missing, nthreads, n_features, n_batches, cache) {
    this->source_ = source;
    this->Fetch();
  }
};
} // namespace data
} // namespace xgboost
#endif // XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_

View File

@@ -63,103 +63,6 @@ inline SparsePageFormat<T>* CreatePageFormat(const std::string& name) {
return (e->body)();
}
#if DMLC_ENABLE_STD_THREAD
/*!
 * \brief A threaded writer to write sparse batch page to sharded files.
 *
 * One background thread is spawned per shard file; pages pushed via
 * PushWrite() are distributed to the shard queues round-robin and
 * serialized off the caller's thread.  Page buffers are recycled through
 * an internal queue so the total number of live pages is bounded by
 * the extra buffer capacity plus the shard count.
 * @tparam T Type of the page.
 */
template<typename T>
class SparsePageWriter {
 public:
  /*!
   * \brief constructor
   * \param name_shards name of shard files.
   * \param format_shards format of each shard.
   * \param extra_buffer_capacity Extra buffer capacity before block.
   */
  explicit SparsePageWriter(const std::vector<std::string>& name_shards,
                            const std::vector<std::string>& format_shards,
                            size_t extra_buffer_capacity)
      : num_free_buffer_(extra_buffer_capacity + name_shards.size()),
        clock_ptr_(0),
        workers_(name_shards.size()),
        qworkers_(name_shards.size()) {
    CHECK_EQ(name_shards.size(), format_shards.size());
    // start writer threads, one per shard
    for (size_t i = 0; i < name_shards.size(); ++i) {
      std::string name_shard = name_shards[i];
      std::string format_shard = format_shards[i];
      auto* wqueue = &qworkers_[i];
      workers_[i].reset(new std::thread(
          [this, name_shard, format_shard, wqueue]() {
            std::unique_ptr<dmlc::Stream> fo(dmlc::Stream::Create(name_shard.c_str(), "w"));
            std::unique_ptr<SparsePageFormat<T>> fmt(CreatePageFormat<T>(format_shard));
            // the shard file starts with its format name
            fo->Write(format_shard);
            std::shared_ptr<T> page;
            // a nullptr page is the termination signal pushed by the destructor
            while (wqueue->Pop(&page)) {
              if (page == nullptr) break;
              fmt->Write(*page, fo.get());
              // hand the page buffer back for reuse by Alloc()
              qrecycle_.Push(std::move(page));
            }
            fo.reset(nullptr);
            LOG(INFO) << "SparsePageWriter Finished writing to " << name_shard;
          }));
    }
  }

  /*! \brief destructor, will close the files automatically */
  ~SparsePageWriter() {
    for (auto& queue : qworkers_) {
      // use nullptr to signal termination.
      std::shared_ptr<T> sig(nullptr);
      queue.Push(std::move(sig));
    }
    // wait for every shard writer to drain its queue and close its file
    for (auto& thread : workers_) {
      thread->join();
    }
  }

  /*!
   * \brief Push a write job to the writer.
   * This function won't block,
   * writing is done by another thread inside writer.
   * \param page The page to be written
   */
  void PushWrite(std::shared_ptr<T>&& page) {
    // round-robin across shard queues
    qworkers_[clock_ptr_].Push(std::move(page));
    clock_ptr_ = (clock_ptr_ + 1) % workers_.size();
  }

  /*!
   * \brief Allocate a page to store results.
   * This function can block when the writer is too slow and buffer pages
   * have not yet been recycled.
   * \param out_page Used to store the allocated pages.
   */
  void Alloc(std::shared_ptr<T>* out_page) {
    CHECK(*out_page == nullptr);
    if (num_free_buffer_ != 0) {
      // budget left: create a fresh page
      out_page->reset(new T());
      --num_free_buffer_;
    } else {
      // all buffers in flight; block until a writer thread recycles one
      CHECK(qrecycle_.Pop(out_page));
    }
  }

 private:
  /*! \brief number of allocated pages */
  size_t num_free_buffer_;
  /*! \brief clock_pointer */
  size_t clock_ptr_;
  /*! \brief writer threads */
  std::vector<std::unique_ptr<std::thread>> workers_;
  /*! \brief recycler queue */
  dmlc::ConcurrentBlockingQueue<std::shared_ptr<T>> qrecycle_;
  /*! \brief worker threads */
  std::vector<dmlc::ConcurrentBlockingQueue<std::shared_ptr<T>>> qworkers_;
};
#endif // DMLC_ENABLE_STD_THREAD
/*!
* \brief Registry entry for sparse page format.
*/