Deterministic data partitioning for external memory (#6317)

* Make external memory data partitioning deterministic.

* Change the meaning of `page_size` from bytes to number of rows.

* Design a data pool.

* Note for external memory.

* Enable unity build on Windows CI.

* Force garbage collect on test.
This commit is contained in:
Jiaming Yuan
2020-11-11 06:11:06 +08:00
committed by GitHub
parent 9564886d9f
commit 43efadea2e
15 changed files with 334 additions and 88 deletions

View File

@@ -830,9 +830,10 @@ void SparsePage::Push(const SparsePage &batch) {
const auto& batch_data_vec = batch.data.HostVector();
size_t top = offset_vec.back();
data_vec.resize(top + batch.data.Size());
std::memcpy(dmlc::BeginPtr(data_vec) + top,
dmlc::BeginPtr(batch_data_vec),
sizeof(Entry) * batch.data.Size());
if (dmlc::BeginPtr(data_vec) && dmlc::BeginPtr(batch_data_vec)) {
std::memcpy(dmlc::BeginPtr(data_vec) + top, dmlc::BeginPtr(batch_data_vec),
sizeof(Entry) * batch.data.Size());
}
size_t begin = offset.Size();
offset_vec.resize(begin + batch.Size());
for (size_t i = 0; i < batch.Size(); ++i) {

View File

@@ -0,0 +1,77 @@
/*!
* Copyright (c) 2020 by XGBoost Contributors
*/
#include "sparse_page_source.h"
namespace xgboost {
namespace data {
/*!
 * \brief Copy rows [offset, offset + n_rows) of the staging pool into `out`,
 *        rebasing the CSR row pointers by `entry_offset` so the sliced page
 *        starts at entry 0.
 */
void DataPool::Slice(std::shared_ptr<SparsePage> out, size_t offset,
                     size_t n_rows, size_t entry_offset) const {
  auto const &src_offset = pool_.offset.HostVector();
  auto const &src_data = pool_.data.HostVector();
  CHECK_LE(offset + n_rows + 1, src_offset.size());

  // Rebase the row pointers for the destination page.
  auto &dst_offset = out->offset.HostVector();
  dst_offset.resize(n_rows + 1, 0);
  for (size_t i = 0; i <= n_rows; ++i) {
    dst_offset[i] = src_offset[offset + i] - entry_offset;
  }

  CHECK_GT(dst_offset.size(), 0);
  size_t n_entries = dst_offset.back();
  auto &dst_data = out->data.HostVector();
  dst_data.resize(n_entries);
  // Sanity check: rebased entry count matches the span in the source offsets.
  CHECK_EQ(n_entries, src_offset.at(offset + n_rows) - src_offset.at(offset));
  std::copy_n(src_data.cbegin() + src_offset.at(offset), n_entries,
              dst_data.begin());
}
/*!
 * \brief Flush full pages of exactly `page_size_` rows from the pool to the
 *        writer.  Any trailing partial page is staged back into the pool for
 *        the next round.
 *
 * Callers invoke this only when the pool holds more than `page_size_` rows,
 * so the first iteration always produces a non-empty page.
 */
void DataPool::SplitWritePage() {
  size_t const total_rows = pool_.Size();
  size_t row_cursor = 0;
  size_t entry_cursor = 0;
  for (;;) {
    size_t rows = std::min(page_size_, total_rows - row_cursor);
    std::shared_ptr<SparsePage> page;
    writer_->Alloc(&page);
    page->Clear();
    page->SetBaseRowId(inferred_num_rows_);
    this->Slice(page, row_cursor, rows, entry_cursor);
    inferred_num_rows_ += page->Size();
    row_cursor += rows;
    entry_cursor += page->data.Size();
    CHECK_NE(page->Size(), 0);
    writer_->PushWrite(std::move(page));
    if (total_rows - row_cursor < page_size_) {
      break;
    }
  }
  size_t const leftover = total_rows - row_cursor;
  if (leftover != 0) {
    // Slice the remainder out BEFORE clearing, since Slice reads from pool_.
    auto remainder = std::make_shared<SparsePage>();
    this->Slice(remainder, row_cursor, leftover, entry_cursor);
    CHECK_NE(remainder->Size(), 0);
    pool_.Clear();
    pool_.Push(*remainder);
  } else {
    pool_.Clear();
  }
}
/*!
 * \brief Flush any rows still staged in the pool and guarantee that at least
 *        one (possibly empty) page is written for an empty dataset.
 * \return Total number of rows seen across all pushed batches.
 */
size_t DataPool::Finalize() {
  inferred_num_rows_ += pool_.Size();
  if (pool_.Size() != 0) {
    // NOTE(review): unlike SplitWritePage, the trailing page's base row id is
    // not set here — presumably handled elsewhere; worth confirming.
    std::shared_ptr<SparsePage> last;
    this->writer_->Alloc(&last);
    last->Clear();
    last->Push(pool_);
    this->writer_->PushWrite(std::move(last));
  }
  // An empty dataset still needs one page in the binary cache.
  if (inferred_num_rows_ == 0) {
    std::shared_ptr<SparsePage> empty;
    this->writer_->Alloc(&empty);
    empty->Clear();
    this->writer_->PushWrite(std::move(empty));
  }
  return inferred_num_rows_;
}
} // namespace data
} // namespace xgboost

View File

@@ -3,6 +3,37 @@
* \file page_csr_source.h
* External memory data source, saved with sparse_batch_page binary format.
* \author Tianqi Chen
*
* -------------------------------------------------
* Random notes on implementation of external memory
* -------------------------------------------------
*
* As of XGBoost 1.3, the general pipeline is:
*
* dmlc text file parser --> file adapter --> sparse page source -> data pool -->
* write to binary cache --> load it back ~~> [ other pages (csc, ellpack, sorted csc) -->
* write to binary cache ] --> use it in various algorithms.
*
* ~~> means optional
*
* The dmlc text file parser returns number of blocks based on available threads, which
* can make the data partitioning non-deterministic, so here we set up an extra data pool
* to stage parsed data. As a result, the number of blocks returned by the text parser
* does not equal the number of blocks in the binary cache.
*
* Binary cache loading is made asynchronous by the dmlc threaded iterator, which helps
* performance, but since this iterator itself is not thread safe, calling
* `dmatrix->GetBatches<SparsePage>` is not thread safe either. Please note that the
* threaded iterator is also used inside the dmlc text file parser.
*
* Memory consumption is difficult to control for various reasons. Firstly, the text
* parser doesn't have a batch size; only a hard-coded buffer size is available.
* Secondly, everything is loaded/written with async queues, and with multiple queues
* running, the memory consumption is difficult to measure.
*
* The threaded iterator relies heavily on the C++ memory model and threading
* primitives. The concurrent writer for the binary cache is an old copy of the moody
* queue. We should try to replace them with something more robust.
*/
#ifndef XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_
#define XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_
@@ -19,6 +50,7 @@
#include <vector>
#include <fstream>
#include "rabit/rabit.h"
#include "xgboost/base.h"
#include "xgboost/data.h"
@@ -121,9 +153,12 @@ inline void TryDeleteCacheFile(const std::string& file) {
/*!
 * \brief Abort with a diagnostic if the on-disk cache file already exists.
 *
 * A stale cache file usually means another DMatrix uses the same cache
 * prefix, or a previous DMatrix has not yet been garbage collected by the
 * host language environment.
 *
 * \param file Path of the cache file to probe.
 */
inline void CheckCacheFileExists(const std::string& file) {
  std::ifstream f(file.c_str());
  if (f.good()) {
    // Keep a single fatal message; the diff rendering had left a duplicated,
    // unreachable second LOG(FATAL) from the replaced text.
    LOG(FATAL)
        << "Cache file " << file << " exists already; "
        << "Is there another DMatrix with the same "
           "cache prefix? It can be caused by previously used DMatrix that "
           "hasn't been collected by language environment garbage collector. "
           "Otherwise please remove it manually.";
  }
}
@@ -231,6 +266,38 @@ class ExternalMemoryPrefetcher : dmlc::DataIter<PageT> {
std::vector<std::unique_ptr<dmlc::ThreadedIter<PageT>>> prefetchers_;
};
// A data pool that stages parsed batches so every written page holds a fixed
// number of rows, keeping page sizes balanced and data partitioning
// deterministic regardless of how many blocks the upstream parser emits.
class DataPool {
  size_t inferred_num_rows_;  // rows written so far; used as the base row id
  MetaInfo* info_;            // owned by the caller; num_nonzero_ updated here
  SparsePage pool_;           // staging area for rows not yet written out
  size_t page_size_;          // page size measured in rows (not bytes)
  SparsePageWriter<SparsePage> *writer_;  // destination for finished pages

  // Copy rows [offset, offset + n_rows) of pool_ into out, rebasing the CSR
  // row pointers by entry_offset.
  void Slice(std::shared_ptr<SparsePage> out, size_t offset, size_t n_rows,
             size_t entry_offset) const;
  // Flush full pages from pool_ to writer_; a partial remainder stays staged.
  void SplitWritePage();

 public:
  DataPool(MetaInfo *info, size_t page_size,
           SparsePageWriter<SparsePage> *writer)
      : inferred_num_rows_{0}, info_{info},
        page_size_{page_size}, writer_{writer} {}

  // Append a parsed batch to the pool, flushing pages once enough rows have
  // accumulated.  The input page is cleared so the caller can reuse it.
  void Push(std::shared_ptr<SparsePage> page) {
    info_->num_nonzero_ += page->data.Size();
    pool_.Push(*page);
    if (pool_.Size() > page_size_) {
      this->SplitWritePage();
    }
    page->Clear();
  }
  // Flush remaining rows; returns the total number of rows seen.
  size_t Finalize();
};
class SparsePageSource {
public:
template <typename AdapterT>
@@ -249,17 +316,12 @@ class SparsePageSource {
{
SparsePageWriter<SparsePage> writer(cache_info_.name_shards,
cache_info_.format_shards, 6);
std::shared_ptr<SparsePage> page;
writer.Alloc(&page);
page->Clear();
DataPool pool(&info, page_size, &writer);
std::shared_ptr<SparsePage> page { new SparsePage };
uint64_t inferred_num_columns = 0;
uint64_t inferred_num_rows = 0;
size_t bytes_write = 0;
double tstart = dmlc::GetTime();
// print every 4 sec.
constexpr double kStep = 4.0;
size_t tick_expected = static_cast<double>(kStep);
const uint64_t default_max = std::numeric_limits<uint64_t>::max();
uint64_t last_group_id = default_max;
@@ -296,26 +358,13 @@ class SparsePageSource {
++group_size;
}
}
CHECK_EQ(page->Size(), 0);
auto batch_max_columns = page->Push(batch, missing, nthread);
inferred_num_columns =
std::max(batch_max_columns, inferred_num_columns);
if (page->MemCostBytes() >= page_size) {
inferred_num_rows += page->Size();
info.num_nonzero_ += page->offset.HostVector().back();
bytes_write += page->MemCostBytes();
writer.PushWrite(std::move(page));
writer.Alloc(&page);
page->Clear();
page->SetBaseRowId(inferred_num_rows);
double tdiff = dmlc::GetTime() - tstart;
if (tdiff >= tick_expected) {
LOG(CONSOLE) << "Writing " << page_type << " to " << cache_info
<< " in " << ((bytes_write >> 20UL) / tdiff)
<< " MB/s, " << (bytes_write >> 20UL) << " written";
tick_expected += static_cast<size_t>(kStep);
}
}
inferred_num_rows += page->Size();
pool.Push(page);
page->SetBaseRowId(inferred_num_rows);
}
if (last_group_id != default_max) {
@@ -323,10 +372,6 @@ class SparsePageSource {
info.group_ptr_.push_back(group_size);
}
}
inferred_num_rows += page->Size();
if (!page->offset.HostVector().empty()) {
info.num_nonzero_ += page->offset.HostVector().back();
}
// Deal with empty rows/columns if necessary
if (adapter->NumColumns() == kAdapterUnknownSize) {
@@ -352,10 +397,9 @@ class SparsePageSource {
info.num_row_ = adapter->NumRows();
}
// Make sure we have at least one page if the dataset is empty
if (page->data.Size() > 0 || info.num_row_ == 0) {
writer.PushWrite(std::move(page));
}
pool.Push(page);
pool.Finalize();
std::unique_ptr<dmlc::Stream> fo(
dmlc::Stream::Create(cache_info_.name_info.c_str(), "w"));
int tmagic = kMagic;