Dmatrix refactor stage 2 (#3395)
* DMatrix refactor 2 * Remove buffered rowset usage where possible * Transition to c++11 style iterators for row access * Transition column iterators to C++ 11
This commit is contained in:
@@ -255,10 +255,11 @@ DMatrix* DMatrix::Create(dmlc::Parser<uint32_t>* parser,
|
||||
return DMatrix::Create(std::move(source), cache_prefix);
|
||||
} else {
|
||||
#if DMLC_ENABLE_STD_THREAD
|
||||
if (!data::SparsePageSource::CacheExist(cache_prefix)) {
|
||||
data::SparsePageSource::Create(parser, cache_prefix);
|
||||
if (!data::SparsePageSource::CacheExist(cache_prefix, ".row.page")) {
|
||||
data::SparsePageSource::CreateRowPage(parser, cache_prefix);
|
||||
}
|
||||
std::unique_ptr<data::SparsePageSource> source(new data::SparsePageSource(cache_prefix));
|
||||
std::unique_ptr<data::SparsePageSource> source(
|
||||
new data::SparsePageSource(cache_prefix, ".row.page"));
|
||||
return DMatrix::Create(std::move(source), cache_prefix);
|
||||
#else
|
||||
LOG(FATAL) << "External memory is not enabled in mingw";
|
||||
|
||||
@@ -18,10 +18,7 @@ void SimpleCSRSource::Clear() {
|
||||
void SimpleCSRSource::CopyFrom(DMatrix* src) {
|
||||
this->Clear();
|
||||
this->info = src->Info();
|
||||
auto iter = src->RowIterator();
|
||||
iter->BeforeFirst();
|
||||
while (iter->Next()) {
|
||||
const auto &batch = iter->Value();
|
||||
for (const auto &batch : src->GetRowBatches()) {
|
||||
page_.Push(batch);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,103 +4,79 @@
|
||||
* \brief the input data structure for gradient boosting
|
||||
* \author Tianqi Chen
|
||||
*/
|
||||
#include <xgboost/data.h>
|
||||
#include <limits>
|
||||
#include <algorithm>
|
||||
#include <vector>
|
||||
#include "./simple_dmatrix.h"
|
||||
#include <xgboost/data.h>
|
||||
#include "../common/random.h"
|
||||
#include "../common/group_data.h"
|
||||
|
||||
namespace xgboost {
|
||||
namespace data {
|
||||
MetaInfo& SimpleDMatrix::Info() { return source_->info; }
|
||||
|
||||
bool SimpleDMatrix::ColBatchIter::Next() {
|
||||
if (data_ >= 1) return false;
|
||||
data_ += 1;
|
||||
return true;
|
||||
}
|
||||
const MetaInfo& SimpleDMatrix::Info() const { return source_->info; }
|
||||
|
||||
dmlc::DataIter<SparsePage>* SimpleDMatrix::ColIterator() {
|
||||
col_iter_.BeforeFirst();
|
||||
return &col_iter_;
|
||||
}
|
||||
|
||||
void SimpleDMatrix::InitColAccess(
|
||||
size_t max_row_perbatch, bool sorted) {
|
||||
if (this->HaveColAccess(sorted)) return;
|
||||
col_iter_.sorted_ = sorted;
|
||||
col_iter_.column_page_.reset(new SparsePage());
|
||||
this->MakeOneBatch(col_iter_.column_page_.get(), sorted);
|
||||
}
|
||||
|
||||
// internal function to make one batch from row iter.
|
||||
void SimpleDMatrix::MakeOneBatch(SparsePage* pcol, bool sorted) {
|
||||
// clear rowset
|
||||
buffered_rowset_.Clear();
|
||||
// bit map
|
||||
const int nthread = omp_get_max_threads();
|
||||
pcol->Clear();
|
||||
auto& pcol_offset_vec = pcol->offset.HostVector();
|
||||
auto& pcol_data_vec = pcol->data.HostVector();
|
||||
common::ParallelGroupBuilder<Entry>
|
||||
builder(&pcol_offset_vec, &pcol_data_vec);
|
||||
builder.InitBudget(Info().num_col_, nthread);
|
||||
// start working
|
||||
auto iter = this->RowIterator();
|
||||
iter->BeforeFirst();
|
||||
while (iter->Next()) {
|
||||
const auto& batch = iter->Value();
|
||||
long batch_size = static_cast<long>(batch.Size()); // NOLINT(*)
|
||||
for (long i = 0; i < batch_size; ++i) { // NOLINT(*)
|
||||
auto ridx = static_cast<bst_uint>(batch.base_rowid + i);
|
||||
buffered_rowset_.PushBack(ridx);
|
||||
}
|
||||
#pragma omp parallel for schedule(static)
|
||||
for (long i = 0; i < batch_size; ++i) { // NOLINT(*)
|
||||
int tid = omp_get_thread_num();
|
||||
auto inst = batch[i];
|
||||
for (auto& ins : inst) {
|
||||
builder.AddBudget(ins.index, tid);
|
||||
}
|
||||
}
|
||||
}
|
||||
builder.InitStorage();
|
||||
|
||||
iter->BeforeFirst();
|
||||
while (iter->Next()) {
|
||||
auto &batch = iter->Value();
|
||||
#pragma omp parallel for schedule(static)
|
||||
for (long i = 0; i < static_cast<long>(batch.Size()); ++i) { // NOLINT(*)
|
||||
int tid = omp_get_thread_num();
|
||||
auto inst = batch[i];
|
||||
for (auto& ins : inst) {
|
||||
builder.Push(ins.index,
|
||||
Entry(static_cast<bst_uint>(batch.base_rowid + i),
|
||||
ins.fvalue),
|
||||
tid);
|
||||
}
|
||||
}
|
||||
float SimpleDMatrix::GetColDensity(size_t cidx) {
|
||||
size_t column_size = 0;
|
||||
// Use whatever version of column batches already exists
|
||||
if (sorted_column_page_) {
|
||||
auto batch = this->GetSortedColumnBatches();
|
||||
column_size = (*batch.begin())[cidx].size();
|
||||
} else {
|
||||
auto batch = this->GetColumnBatches();
|
||||
column_size = (*batch.begin())[cidx].size();
|
||||
}
|
||||
|
||||
CHECK_EQ(pcol->Size(), Info().num_col_);
|
||||
size_t nmiss = this->Info().num_row_ - column_size;
|
||||
return 1.0f - (static_cast<float>(nmiss)) / this->Info().num_row_;
|
||||
}
|
||||
|
||||
if (sorted) {
|
||||
// sort columns
|
||||
auto ncol = static_cast<bst_omp_uint>(pcol->Size());
|
||||
#pragma omp parallel for schedule(dynamic, 1) num_threads(nthread)
|
||||
for (bst_omp_uint i = 0; i < ncol; ++i) {
|
||||
if (pcol_offset_vec[i] < pcol_offset_vec[i + 1]) {
|
||||
std::sort(dmlc::BeginPtr(pcol_data_vec) + pcol_offset_vec[i],
|
||||
dmlc::BeginPtr(pcol_data_vec) + pcol_offset_vec[i + 1],
|
||||
Entry::CmpValue);
|
||||
}
|
||||
}
|
||||
class SimpleBatchIteratorImpl : public BatchIteratorImpl {
|
||||
public:
|
||||
explicit SimpleBatchIteratorImpl(SparsePage* page) : page_(page) {}
|
||||
const SparsePage& operator*() const override {
|
||||
CHECK(page_ != nullptr);
|
||||
return *page_;
|
||||
}
|
||||
void operator++() override { page_ = nullptr; }
|
||||
bool AtEnd() const override { return page_ == nullptr; }
|
||||
SimpleBatchIteratorImpl* Clone() override {
|
||||
return new SimpleBatchIteratorImpl(*this);
|
||||
}
|
||||
|
||||
private:
|
||||
SparsePage* page_{nullptr};
|
||||
};
|
||||
|
||||
BatchSet SimpleDMatrix::GetRowBatches() {
|
||||
auto cast = dynamic_cast<SimpleCSRSource*>(source_.get());
|
||||
auto begin_iter = BatchIterator(new SimpleBatchIteratorImpl(&(cast->page_)));
|
||||
return BatchSet(begin_iter);
|
||||
}
|
||||
|
||||
bool SimpleDMatrix::SingleColBlock() const {
|
||||
return true;
|
||||
BatchSet SimpleDMatrix::GetColumnBatches() {
|
||||
// column page doesn't exist, generate it
|
||||
if (!column_page_) {
|
||||
auto page = dynamic_cast<SimpleCSRSource*>(source_.get())->page_;
|
||||
column_page_.reset(
|
||||
new SparsePage(page.GetTranspose(source_->info.num_col_)));
|
||||
}
|
||||
auto begin_iter =
|
||||
BatchIterator(new SimpleBatchIteratorImpl(column_page_.get()));
|
||||
return BatchSet(begin_iter);
|
||||
}
|
||||
|
||||
BatchSet SimpleDMatrix::GetSortedColumnBatches() {
|
||||
// Sorted column page doesn't exist, generate it
|
||||
if (!sorted_column_page_) {
|
||||
auto page = dynamic_cast<SimpleCSRSource*>(source_.get())->page_;
|
||||
sorted_column_page_.reset(
|
||||
new SparsePage(page.GetTranspose(source_->info.num_col_)));
|
||||
sorted_column_page_->SortRows();
|
||||
}
|
||||
auto begin_iter =
|
||||
BatchIterator(new SimpleBatchIteratorImpl(sorted_column_page_.get()));
|
||||
return BatchSet(begin_iter);
|
||||
}
|
||||
|
||||
bool SimpleDMatrix::SingleColBlock() const { return true; }
|
||||
} // namespace data
|
||||
} // namespace xgboost
|
||||
|
||||
@@ -9,9 +9,10 @@
|
||||
|
||||
#include <xgboost/base.h>
|
||||
#include <xgboost/data.h>
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
#include <cstring>
|
||||
#include <vector>
|
||||
#include "simple_csr_source.h"
|
||||
|
||||
namespace xgboost {
|
||||
namespace data {
|
||||
@@ -21,79 +22,26 @@ class SimpleDMatrix : public DMatrix {
|
||||
explicit SimpleDMatrix(std::unique_ptr<DataSource>&& source)
|
||||
: source_(std::move(source)) {}
|
||||
|
||||
MetaInfo& Info() override {
|
||||
return source_->info;
|
||||
}
|
||||
MetaInfo& Info() override;
|
||||
|
||||
const MetaInfo& Info() const override {
|
||||
return source_->info;
|
||||
}
|
||||
const MetaInfo& Info() const override;
|
||||
|
||||
dmlc::DataIter<SparsePage>* RowIterator() override {
|
||||
auto iter = source_.get();
|
||||
iter->BeforeFirst();
|
||||
return iter;
|
||||
}
|
||||
|
||||
bool HaveColAccess(bool sorted) const override {
|
||||
return col_iter_.sorted_ == sorted && col_iter_.column_page_!= nullptr;
|
||||
}
|
||||
|
||||
const RowSet& BufferedRowset() const override {
|
||||
return buffered_rowset_;
|
||||
}
|
||||
|
||||
size_t GetColSize(size_t cidx) const override {
|
||||
auto& batch = *col_iter_.column_page_;
|
||||
return batch[cidx].size();
|
||||
}
|
||||
|
||||
float GetColDensity(size_t cidx) const override {
|
||||
size_t nmiss = buffered_rowset_.Size() - GetColSize(cidx);
|
||||
return 1.0f - (static_cast<float>(nmiss)) / buffered_rowset_.Size();
|
||||
}
|
||||
|
||||
dmlc::DataIter<SparsePage>* ColIterator() override;
|
||||
|
||||
void InitColAccess(
|
||||
size_t max_row_perbatch, bool sorted) override;
|
||||
float GetColDensity(size_t cidx) override;
|
||||
|
||||
bool SingleColBlock() const override;
|
||||
|
||||
BatchSet GetRowBatches() override;
|
||||
|
||||
BatchSet GetColumnBatches() override;
|
||||
|
||||
BatchSet GetSortedColumnBatches() override;
|
||||
|
||||
private:
|
||||
// in-memory column batch iterator.
|
||||
struct ColBatchIter: dmlc::DataIter<SparsePage> {
|
||||
public:
|
||||
ColBatchIter() = default;
|
||||
void BeforeFirst() override {
|
||||
data_ = 0;
|
||||
}
|
||||
const SparsePage &Value() const override {
|
||||
return *column_page_;
|
||||
}
|
||||
bool Next() override;
|
||||
|
||||
private:
|
||||
// allow SimpleDMatrix to access it.
|
||||
friend class SimpleDMatrix;
|
||||
// column sparse page
|
||||
std::unique_ptr<SparsePage> column_page_;
|
||||
// data pointer
|
||||
size_t data_{0};
|
||||
// Is column sorted?
|
||||
bool sorted_{false};
|
||||
};
|
||||
|
||||
// source data pointer.
|
||||
std::unique_ptr<DataSource> source_;
|
||||
// column iterator
|
||||
ColBatchIter col_iter_;
|
||||
// list of row index that are buffered.
|
||||
RowSet buffered_rowset_;
|
||||
|
||||
// internal function to make one batch from row iter.
|
||||
void MakeOneBatch(
|
||||
SparsePage *pcol, bool sorted);
|
||||
std::unique_ptr<SparsePage> sorted_column_page_;
|
||||
std::unique_ptr<SparsePage> column_page_;
|
||||
};
|
||||
} // namespace data
|
||||
} // namespace xgboost
|
||||
|
||||
@@ -12,261 +12,92 @@
|
||||
#if DMLC_ENABLE_STD_THREAD
|
||||
#include "./sparse_page_dmatrix.h"
|
||||
#include "../common/random.h"
|
||||
#include "../common/common.h"
|
||||
#include "../common/group_data.h"
|
||||
|
||||
namespace xgboost {
|
||||
namespace data {
|
||||
|
||||
SparsePageDMatrix::ColPageIter::ColPageIter(
|
||||
std::vector<std::unique_ptr<dmlc::SeekStream> >&& files)
|
||||
: page_(nullptr), clock_ptr_(0), files_(std::move(files)) {
|
||||
load_all_ = false;
|
||||
formats_.resize(files_.size());
|
||||
prefetchers_.resize(files_.size());
|
||||
|
||||
for (size_t i = 0; i < files_.size(); ++i) {
|
||||
dmlc::SeekStream* fi = files_[i].get();
|
||||
std::string format;
|
||||
CHECK(fi->Read(&format)) << "Invalid page format";
|
||||
formats_[i].reset(SparsePageFormat::Create(format));
|
||||
SparsePageFormat* fmt = formats_[i].get();
|
||||
size_t fbegin = fi->Tell();
|
||||
prefetchers_[i].reset(new dmlc::ThreadedIter<SparsePage>(4));
|
||||
prefetchers_[i]->Init([this, fi, fmt] (SparsePage** dptr) {
|
||||
if (*dptr == nullptr) {
|
||||
*dptr = new SparsePage();
|
||||
}
|
||||
if (load_all_) {
|
||||
return fmt->Read(*dptr, fi);
|
||||
} else {
|
||||
return fmt->Read(*dptr, fi, index_set_);
|
||||
}
|
||||
}, [this, fi, fbegin] () {
|
||||
fi->Seek(fbegin);
|
||||
index_set_ = set_index_set_;
|
||||
load_all_ = set_load_all_;
|
||||
});
|
||||
}
|
||||
MetaInfo& SparsePageDMatrix::Info() {
|
||||
return row_source_->info;
|
||||
}
|
||||
|
||||
SparsePageDMatrix::ColPageIter::~ColPageIter() {
|
||||
delete page_;
|
||||
const MetaInfo& SparsePageDMatrix::Info() const {
|
||||
return row_source_->info;
|
||||
}
|
||||
|
||||
bool SparsePageDMatrix::ColPageIter::Next() {
|
||||
// doing clock rotation over shards.
|
||||
if (page_ != nullptr) {
|
||||
size_t n = prefetchers_.size();
|
||||
prefetchers_[(clock_ptr_ + n - 1) % n]->Recycle(&page_);
|
||||
class SparseBatchIteratorImpl : public BatchIteratorImpl {
|
||||
public:
|
||||
explicit SparseBatchIteratorImpl(SparsePageSource* source) : source_(source) {
|
||||
CHECK(source_ != nullptr);
|
||||
}
|
||||
if (prefetchers_[clock_ptr_]->Next(&page_)) {
|
||||
// advance clock
|
||||
clock_ptr_ = (clock_ptr_ + 1) % prefetchers_.size();
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
const SparsePage& operator*() const override { return source_->Value(); }
|
||||
void operator++() override { at_end_ = !source_->Next(); }
|
||||
bool AtEnd() const override { return at_end_; }
|
||||
SparseBatchIteratorImpl* Clone() override {
|
||||
return new SparseBatchIteratorImpl(*this);
|
||||
}
|
||||
|
||||
private:
|
||||
SparsePageSource* source_{nullptr};
|
||||
bool at_end_{ false };
|
||||
};
|
||||
|
||||
BatchSet SparsePageDMatrix::GetRowBatches() {
|
||||
auto cast = dynamic_cast<SparsePageSource*>(row_source_.get());
|
||||
cast->BeforeFirst();
|
||||
cast->Next();
|
||||
auto begin_iter = BatchIterator(new SparseBatchIteratorImpl(cast));
|
||||
return BatchSet(begin_iter);
|
||||
}
|
||||
|
||||
void SparsePageDMatrix::ColPageIter::BeforeFirst() {
|
||||
clock_ptr_ = 0;
|
||||
for (auto& p : prefetchers_) {
|
||||
p->BeforeFirst();
|
||||
BatchSet SparsePageDMatrix::GetSortedColumnBatches() {
|
||||
// Lazily instantiate
|
||||
if (!sorted_column_source_) {
|
||||
SparsePageSource::CreateColumnPage(this, cache_info_, true);
|
||||
sorted_column_source_.reset(
|
||||
new SparsePageSource(cache_info_, ".sorted.col.page"));
|
||||
}
|
||||
sorted_column_source_->BeforeFirst();
|
||||
sorted_column_source_->Next();
|
||||
auto begin_iter =
|
||||
BatchIterator(new SparseBatchIteratorImpl(sorted_column_source_.get()));
|
||||
return BatchSet(begin_iter);
|
||||
}
|
||||
|
||||
void SparsePageDMatrix::ColPageIter::Init(
|
||||
const std::vector<bst_uint>& index_set) {
|
||||
set_index_set_ = index_set;
|
||||
set_load_all_ = true;
|
||||
std::sort(set_index_set_.begin(), set_index_set_.end());
|
||||
this->BeforeFirst();
|
||||
BatchSet SparsePageDMatrix::GetColumnBatches() {
|
||||
// Lazily instantiate
|
||||
if (!column_source_) {
|
||||
SparsePageSource::CreateColumnPage(this, cache_info_, false);
|
||||
column_source_.reset(new SparsePageSource(cache_info_, ".col.page"));
|
||||
}
|
||||
column_source_->BeforeFirst();
|
||||
column_source_->Next();
|
||||
auto begin_iter =
|
||||
BatchIterator(new SparseBatchIteratorImpl(column_source_.get()));
|
||||
return BatchSet(begin_iter);
|
||||
}
|
||||
|
||||
dmlc::DataIter<SparsePage>* SparsePageDMatrix::ColIterator() {
|
||||
CHECK(col_iter_ != nullptr);
|
||||
std::vector<bst_uint> col_index;
|
||||
std::iota(col_index.begin(), col_index.end(), bst_uint(0));
|
||||
col_iter_->Init(col_index);
|
||||
return col_iter_.get();
|
||||
}
|
||||
|
||||
bool SparsePageDMatrix::TryInitColData(bool sorted) {
|
||||
// load meta data.
|
||||
std::vector<std::string> cache_shards = common::Split(cache_info_, ':');
|
||||
{
|
||||
std::string col_meta_name = cache_shards[0] + ".col.meta";
|
||||
std::unique_ptr<dmlc::Stream> fmeta(
|
||||
dmlc::Stream::Create(col_meta_name.c_str(), "r", true));
|
||||
if (fmeta == nullptr) return false;
|
||||
CHECK(fmeta->Read(&buffered_rowset_)) << "invalid col.meta file";
|
||||
CHECK(fmeta->Read(&col_size_)) << "invalid col.meta file";
|
||||
}
|
||||
// load real data
|
||||
std::vector<std::unique_ptr<dmlc::SeekStream> > files;
|
||||
for (const std::string& prefix : cache_shards) {
|
||||
std::string col_data_name = prefix + ".col.page";
|
||||
std::unique_ptr<dmlc::SeekStream> fdata(
|
||||
dmlc::SeekStream::CreateForRead(col_data_name.c_str(), true));
|
||||
if (fdata == nullptr) return false;
|
||||
files.push_back(std::move(fdata));
|
||||
}
|
||||
col_iter_.reset(new ColPageIter(std::move(files)));
|
||||
// warning: no attempt to check here whether the cached data was sorted
|
||||
col_iter_->sorted = sorted;
|
||||
return true;
|
||||
}
|
||||
|
||||
void SparsePageDMatrix::InitColAccess(
|
||||
size_t max_row_perbatch, bool sorted) {
|
||||
if (HaveColAccess(sorted)) return;
|
||||
if (TryInitColData(sorted)) return;
|
||||
const MetaInfo& info = this->Info();
|
||||
if (max_row_perbatch == std::numeric_limits<size_t>::max()) {
|
||||
max_row_perbatch = kMaxRowPerBatch;
|
||||
}
|
||||
buffered_rowset_.Clear();
|
||||
col_size_.resize(info.num_col_);
|
||||
std::fill(col_size_.begin(), col_size_.end(), 0);
|
||||
auto iter = this->RowIterator();
|
||||
size_t batch_ptr = 0, batch_top = 0;
|
||||
SparsePage tmp;
|
||||
|
||||
// function to create the page.
|
||||
auto make_col_batch = [&] (
|
||||
const SparsePage& prow,
|
||||
size_t begin,
|
||||
SparsePage *pcol) {
|
||||
pcol->Clear();
|
||||
pcol->base_rowid = buffered_rowset_[begin];
|
||||
const int nthread = std::max(omp_get_max_threads(), std::max(omp_get_num_procs() / 2 - 1, 1));
|
||||
auto& offset_vec = pcol->offset.HostVector();
|
||||
auto& data_vec = pcol->data.HostVector();
|
||||
common::ParallelGroupBuilder<Entry>
|
||||
builder(&offset_vec, &data_vec);
|
||||
builder.InitBudget(info.num_col_, nthread);
|
||||
bst_omp_uint ndata = static_cast<bst_uint>(prow.Size());
|
||||
const auto& prow_offset_vec = prow.offset.HostVector();
|
||||
const auto& prow_data_vec = prow.data.HostVector();
|
||||
#pragma omp parallel for schedule(static) num_threads(nthread)
|
||||
for (bst_omp_uint i = 0; i < ndata; ++i) {
|
||||
int tid = omp_get_thread_num();
|
||||
for (size_t j = prow_offset_vec[i]; j < prow_offset_vec[i+1]; ++j) {
|
||||
const auto e = prow_data_vec[j];
|
||||
builder.AddBudget(e.index, tid);
|
||||
float SparsePageDMatrix::GetColDensity(size_t cidx) {
|
||||
// Finds densities if we don't already have them
|
||||
if (col_density_.empty()) {
|
||||
std::vector<size_t> column_size(this->Info().num_col_);
|
||||
for (const auto &batch : this->GetColumnBatches()) {
|
||||
for (int i = 0; i < batch.Size(); i++) {
|
||||
column_size[i] += batch[i].size();
|
||||
}
|
||||
}
|
||||
builder.InitStorage();
|
||||
#pragma omp parallel for schedule(static) num_threads(nthread)
|
||||
for (bst_omp_uint i = 0; i < ndata; ++i) {
|
||||
int tid = omp_get_thread_num();
|
||||
for (size_t j = prow_offset_vec[i]; j < prow_offset_vec[i+1]; ++j) {
|
||||
const Entry &e = prow_data_vec[j];
|
||||
builder.Push(e.index,
|
||||
Entry(buffered_rowset_[i + begin], e.fvalue),
|
||||
tid);
|
||||
}
|
||||
col_density_.resize(column_size.size());
|
||||
for (int i = 0; i < col_density_.size(); i++) {
|
||||
size_t nmiss = this->Info().num_row_ - column_size[i];
|
||||
col_density_[i] =
|
||||
1.0f - (static_cast<float>(nmiss)) / this->Info().num_row_;
|
||||
}
|
||||
CHECK_EQ(pcol->Size(), info.num_col_);
|
||||
// sort columns
|
||||
if (sorted) {
|
||||
auto ncol = static_cast<bst_omp_uint>(pcol->Size());
|
||||
#pragma omp parallel for schedule(dynamic, 1) num_threads(nthread)
|
||||
for (bst_omp_uint i = 0; i < ncol; ++i) {
|
||||
if (offset_vec[i] < offset_vec[i + 1]) {
|
||||
std::sort(dmlc::BeginPtr(data_vec) + offset_vec[i],
|
||||
dmlc::BeginPtr(data_vec) + offset_vec[i + 1],
|
||||
Entry::CmpValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
auto make_next_col = [&] (SparsePage* dptr) {
|
||||
tmp.Clear();
|
||||
size_t btop = buffered_rowset_.Size();
|
||||
|
||||
while (true) {
|
||||
if (batch_ptr != batch_top) {
|
||||
auto &batch = iter->Value();
|
||||
CHECK_EQ(batch_top, batch.Size());
|
||||
for (size_t i = batch_ptr; i < batch_top; ++i) {
|
||||
auto ridx = static_cast<bst_uint>(batch.base_rowid + i);
|
||||
buffered_rowset_.PushBack(ridx);
|
||||
tmp.Push(batch[i]);
|
||||
|
||||
if (tmp.Size() >= max_row_perbatch ||
|
||||
tmp.MemCostBytes() >= kPageSize) {
|
||||
make_col_batch(tmp, btop, dptr);
|
||||
batch_ptr = i + 1;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
batch_ptr = batch_top;
|
||||
}
|
||||
if (!iter->Next()) break;
|
||||
batch_ptr = 0;
|
||||
batch_top = iter->Value().Size();
|
||||
}
|
||||
|
||||
if (tmp.Size() != 0) {
|
||||
make_col_batch(tmp, btop, dptr);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
std::vector<std::string> cache_shards = common::Split(cache_info_, ':');
|
||||
std::vector<std::string> name_shards, format_shards;
|
||||
for (const std::string& prefix : cache_shards) {
|
||||
name_shards.push_back(prefix + ".col.page");
|
||||
format_shards.push_back(SparsePageFormat::DecideFormat(prefix).second);
|
||||
}
|
||||
|
||||
{
|
||||
SparsePageWriter writer(name_shards, format_shards, 6);
|
||||
std::shared_ptr<SparsePage> page;
|
||||
writer.Alloc(&page); page->Clear();
|
||||
|
||||
double tstart = dmlc::GetTime();
|
||||
size_t bytes_write = 0;
|
||||
// print every 4 sec.
|
||||
constexpr double kStep = 4.0;
|
||||
size_t tick_expected = kStep;
|
||||
|
||||
while (make_next_col(page.get())) {
|
||||
const auto& page_offset_vec = page->offset.ConstHostVector();
|
||||
for (size_t i = 0; i < page->Size(); ++i) {
|
||||
col_size_[i] += page_offset_vec[i + 1] - page_offset_vec[i];
|
||||
}
|
||||
|
||||
bytes_write += page->MemCostBytes();
|
||||
writer.PushWrite(std::move(page));
|
||||
writer.Alloc(&page);
|
||||
page->Clear();
|
||||
|
||||
double tdiff = dmlc::GetTime() - tstart;
|
||||
if (tdiff >= tick_expected) {
|
||||
LOG(CONSOLE) << "Writing col.page file to " << cache_info_
|
||||
<< " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, "
|
||||
<< (bytes_write >> 20UL) << " MB writen";
|
||||
tick_expected += kStep;
|
||||
}
|
||||
}
|
||||
// save meta data
|
||||
std::string col_meta_name = cache_shards[0] + ".col.meta";
|
||||
std::unique_ptr<dmlc::Stream> fo(
|
||||
dmlc::Stream::Create(col_meta_name.c_str(), "w"));
|
||||
fo->Write(buffered_rowset_);
|
||||
fo->Write(col_size_);
|
||||
fo.reset(nullptr);
|
||||
}
|
||||
// initialize column data
|
||||
CHECK(TryInitColData(sorted));
|
||||
return col_density_.at(cidx);
|
||||
}
|
||||
|
||||
bool SparsePageDMatrix::SingleColBlock() const {
|
||||
return false;
|
||||
}
|
||||
} // namespace data
|
||||
} // namespace xgboost
|
||||
#endif
|
||||
|
||||
@@ -7,15 +7,12 @@
|
||||
#ifndef XGBOOST_DATA_SPARSE_PAGE_DMATRIX_H_
|
||||
#define XGBOOST_DATA_SPARSE_PAGE_DMATRIX_H_
|
||||
|
||||
#include <xgboost/base.h>
|
||||
#include <xgboost/data.h>
|
||||
#include <dmlc/threadediter.h>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
#include <string>
|
||||
#include "../common/common.h"
|
||||
#include "./sparse_page_writer.h"
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include "sparse_page_source.h"
|
||||
|
||||
namespace xgboost {
|
||||
namespace data {
|
||||
@@ -23,104 +20,35 @@ namespace data {
|
||||
class SparsePageDMatrix : public DMatrix {
|
||||
public:
|
||||
explicit SparsePageDMatrix(std::unique_ptr<DataSource>&& source,
|
||||
std::string cache_info)
|
||||
: source_(std::move(source)), cache_info_(std::move(cache_info)) {
|
||||
}
|
||||
std::string cache_info)
|
||||
: row_source_(std::move(source)), cache_info_(std::move(cache_info)) {}
|
||||
|
||||
MetaInfo& Info() override {
|
||||
return source_->info;
|
||||
}
|
||||
MetaInfo& Info() override;
|
||||
|
||||
const MetaInfo& Info() const override {
|
||||
return source_->info;
|
||||
}
|
||||
const MetaInfo& Info() const override;
|
||||
|
||||
dmlc::DataIter<SparsePage>* RowIterator() override {
|
||||
auto iter = source_.get();
|
||||
iter->BeforeFirst();
|
||||
return iter;
|
||||
}
|
||||
BatchSet GetRowBatches() override;
|
||||
|
||||
bool HaveColAccess(bool sorted) const override {
|
||||
return col_iter_ != nullptr && col_iter_->sorted == sorted;
|
||||
}
|
||||
BatchSet GetSortedColumnBatches() override;
|
||||
|
||||
const RowSet& BufferedRowset() const override {
|
||||
return buffered_rowset_;
|
||||
}
|
||||
BatchSet GetColumnBatches() override;
|
||||
|
||||
size_t GetColSize(size_t cidx) const override {
|
||||
return col_size_[cidx];
|
||||
}
|
||||
float GetColDensity(size_t cidx) override;
|
||||
|
||||
float GetColDensity(size_t cidx) const override {
|
||||
size_t nmiss = buffered_rowset_.Size() - col_size_[cidx];
|
||||
return 1.0f - (static_cast<float>(nmiss)) / buffered_rowset_.Size();
|
||||
}
|
||||
|
||||
bool SingleColBlock() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
dmlc::DataIter<SparsePage>* ColIterator() override;
|
||||
|
||||
void InitColAccess(
|
||||
size_t max_row_perbatch, bool sorted) override;
|
||||
|
||||
/*! \brief page size 256 MB */
|
||||
static const size_t kPageSize = 256UL << 20UL;
|
||||
/*! \brief Maximum number of rows per batch. */
|
||||
static const size_t kMaxRowPerBatch = 64UL << 10UL;
|
||||
bool SingleColBlock() const override;
|
||||
|
||||
private:
|
||||
// declare the column batch iter.
|
||||
class ColPageIter : public dmlc::DataIter<SparsePage> {
|
||||
public:
|
||||
explicit ColPageIter(std::vector<std::unique_ptr<dmlc::SeekStream> >&& files);
|
||||
~ColPageIter() override;
|
||||
void BeforeFirst() override;
|
||||
const SparsePage &Value() const override {
|
||||
return *page_;
|
||||
}
|
||||
bool Next() override;
|
||||
// initialize the column iterator with the specified index set.
|
||||
void Init(const std::vector<bst_uint>& index_set);
|
||||
// If the column features are sorted
|
||||
bool sorted;
|
||||
/*! \brief page size 256 MB */
|
||||
static const size_t kPageSize = 256UL << 20UL;
|
||||
|
||||
private:
|
||||
// the temp page.
|
||||
SparsePage* page_;
|
||||
// internal clock ptr.
|
||||
size_t clock_ptr_;
|
||||
// data file pointer.
|
||||
std::vector<std::unique_ptr<dmlc::SeekStream> > files_;
|
||||
// page format.
|
||||
std::vector<std::unique_ptr<SparsePageFormat> > formats_;
|
||||
/*! \brief internal prefetcher. */
|
||||
std::vector<std::unique_ptr<dmlc::ThreadedIter<SparsePage> > > prefetchers_;
|
||||
// The index set to be loaded.
|
||||
std::vector<bst_uint> index_set_;
|
||||
// The index set by the outsiders
|
||||
std::vector<bst_uint> set_index_set_;
|
||||
// whether to load data dataset.
|
||||
bool set_load_all_, load_all_;
|
||||
};
|
||||
/*!
|
||||
* \brief Try to initialize column data.
|
||||
* \return true if data already exists, false if they do not.
|
||||
*/
|
||||
bool TryInitColData(bool sorted);
|
||||
// source data pointer.
|
||||
std::unique_ptr<DataSource> source_;
|
||||
// source data pointers.
|
||||
std::unique_ptr<DataSource> row_source_;
|
||||
std::unique_ptr<SparsePageSource> column_source_;
|
||||
std::unique_ptr<SparsePageSource> sorted_column_source_;
|
||||
// the cache prefix
|
||||
std::string cache_info_;
|
||||
/*! \brief list of row index that are buffered */
|
||||
RowSet buffered_rowset_;
|
||||
// count for column data
|
||||
std::vector<size_t> col_size_;
|
||||
// internal column iter.
|
||||
std::unique_ptr<ColPageIter> col_iter_;
|
||||
// Store column densities to avoid recalculating
|
||||
std::vector<float> col_density_;
|
||||
};
|
||||
} // namespace data
|
||||
} // namespace xgboost
|
||||
|
||||
@@ -14,7 +14,8 @@
|
||||
namespace xgboost {
|
||||
namespace data {
|
||||
|
||||
SparsePageSource::SparsePageSource(const std::string& cache_info)
|
||||
SparsePageSource::SparsePageSource(const std::string& cache_info,
|
||||
const std::string& page_type)
|
||||
: base_rowid_(0), page_(nullptr), clock_ptr_(0) {
|
||||
// read in the info files
|
||||
std::vector<std::string> cache_shards = common::Split(cache_info, ':');
|
||||
@@ -32,7 +33,7 @@ SparsePageSource::SparsePageSource(const std::string& cache_info)
|
||||
|
||||
// read in the cache files.
|
||||
for (size_t i = 0; i < cache_shards.size(); ++i) {
|
||||
std::string name_row = cache_shards[i] + ".row.page";
|
||||
std::string name_row = cache_shards[i] + page_type;
|
||||
files_[i].reset(dmlc::SeekStream::CreateForRead(name_row.c_str()));
|
||||
dmlc::SeekStream* fi = files_[i].get();
|
||||
std::string format;
|
||||
@@ -83,7 +84,8 @@ const SparsePage& SparsePageSource::Value() const {
|
||||
return *page_;
|
||||
}
|
||||
|
||||
bool SparsePageSource::CacheExist(const std::string& cache_info) {
|
||||
bool SparsePageSource::CacheExist(const std::string& cache_info,
|
||||
const std::string& page_type) {
|
||||
std::vector<std::string> cache_shards = common::Split(cache_info, ':');
|
||||
CHECK_NE(cache_shards.size(), 0U);
|
||||
{
|
||||
@@ -92,22 +94,23 @@ bool SparsePageSource::CacheExist(const std::string& cache_info) {
|
||||
if (finfo == nullptr) return false;
|
||||
}
|
||||
for (const std::string& prefix : cache_shards) {
|
||||
std::string name_row = prefix + ".row.page";
|
||||
std::string name_row = prefix + page_type;
|
||||
std::unique_ptr<dmlc::Stream> frow(dmlc::Stream::Create(name_row.c_str(), "r", true));
|
||||
if (frow == nullptr) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
|
||||
void SparsePageSource::CreateRowPage(dmlc::Parser<uint32_t>* src,
|
||||
const std::string& cache_info) {
|
||||
const std::string page_type = ".row.page";
|
||||
std::vector<std::string> cache_shards = common::Split(cache_info, ':');
|
||||
CHECK_NE(cache_shards.size(), 0U);
|
||||
// read in the info files.
|
||||
std::string name_info = cache_shards[0];
|
||||
std::vector<std::string> name_shards, format_shards;
|
||||
for (const std::string& prefix : cache_shards) {
|
||||
name_shards.push_back(prefix + ".row.page");
|
||||
name_shards.push_back(prefix + page_type);
|
||||
format_shards.push_back(SparsePageFormat::DecideFormat(prefix).first);
|
||||
}
|
||||
{
|
||||
@@ -164,8 +167,8 @@ void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
|
||||
|
||||
double tdiff = dmlc::GetTime() - tstart;
|
||||
if (tdiff >= tick_expected) {
|
||||
LOG(CONSOLE) << "Writing row.page to " << cache_info << " in "
|
||||
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, "
|
||||
LOG(CONSOLE) << "Writing " << page_type << " to " << cache_info
|
||||
<< " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, "
|
||||
<< (bytes_write >> 20UL) << " written";
|
||||
tick_expected += static_cast<size_t>(kStep);
|
||||
}
|
||||
@@ -192,29 +195,40 @@ void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
|
||||
LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info;
|
||||
}
|
||||
|
||||
void SparsePageSource::Create(DMatrix* src,
|
||||
const std::string& cache_info) {
|
||||
void SparsePageSource::CreatePageFromDMatrix(DMatrix* src,
|
||||
const std::string& cache_info,
|
||||
const std::string& page_type) {
|
||||
std::vector<std::string> cache_shards = common::Split(cache_info, ':');
|
||||
CHECK_NE(cache_shards.size(), 0U);
|
||||
// read in the info files.
|
||||
std::string name_info = cache_shards[0];
|
||||
std::vector<std::string> name_shards, format_shards;
|
||||
for (const std::string& prefix : cache_shards) {
|
||||
name_shards.push_back(prefix + ".row.page");
|
||||
name_shards.push_back(prefix + page_type);
|
||||
format_shards.push_back(SparsePageFormat::DecideFormat(prefix).first);
|
||||
}
|
||||
{
|
||||
SparsePageWriter writer(name_shards, format_shards, 6);
|
||||
std::shared_ptr<SparsePage> page;
|
||||
writer.Alloc(&page); page->Clear();
|
||||
writer.Alloc(&page);
|
||||
page->Clear();
|
||||
|
||||
MetaInfo info = src->Info();
|
||||
size_t bytes_write = 0;
|
||||
double tstart = dmlc::GetTime();
|
||||
auto iter = src->RowIterator();
|
||||
for (auto& batch : src->GetRowBatches()) {
|
||||
if (page_type == ".row.page") {
|
||||
page->Push(batch);
|
||||
} else if (page_type == ".col.page") {
|
||||
page->Push(batch.GetTranspose(src->Info().num_col_));
|
||||
} else if (page_type == ".sorted.col.page") {
|
||||
auto tmp = batch.GetTranspose(src->Info().num_col_);
|
||||
tmp.SortRows();
|
||||
page->Push(tmp);
|
||||
} else {
|
||||
LOG(FATAL) << "Unknown page type: " << page_type;
|
||||
}
|
||||
|
||||
while (iter->Next()) {
|
||||
page->Push(iter->Value());
|
||||
if (page->MemCostBytes() >= kPageSize) {
|
||||
bytes_write += page->MemCostBytes();
|
||||
writer.PushWrite(std::move(page));
|
||||
@@ -239,6 +253,18 @@ void SparsePageSource::Create(DMatrix* src,
|
||||
LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info;
|
||||
}
|
||||
|
||||
void SparsePageSource::CreateRowPage(DMatrix* src,
|
||||
const std::string& cache_info) {
|
||||
const std::string page_type = ".row.page";
|
||||
CreatePageFromDMatrix(src, cache_info, page_type);
|
||||
}
|
||||
|
||||
void SparsePageSource::CreateColumnPage(DMatrix* src,
|
||||
const std::string& cache_info,
|
||||
bool sorted) {
|
||||
const std::string page_type = sorted ? ".sorted.col.page" : ".col.page";
|
||||
CreatePageFromDMatrix(src, cache_info, page_type);
|
||||
}
|
||||
} // namespace data
|
||||
} // namespace xgboost
|
||||
#endif
|
||||
|
||||
@@ -31,7 +31,8 @@ class SparsePageSource : public DataSource {
|
||||
* \brief Create source from cache files the cache_prefix.
|
||||
* \param cache_prefix The prefix of cache we want to solve.
|
||||
*/
|
||||
explicit SparsePageSource(const std::string& cache_prefix) noexcept(false);
|
||||
explicit SparsePageSource(const std::string& cache_prefix,
|
||||
const std::string& page_type) noexcept(false);
|
||||
/*! \brief destructor */
|
||||
~SparsePageSource() override;
|
||||
// implement Next
|
||||
@@ -45,26 +46,38 @@ class SparsePageSource : public DataSource {
|
||||
* \param src source parser.
|
||||
* \param cache_info The cache_info of cache file location.
|
||||
*/
|
||||
static void Create(dmlc::Parser<uint32_t>* src,
|
||||
static void CreateRowPage(dmlc::Parser<uint32_t>* src,
|
||||
const std::string& cache_info);
|
||||
/*!
|
||||
* \brief Create source cache by copy content from DMatrix.
|
||||
* \param cache_info The cache_info of cache file location.
|
||||
*/
|
||||
static void Create(DMatrix* src,
|
||||
static void CreateRowPage(DMatrix* src,
|
||||
const std::string& cache_info);
|
||||
|
||||
/*!
|
||||
* \brief Create source cache by copy content from DMatrix. Creates transposed column page, may be sorted or not.
|
||||
* \param cache_info The cache_info of cache file location.
|
||||
* \param sorted Whether columns should be pre-sorted
|
||||
*/
|
||||
static void CreateColumnPage(DMatrix* src,
|
||||
const std::string& cache_info, bool sorted);
|
||||
/*!
|
||||
* \brief Check if the cache file already exists.
|
||||
* \param cache_info The cache prefix of files.
|
||||
* \param page_type Type of the page.
|
||||
* \return Whether cache file already exists.
|
||||
*/
|
||||
static bool CacheExist(const std::string& cache_info);
|
||||
static bool CacheExist(const std::string& cache_info,
|
||||
const std::string& page_type);
|
||||
/*! \brief page size 32 MB */
|
||||
static const size_t kPageSize = 32UL << 20UL;
|
||||
/*! \brief magic number used to identify Page */
|
||||
static const int kMagic = 0xffffab02;
|
||||
|
||||
private:
|
||||
static void CreatePageFromDMatrix(DMatrix* src, const std::string& cache_info,
|
||||
const std::string& page_type);
|
||||
/*! \brief number of rows */
|
||||
size_t base_rowid_;
|
||||
/*! \brief page currently on hold. */
|
||||
|
||||
Reference in New Issue
Block a user