[IO] Enable external memory

This commit is contained in:
tqchen 2016-01-10 12:13:39 -08:00
parent 5f28617d7d
commit ef1021e759
11 changed files with 654 additions and 626 deletions

View File

@@ -102,7 +102,7 @@ lint:
python2 dmlc-core/scripts/lint.py xgboost ${LINT_LANG} include src
clean:
$(RM) -r build lib bin *~ */*~ */*/*~ */*/*/*~ $(AMALGA_OBJ)
$(RM) -rf build lib bin *~ */*~ */*/*~ */*/*/*~ $(AMALGA_OBJ) xgboost
clean_all: clean
cd $(DMLC_CORE); make clean; cd -

View File

@@ -22,7 +22,7 @@
* "[21:47:50] 6513x126 matrix with 143286 entries loaded from ../data/agaricus.txt.train"
*/
#ifndef XGBOOST_LOG_WITH_TIME
#define XGBOOST_LOG_WITH_TIME 0
#define XGBOOST_LOG_WITH_TIME 1
#endif
/*! \brief namespace of xgboost */

View File

@@ -8,6 +8,8 @@
#include "./sparse_batch_page.h"
#include "./simple_dmatrix.h"
#include "./simple_csr_source.h"
#include "./sparse_page_source.h"
#include "./sparse_page_dmatrix.h"
#include "../common/io.h"
namespace xgboost {
@@ -151,8 +153,11 @@ DMatrix* DMatrix::Create(dmlc::Parser<uint32_t>* parser,
source->CopyFrom(parser);
return DMatrix::Create(std::move(source), cache_prefix);
} else {
LOG(FATAL) << "external memory not yet implemented";
return nullptr;
if (!data::SparsePageSource::CacheExist(cache_prefix)) {
data::SparsePageSource::Create(parser, cache_prefix);
}
std::unique_ptr<data::SparsePageSource> source(new data::SparsePageSource(cache_prefix));
return DMatrix::Create(std::move(source), cache_prefix);
}
}
@@ -165,6 +170,10 @@ void DMatrix::SaveToLocalFile(const std::string& fname) {
DMatrix* DMatrix::Create(std::unique_ptr<DataSource>&& source,
const std::string& cache_prefix) {
if (cache_prefix.length() == 0) {
return new data::SimpleDMatrix(std::move(source));
} else {
return new data::SparsePageDMatrix(std::move(source), cache_prefix);
}
}
} // namespace xgboost
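
Taken with the new sources below, this is the dispatch point for external memory: an empty cache_prefix keeps the in-memory SimpleDMatrix, while a non-empty prefix builds (or reuses) an on-disk row cache and wraps it in SparsePageDMatrix. A minimal caller-side sketch of the new branch, assuming a libsvm input; the URI and cache prefix are illustrative names:

#include <dmlc/data.h>
#include <xgboost/data.h>
#include <memory>
#include <string>

// Sketch only: exercises the external-memory branch of DMatrix::Create.
xgboost::DMatrix* LoadExternalMemory() {
  std::unique_ptr<dmlc::Parser<uint32_t> > parser(
      dmlc::Parser<uint32_t>::Create("train.libsvm", 0, 1, "libsvm"));
  const std::string cache_prefix = "dtrain.cache";
  // The first call writes the cache files; later calls find them through
  // SparsePageSource::CacheExist and reuse them.
  return xgboost::DMatrix::Create(parser.get(), cache_prefix);
}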

View File

@@ -1,260 +0,0 @@
/*!
* Copyright (c) 2014 by Contributors
* \file page_dmatrix-inl.hpp
* row iterator based on sparse page
* \author Tianqi Chen
*/
#ifndef XGBOOST_IO_PAGE_DMATRIX_INL_HPP_
#define XGBOOST_IO_PAGE_DMATRIX_INL_HPP_
#include <vector>
#include <string>
#include <algorithm>
#include "../data.h"
#include "../utils/iterator.h"
#include "../utils/thread_buffer.h"
#include "./simple_fmatrix-inl.hpp"
#include "./sparse_batch_page.h"
#include "./page_fmatrix-inl.hpp"
#include "./libsvm_parser.h"
namespace xgboost {
namespace io {
/*! \brief thread buffer iterator */
class ThreadRowPageIterator: public utils::IIterator<RowBatch> {
public:
ThreadRowPageIterator(void) {
itr.SetParam("buffer_size", "4");
page_ = NULL;
base_rowid_ = 0;
}
virtual ~ThreadRowPageIterator(void) {}
virtual void Init(void) {
}
virtual void BeforeFirst(void) {
itr.BeforeFirst();
base_rowid_ = 0;
}
virtual bool Next(void) {
if (!itr.Next(page_)) return false;
out_ = page_->GetRowBatch(base_rowid_);
base_rowid_ += out_.size;
return true;
}
virtual const RowBatch &Value(void) const {
return out_;
}
/*! \brief load and initialize the iterator with fi */
inline void Load(const utils::FileStream &fi) {
itr.get_factory().SetFile(fi, 0);
itr.Init();
this->BeforeFirst();
}
private:
// base row id
size_t base_rowid_;
// output data
RowBatch out_;
SparsePage *page_;
utils::ThreadBuffer<SparsePage*, SparsePageFactory> itr;
};
/*! \brief data matrix using page */
template<int TKMagic>
class DMatrixPageBase : public DataMatrix {
public:
DMatrixPageBase(void) : DataMatrix(kMagic) {
iter_ = new ThreadRowPageIterator();
}
// virtual destructor
virtual ~DMatrixPageBase(void) {
// do not delete row iterator, since it is owned by fmat
// to be cleaned up in a clearer way
}
/*! \brief save a DataMatrix as DMatrixPage */
inline static void Save(const char *fname_, const DataMatrix &mat, bool silent) {
std::string fname = fname_;
utils::FileStream fs(utils::FopenCheck(fname.c_str(), "wb"));
int magic = kMagic;
fs.Write(&magic, sizeof(magic));
mat.info.SaveBinary(fs);
fs.Close();
fname += ".row.blob";
utils::IIterator<RowBatch> *iter = mat.fmat()->RowIterator();
utils::FileStream fbin(utils::FopenCheck(fname.c_str(), "wb"));
SparsePage page;
iter->BeforeFirst();
while (iter->Next()) {
const RowBatch &batch = iter->Value();
for (size_t i = 0; i < batch.size; ++i) {
page.Push(batch[i]);
if (page.MemCostBytes() >= kPageSize) {
page.Save(&fbin); page.Clear();
}
}
}
if (page.data.size() != 0) page.Save(&fbin);
fbin.Close();
if (!silent) {
utils::Printf("DMatrixPage: %lux%lu is saved to %s\n",
static_cast<unsigned long>(mat.info.num_row()), // NOLINT(*)
static_cast<unsigned long>(mat.info.num_col()), fname_); // NOLINT(*)
}
}
/*! \brief load and initialize the iterator with fi */
inline void LoadBinary(utils::FileStream &fi, // NOLINT(*)
bool silent,
const char *fname_) {
this->set_cache_file(fname_);
std::string fname = fname_;
int tmagic;
utils::Check(fi.Read(&tmagic, sizeof(tmagic)) != 0, "invalid input file format");
this->CheckMagic(tmagic);
this->info.LoadBinary(fi);
// load in the row data file
fname += ".row.blob";
utils::FileStream fs(utils::FopenCheck(fname.c_str(), "rb"));
iter_->Load(fs);
if (!silent) {
utils::Printf("DMatrixPage: %lux%lu matrix is loaded",
static_cast<unsigned long>(info.num_row()), // NOLINT(*)
static_cast<unsigned long>(info.num_col())); // NOLINT(*)
if (fname_ != NULL) {
utils::Printf(" from %s\n", fname_);
} else {
utils::Printf("\n");
}
if (info.group_ptr.size() != 0) {
utils::Printf("data contains %u groups\n", (unsigned)info.group_ptr.size() - 1);
}
}
}
/*! \brief save a LibSVM format file as DMatrixPage */
inline void LoadText(const char *uri,
const char* cache_file,
bool silent,
bool loadsplit) {
if (!silent) {
utils::Printf("start generating text file from %s\n", uri);
}
int rank = 0, npart = 1;
if (loadsplit) {
rank = rabit::GetRank();
npart = rabit::GetWorldSize();
}
this->set_cache_file(cache_file);
std::string fname_row = std::string(cache_file) + ".row.blob";
utils::FileStream fo(utils::FopenCheck(fname_row.c_str(), "wb"));
SparsePage page;
size_t bytes_write = 0;
double tstart = rabit::utils::GetTime();
LibSVMParser parser(
dmlc::InputSplit::Create(uri, rank, npart, "text"), 16);
info.Clear();
while (parser.Next()) {
const LibSVMPage &batch = parser.Value();
size_t nlabel = info.labels.size();
info.labels.resize(nlabel + batch.label.size());
if (batch.label.size() != 0) {
std::memcpy(BeginPtr(info.labels) + nlabel,
BeginPtr(batch.label),
batch.label.size() * sizeof(float));
}
page.Push(batch);
for (size_t i = 0; i < batch.data.size(); ++i) {
info.info.num_col = std::max(info.info.num_col,
static_cast<size_t>(batch.data[i].index+1));
}
if (page.MemCostBytes() >= kPageSize) {
bytes_write += page.MemCostBytes();
page.Save(&fo);
page.Clear();
double tdiff = rabit::utils::GetTime() - tstart;
if (!silent) {
utils::Printf("Writing to %s in %g MB/s, %lu MB written\n",
cache_file, (bytes_write >> 20UL) / tdiff,
(bytes_write >> 20UL));
}
}
info.info.num_row += batch.label.size();
}
if (page.data.size() != 0) {
page.Save(&fo);
}
fo.Close();
iter_->Load(utils::FileStream(utils::FopenCheck(fname_row.c_str(), "rb")));
// save data matrix
utils::FileStream fs(utils::FopenCheck(cache_file, "wb"));
int tmagic = kMagic;
fs.Write(&tmagic, sizeof(tmagic));
this->info.SaveBinary(fs);
fs.Close();
if (!silent) {
utils::Printf("DMatrixPage: %lux%lu is parsed from %s\n",
static_cast<unsigned long>(info.num_row()), // NOLINT(*)
static_cast<unsigned long>(info.num_col()), // NOLINT(*)
uri);
}
}
/*! \brief magic number used to identify DMatrix */
static const int kMagic = TKMagic;
/*! \brief page size 32 MB */
static const size_t kPageSize = 32UL << 20UL;
protected:
virtual void set_cache_file(const std::string &cache_file) = 0;
virtual void CheckMagic(int tmagic) = 0;
/*! \brief row iterator */
ThreadRowPageIterator *iter_;
};
class DMatrixPage : public DMatrixPageBase<0xffffab02> {
public:
DMatrixPage(void) {
fmat_ = new FMatrixPage(iter_, this->info);
}
virtual ~DMatrixPage(void) {
delete fmat_;
}
virtual IFMatrix *fmat(void) const {
return fmat_;
}
virtual void set_cache_file(const std::string &cache_file) {
fmat_->set_cache_file(cache_file);
}
virtual void CheckMagic(int tmagic) {
utils::Check(tmagic == DMatrixPageBase<0xffffab02>::kMagic ||
tmagic == DMatrixPageBase<0xffffab03>::kMagic,
"invalid format, magic number mismatch");
}
/*! \brief the real fmatrix */
FMatrixPage *fmat_;
};
// mix of FMatrixS and DMatrix
// usually costs about half the RAM of DMatrixSimple
class DMatrixHalfRAM : public DMatrixPageBase<0xffffab03> {
public:
DMatrixHalfRAM(void) {
fmat_ = new FMatrixS(iter_, this->info);
}
virtual ~DMatrixHalfRAM(void) {
delete fmat_;
}
virtual IFMatrix *fmat(void) const {
return fmat_;
}
virtual void set_cache_file(const std::string &cache_file) {
}
virtual void CheckMagic(int tmagic) {
utils::Check(tmagic == DMatrixPageBase<0xffffab02>::kMagic ||
tmagic == DMatrixPageBase<0xffffab03>::kMagic,
"invalid format, magic number mismatch");
}
/*! \brief the real fmatrix */
IFMatrix *fmat_;
};
} // namespace io
} // namespace xgboost
#endif // XGBOOST_IO_PAGE_DMATRIX_INL_HPP_

View File

@@ -1,360 +0,0 @@
/*!
* Copyright (c) 2014 by Contributors
* \file page_fmatrix-inl.hpp
* col iterator based on sparse page
* \author Tianqi Chen
*/
#ifndef XGBOOST_IO_PAGE_FMATRIX_INL_HPP_
#define XGBOOST_IO_PAGE_FMATRIX_INL_HPP_
#include <vector>
#include <string>
#include <algorithm>
namespace xgboost {
namespace io {
/*! \brief thread buffer iterator */
class ThreadColPageIterator: public utils::IIterator<ColBatch> {
public:
ThreadColPageIterator(void) {
itr.SetParam("buffer_size", "2");
page_ = NULL;
}
virtual ~ThreadColPageIterator(void) {}
virtual void Init(void) {}
virtual void BeforeFirst(void) {
itr.BeforeFirst();
}
virtual bool Next(void) {
if (!itr.Next(page_)) return false;
out_.col_index = BeginPtr(itr.get_factory().index_set());
col_data_.resize(page_->offset.size() - 1, SparseBatch::Inst(NULL, 0));
for (size_t i = 0; i < col_data_.size(); ++i) {
col_data_[i] = SparseBatch::Inst
(BeginPtr(page_->data) + page_->offset[i],
static_cast<bst_uint>(page_->offset[i + 1] - page_->offset[i]));
}
out_.col_data = BeginPtr(col_data_);
out_.size = col_data_.size();
return true;
}
virtual const ColBatch &Value(void) const {
return out_;
}
/*! \brief load and initialize the iterator with fi */
inline void SetFile(const utils::FileStream &fi) {
itr.get_factory().SetFile(fi);
itr.Init();
}
// set index set
inline void SetIndexSet(const std::vector<bst_uint> &fset, bool load_all) {
itr.get_factory().SetIndexSet(fset, load_all);
}
private:
// output data
ColBatch out_;
SparsePage *page_;
std::vector<SparseBatch::Inst> col_data_;
utils::ThreadBuffer<SparsePage*, SparsePageFactory> itr;
};
struct ColConvertFactory {
inline bool Init(void) {
return true;
}
inline void Setup(float pkeep,
size_t max_row_perbatch,
size_t num_col,
utils::IIterator<RowBatch> *iter,
std::vector<bst_uint> *buffered_rowset,
const std::vector<bool> *enabled) {
pkeep_ = pkeep;
max_row_perbatch_ = max_row_perbatch;
num_col_ = num_col;
iter_ = iter;
buffered_rowset_ = buffered_rowset;
enabled_ = enabled;
}
inline SparsePage *Create(void) {
return new SparsePage();
}
inline void FreeSpace(SparsePage *a) {
delete a;
}
inline void SetParam(const char *name, const char *val) {}
inline bool LoadNext(SparsePage *val) {
tmp_.Clear();
size_t btop = buffered_rowset_->size();
while (iter_->Next()) {
const RowBatch &batch = iter_->Value();
for (size_t i = 0; i < batch.size; ++i) {
bst_uint ridx = static_cast<bst_uint>(batch.base_rowid + i);
if (pkeep_ == 1.0f || random::SampleBinary(pkeep_)) {
buffered_rowset_->push_back(ridx);
tmp_.Push(batch[i]);
}
}
if (tmp_.MemCostBytes() >= kPageSize ||
tmp_.Size() >= max_row_perbatch_) {
this->MakeColPage(tmp_, BeginPtr(*buffered_rowset_) + btop,
*enabled_, val);
return true;
}
}
if (tmp_.Size() != 0) {
this->MakeColPage(tmp_, BeginPtr(*buffered_rowset_) + btop,
*enabled_, val);
return true;
} else {
return false;
}
}
inline void Destroy(void) {}
inline void BeforeFirst(void) {}
inline void MakeColPage(const SparsePage &prow,
const bst_uint *ridx,
const std::vector<bool> &enabled,
SparsePage *pcol) {
pcol->Clear();
int nthread;
#pragma omp parallel
{
nthread = omp_get_num_threads();
int max_nthread = std::max(omp_get_num_procs() / 2 - 4, 1);
if (nthread > max_nthread) {
nthread = max_nthread;
}
}
pcol->Clear();
utils::ParallelGroupBuilder<SparseBatch::Entry>
builder(&pcol->offset, &pcol->data);
builder.InitBudget(num_col_, nthread);
bst_omp_uint ndata = static_cast<bst_uint>(prow.Size());
#pragma omp parallel for schedule(static) num_threads(nthread)
for (bst_omp_uint i = 0; i < ndata; ++i) {
int tid = omp_get_thread_num();
for (size_t j = prow.offset[i]; j < prow.offset[i+1]; ++j) {
const SparseBatch::Entry &e = prow.data[j];
if (enabled[e.index]) {
builder.AddBudget(e.index, tid);
}
}
}
builder.InitStorage();
#pragma omp parallel for schedule(static) num_threads(nthread)
for (bst_omp_uint i = 0; i < ndata; ++i) {
int tid = omp_get_thread_num();
for (size_t j = prow.offset[i]; j < prow.offset[i+1]; ++j) {
const SparseBatch::Entry &e = prow.data[j];
builder.Push(e.index,
SparseBatch::Entry(ridx[i], e.fvalue),
tid);
}
}
utils::Assert(pcol->Size() == num_col_, "inconsistent col data");
// sort columns
bst_omp_uint ncol = static_cast<bst_omp_uint>(pcol->Size());
#pragma omp parallel for schedule(dynamic, 1) num_threads(nthread)
for (bst_omp_uint i = 0; i < ncol; ++i) {
if (pcol->offset[i] < pcol->offset[i + 1]) {
std::sort(BeginPtr(pcol->data) + pcol->offset[i],
BeginPtr(pcol->data) + pcol->offset[i + 1],
SparseBatch::Entry::CmpValue);
}
}
}
// probability of keep
float pkeep_;
// maximum number of rows per batch
size_t max_row_perbatch_;
// number of columns
size_t num_col_;
// row batch iterator
utils::IIterator<RowBatch> *iter_;
// buffered rowset
std::vector<bst_uint> *buffered_rowset_;
// enabled marks
const std::vector<bool> *enabled_;
// internal temp cache
SparsePage tmp_;
/*! \brief page size 256 M */
static const size_t kPageSize = 256 << 20UL;
};
/*!
* \brief sparse matrix that supports column access (CSC)
*/
class FMatrixPage : public IFMatrix {
public:
typedef SparseBatch::Entry Entry;
/*! \brief constructor */
FMatrixPage(utils::IIterator<RowBatch> *iter,
const learner::MetaInfo &info) : info(info) {
this->iter_ = iter;
}
// destructor
virtual ~FMatrixPage(void) {
if (iter_ != NULL) delete iter_;
}
/*! \return whether column access is enabled */
virtual bool HaveColAccess(void) const {
return col_size_.size() != 0;
}
/*! \brief get number of columns */
virtual size_t NumCol(void) const {
utils::Check(this->HaveColAccess(), "NumCol:need column access");
return col_size_.size();
}
/*! \brief get number of buffered rows */
virtual const std::vector<bst_uint> &buffered_rowset(void) const {
return buffered_rowset_;
}
/*! \brief get column size */
virtual size_t GetColSize(size_t cidx) const {
return col_size_[cidx];
}
/*! \brief get column density */
virtual float GetColDensity(size_t cidx) const {
size_t nmiss = num_buffered_row_ - (col_size_[cidx]);
return 1.0f - (static_cast<float>(nmiss)) / num_buffered_row_;
}
virtual void InitColAccess(const std::vector<bool> &enabled,
float pkeep, size_t max_row_perbatch) {
if (this->HaveColAccess()) return;
if (TryLoadColData()) return;
this->InitColData(enabled, pkeep, max_row_perbatch);
utils::Check(TryLoadColData(), "failed on creating col.blob");
}
/*!
* \brief get the row iterator associated with FMatrix
*/
virtual utils::IIterator<RowBatch>* RowIterator(void) {
iter_->BeforeFirst();
return iter_;
}
/*!
* \brief get the column based iterator
*/
virtual utils::IIterator<ColBatch>* ColIterator(void) {
size_t ncol = this->NumCol();
col_index_.resize(0);
for (size_t i = 0; i < ncol; ++i) {
col_index_.push_back(static_cast<bst_uint>(i));
}
col_iter_.SetIndexSet(col_index_, false);
col_iter_.BeforeFirst();
return &col_iter_;
}
/*!
* \brief column based iterator
*/
virtual utils::IIterator<ColBatch> *ColIterator(const std::vector<bst_uint> &fset) {
size_t ncol = this->NumCol();
col_index_.resize(0);
for (size_t i = 0; i < fset.size(); ++i) {
if (fset[i] < ncol) col_index_.push_back(fset[i]);
}
col_iter_.SetIndexSet(col_index_, false);
col_iter_.BeforeFirst();
return &col_iter_;
}
// set the cache file name
inline void set_cache_file(const std::string &cache_file) {
col_data_name_ = std::string(cache_file) + ".col.blob";
col_meta_name_ = std::string(cache_file) + ".col.meta";
}
protected:
inline bool TryLoadColData(void) {
std::FILE *fi = fopen64(col_meta_name_.c_str(), "rb");
if (fi == NULL) return false;
utils::FileStream fs(fi);
LoadMeta(&fs);
fs.Close();
fi = utils::FopenCheck(col_data_name_.c_str(), "rb");
if (fi == NULL) return false;
col_iter_.SetFile(utils::FileStream(fi));
return true;
}
inline void LoadMeta(utils::IStream *fi) {
utils::Check(fi->Read(&num_buffered_row_, sizeof(num_buffered_row_)) != 0,
"invalid col.blob file");
utils::Check(fi->Read(&buffered_rowset_),
"invalid col.blob file");
utils::Check(fi->Read(&col_size_),
"invalid col.blob file");
}
inline void SaveMeta(utils::IStream *fo) {
fo->Write(&num_buffered_row_, sizeof(num_buffered_row_));
fo->Write(buffered_rowset_);
fo->Write(col_size_);
}
/*!
* \brief initialize column data
* \param enabled the list of enabled columns
* \param pkeep probability to keep a row
* \param max_row_perbatch maximum row per batch
*/
inline void InitColData(const std::vector<bool> &enabled,
float pkeep, size_t max_row_perbatch) {
// clear rowset
buffered_rowset_.clear();
col_size_.resize(info.num_col());
std::fill(col_size_.begin(), col_size_.end(), 0);
utils::FileStream fo;
fo = utils::FileStream(utils::FopenCheck(col_data_name_.c_str(), "wb"));
iter_->BeforeFirst();
double tstart = rabit::utils::GetTime();
size_t bytes_write = 0;
utils::ThreadBuffer<SparsePage*, ColConvertFactory> citer;
citer.SetParam("buffer_size", "2");
citer.get_factory().Setup(pkeep, max_row_perbatch, info.num_col(),
iter_, &buffered_rowset_, &enabled);
citer.Init();
SparsePage *pcol;
while (citer.Next(pcol)) {
for (size_t i = 0; i < pcol->Size(); ++i) {
col_size_[i] += pcol->offset[i + 1] - pcol->offset[i];
}
pcol->Save(&fo);
size_t spage = pcol->MemCostBytes();
bytes_write += spage;
double tnow = rabit::utils::GetTime();
double tdiff = tnow - tstart;
utils::Printf("Writing to %s in %g MB/s, %lu MB written\n",
col_data_name_.c_str(),
(bytes_write >> 20UL) / tdiff,
(bytes_write >> 20UL));
}
fo.Close();
num_buffered_row_ = buffered_rowset_.size();
fo = utils::FileStream(utils::FopenCheck(col_meta_name_.c_str(), "wb"));
this->SaveMeta(&fo);
fo.Close();
}
private:
/*! \brief page size 256 M */
static const size_t kPageSize = 256 << 20UL;
// shared meta info with DMatrix
const learner::MetaInfo &info;
// row iterator
utils::IIterator<RowBatch> *iter_;
/*! \brief column based data file name */
std::string col_data_name_;
/*! \brief column based data file name */
std::string col_meta_name_;
/*! \brief list of row index that are buffered */
std::vector<bst_uint> buffered_rowset_;
// number of buffered rows
size_t num_buffered_row_;
// count for column data
std::vector<size_t> col_size_;
// internal column index for output
std::vector<bst_uint> col_index_;
// internal thread backed col iterator
ThreadColPageIterator col_iter_;
};
} // namespace io
} // namespace xgboost
#endif // XGBOOST_IO_PAGE_FMATRIX_INL_HPP_

View File

@@ -15,7 +15,6 @@
namespace xgboost {
/*! \brief namespace of internal data structures*/
namespace data {
/*!
* \brief The simplest form of data holder; it can be used to create a DMatrix.

View File

@@ -13,6 +13,7 @@
#include <dmlc/io.h>
#include <vector>
#include <algorithm>
#include <cstring>
namespace xgboost {
namespace data {
@@ -163,6 +164,24 @@ class SparsePage {
offset[i + begin] = top + batch.ind_ptr[i + 1] - batch.ind_ptr[0];
}
}
/*!
* \brief Push row block into the page.
* \param batch the row batch.
*/
inline void Push(const dmlc::RowBlock<uint32_t>& batch) {
data.reserve(data.size() + batch.offset[batch.size] - batch.offset[0]);
offset.reserve(offset.size() + batch.size);
CHECK(batch.index != nullptr);
for (size_t i = 0; i < batch.size; ++i) {
offset.push_back(offset.back() + batch.offset[i + 1] - batch.offset[i]);
}
for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) {
uint32_t index = batch.index[i];
bst_float fvalue = batch.value == nullptr ? 1.0f : batch.value[i];
data.push_back(SparseBatch::Entry(index, fvalue));
}
CHECK_EQ(offset.back(), data.size());
}
/*!
* \brief Push a sparse page
* \param batch the row page
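
The new Push(const dmlc::RowBlock<uint32_t>&) overload above appends a parser batch in CSR form: each row extends offset by its length, and each entry is copied with fvalue defaulting to 1.0f when the block carries indices only. A self-contained sketch of the same bookkeeping with plain vectors (illustrative data, not the xgboost types):

#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

int main() {
  // Page state: CSR offsets (starting at 0, as SparsePage does) and entries.
  std::vector<size_t> offset = {0};
  std::vector<std::pair<uint32_t, float> > data;  // (index, fvalue)

  // Incoming block: row 0 = {f1:0.5, f3:2.0}, row 1 = {f2:1.0}.
  const size_t block_offset[] = {0, 2, 3};
  const uint32_t block_index[] = {1, 3, 2};
  const float block_value[] = {0.5f, 2.0f, 1.0f};
  const size_t block_size = 2;

  // Same arithmetic as SparsePage::Push(const dmlc::RowBlock<uint32_t>&).
  for (size_t i = 0; i < block_size; ++i) {
    offset.push_back(offset.back() + block_offset[i + 1] - block_offset[i]);
  }
  for (size_t i = block_offset[0]; i < block_offset[block_size]; ++i) {
    data.push_back(std::make_pair(block_index[i], block_value[i]));
  }
  assert(offset.back() == data.size());  // mirrors the CHECK_EQ in the diff
  return 0;
}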

View File

@@ -0,0 +1,253 @@
/*!
* Copyright 2014 by Contributors
* \file sparse_page_dmatrix.cc
* \brief The external memory version of Page Iterator.
* \author Tianqi Chen
*/
#include <dmlc/base.h>
#include <dmlc/timer.h>
#include <xgboost/logging.h>
#include <memory>
#include "./sparse_page_dmatrix.h"
#include "../common/random.h"
#include "../common/group_data.h"
namespace xgboost {
namespace data {
SparsePageDMatrix::ColPageIter::ColPageIter(std::unique_ptr<dmlc::SeekStream>&& fi)
: fi_(std::move(fi)), page_(nullptr) {
load_all_ = false;
prefetcher_.Init([this](SparsePage** dptr) {
if (*dptr == nullptr) {
*dptr = new SparsePage();
}
if (load_all_) {
return (*dptr)->Load(fi_.get());
} else {
return (*dptr)->Load(fi_.get(), index_set_);
}
}, [this] () {
fi_->Seek(0);
index_set_ = set_index_set_;
load_all_ = set_load_all_;
});
}
SparsePageDMatrix::ColPageIter::~ColPageIter() {
delete page_;
}
bool SparsePageDMatrix::ColPageIter::Next() {
if (page_ != nullptr) {
prefetcher_.Recycle(&page_);
}
if (prefetcher_.Next(&page_)) {
out_.col_index = dmlc::BeginPtr(index_set_);
col_data_.resize(page_->offset.size() - 1, SparseBatch::Inst(nullptr, 0));
for (size_t i = 0; i < col_data_.size(); ++i) {
col_data_[i] = SparseBatch::Inst
(dmlc::BeginPtr(page_->data) + page_->offset[i],
static_cast<bst_uint>(page_->offset[i + 1] - page_->offset[i]));
}
out_.col_data = dmlc::BeginPtr(col_data_);
out_.size = col_data_.size();
return true;
} else {
return false;
}
}
void SparsePageDMatrix::ColPageIter::Init(const std::vector<bst_uint>& index_set,
bool load_all) {
set_index_set_ = index_set;
set_load_all_ = load_all;
std::sort(set_index_set_.begin(), set_index_set_.end());
this->BeforeFirst();
}
dmlc::DataIter<ColBatch>* SparsePageDMatrix::ColIterator() {
CHECK(col_iter_.get() != nullptr);
std::vector<bst_uint> col_index;
size_t ncol = this->info().num_col;
for (size_t i = 0; i < ncol; ++i) {
col_index.push_back(static_cast<bst_uint>(i));
}
col_iter_->Init(col_index, true);
return col_iter_.get();
}
dmlc::DataIter<ColBatch>* SparsePageDMatrix::
ColIterator(const std::vector<bst_uint>& fset) {
CHECK(col_iter_.get() != nullptr);
std::vector<bst_uint> col_index;
size_t ncol = this->info().num_col;
for (size_t i = 0; i < fset.size(); ++i) {
if (fset[i] < ncol) {
col_index.push_back(fset[i]);
}
}
col_iter_->Init(col_index, false);
return col_iter_.get();
}
bool SparsePageDMatrix::TryInitColData() {
// load meta data.
{
std::string col_meta_name = cache_prefix_ + ".col.meta";
std::unique_ptr<dmlc::Stream> fmeta(
dmlc::Stream::Create(col_meta_name.c_str(), "r", true));
if (fmeta.get() == nullptr) return false;
CHECK(fmeta->Read(&buffered_rowset_)) << "invalid col.meta file";
CHECK(fmeta->Read(&col_size_)) << "invalid col.meta file";
}
// load real data
{
std::string col_data_name = cache_prefix_ + ".col.page";
std::unique_ptr<dmlc::SeekStream> fdata(
dmlc::SeekStream::CreateForRead(col_data_name.c_str(), true));
if (fdata.get() == nullptr) return false;
col_iter_.reset(new ColPageIter(std::move(fdata)));
}
return true;
}
void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
float pkeep,
size_t max_row_perbatch) {
if (HaveColAccess()) return;
if (TryInitColData()) return;
const MetaInfo& info = this->info();
if (max_row_perbatch == std::numeric_limits<size_t>::max()) {
max_row_perbatch = kMaxRowPerBatch;
}
buffered_rowset_.clear();
col_size_.resize(info.num_col);
std::fill(col_size_.begin(), col_size_.end(), 0);
// make the sparse page.
dmlc::ThreadedIter<SparsePage> cmaker;
SparsePage tmp;
dmlc::DataIter<RowBatch>* iter = this->RowIterator();
std::bernoulli_distribution coin_flip(pkeep);
auto& rnd = common::GlobalRandom();
// function to create the page.
auto make_col_batch = [&] (
const SparsePage& prow,
const bst_uint* ridx,
SparsePage **dptr) {
if (*dptr == nullptr) {
*dptr = new SparsePage();
}
SparsePage* pcol = *dptr;
pcol->Clear();
int nthread;
#pragma omp parallel
{
nthread = omp_get_num_threads();
nthread = std::max(nthread, std::max(omp_get_num_procs() / 2 - 1, 1));
}
pcol->Clear();
common::ParallelGroupBuilder<SparseBatch::Entry>
builder(&pcol->offset, &pcol->data);
builder.InitBudget(info.num_col, nthread);
bst_omp_uint ndata = static_cast<bst_uint>(prow.Size());
#pragma omp parallel for schedule(static) num_threads(nthread)
for (bst_omp_uint i = 0; i < ndata; ++i) {
int tid = omp_get_thread_num();
for (size_t j = prow.offset[i]; j < prow.offset[i+1]; ++j) {
const SparseBatch::Entry &e = prow.data[j];
if (enabled[e.index]) {
builder.AddBudget(e.index, tid);
}
}
}
builder.InitStorage();
#pragma omp parallel for schedule(static) num_threads(nthread)
for (bst_omp_uint i = 0; i < ndata; ++i) {
int tid = omp_get_thread_num();
for (size_t j = prow.offset[i]; j < prow.offset[i+1]; ++j) {
const SparseBatch::Entry &e = prow.data[j];
builder.Push(e.index,
SparseBatch::Entry(ridx[i], e.fvalue),
tid);
}
}
CHECK_EQ(pcol->Size(), info.num_col);
// sort columns
bst_omp_uint ncol = static_cast<bst_omp_uint>(pcol->Size());
#pragma omp parallel for schedule(dynamic, 1) num_threads(nthread)
for (bst_omp_uint i = 0; i < ncol; ++i) {
if (pcol->offset[i] < pcol->offset[i + 1]) {
std::sort(dmlc::BeginPtr(pcol->data) + pcol->offset[i],
dmlc::BeginPtr(pcol->data) + pcol->offset[i + 1],
SparseBatch::Entry::CmpValue);
}
}
};
auto make_next_col = [&] (SparsePage** dptr) {
tmp.Clear();
size_t btop = buffered_rowset_.size();
while (iter->Next()) {
const RowBatch& batch = iter->Value();
for (size_t i = 0; i < batch.size; ++i) {
bst_uint ridx = static_cast<bst_uint>(batch.base_rowid + i);
if (pkeep == 1.0f || coin_flip(rnd)) {
buffered_rowset_.push_back(ridx);
tmp.Push(batch[i]);
}
}
if (tmp.MemCostBytes() >= kPageSize ||
tmp.Size() >= max_row_perbatch) {
make_col_batch(tmp, dmlc::BeginPtr(buffered_rowset_) + btop, dptr);
return true;
}
}
if (tmp.Size() != 0) {
make_col_batch(tmp, dmlc::BeginPtr(buffered_rowset_) + btop, dptr);
return true;
} else {
return false;
}
};
cmaker.Init(make_next_col, []() {});
std::string col_data_name = cache_prefix_ + ".col.page";
std::unique_ptr<dmlc::Stream> fo(dmlc::Stream::Create(col_data_name.c_str(), "w"));
double tstart = dmlc::GetTime();
size_t bytes_write = 0;
SparsePage* pcol = nullptr;
while (cmaker.Next(&pcol)) {
for (size_t i = 0; i < pcol->Size(); ++i) {
col_size_[i] += pcol->offset[i + 1] - pcol->offset[i];
}
pcol->Save(fo.get());
size_t spage = pcol->MemCostBytes();
bytes_write += spage;
double tdiff = dmlc::GetTime() - tstart;
LOG(CONSOLE) << "Writing to " << col_data_name
<< " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " MB written";
cmaker.Recycle(&pcol);
}
// save meta data
std::string col_meta_name = cache_prefix_ + ".col.meta";
fo.reset(dmlc::Stream::Create(col_meta_name.c_str(), "w"));
fo->Write(buffered_rowset_);
fo->Write(col_size_);
fo.reset(nullptr);
// initialize column data
CHECK(TryInitColData());
}
} // namespace data
} // namespace xgboost
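
Both SparsePageSource and ColPageIter lean on dmlc::ThreadedIter to prefetch pages on a background thread: the first lambda passed to Init fills (or allocates) a page and returns false at end of data, the second rewinds the underlying stream for BeforeFirst. A minimal sketch of that contract, with a counter standing in for the SparsePage::Load calls made by the real code:

#include <dmlc/threadediter.h>
#include "./sparse_batch_page.h"  // SparsePage, as used throughout this commit

void PrefetchSketch() {
  using xgboost::data::SparsePage;
  dmlc::ThreadedIter<SparsePage> prefetcher;
  int remaining = 3;  // pretend the cache holds three pages
  prefetcher.Init([&remaining](SparsePage** dptr) {
    if (*dptr == nullptr) *dptr = new SparsePage();
    if (remaining == 0) return false;  // end of data
    (*dptr)->Clear();                  // the real producer calls page->Load(fi)
    --remaining;
    return true;
  }, [&remaining]() { remaining = 3; });  // the real rewind seeks fi_ back to 0
  SparsePage* page = nullptr;
  while (prefetcher.Next(&page)) {
    // ... consume *page ...
    prefetcher.Recycle(&page);  // hand the buffer back for reuse
  }
}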

View File

@@ -0,0 +1,128 @@
/*!
* Copyright 2015 by Contributors
* \file sparse_page_dmatrix.h
* \brief External-memory version of DMatrix.
* \author Tianqi Chen
*/
#ifndef XGBOOST_SPARSE_PAGE_DMATRIX_H_
#define XGBOOST_SPARSE_PAGE_DMATRIX_H_
#include <xgboost/base.h>
#include <xgboost/data.h>
#include <dmlc/threadediter.h>
#include <vector>
#include <algorithm>
#include <cstring>
#include "./sparse_batch_page.h"
namespace xgboost {
namespace data {
class SparsePageDMatrix : public DMatrix {
public:
explicit SparsePageDMatrix(std::unique_ptr<DataSource>&& source,
const std::string& cache_prefix)
: source_(std::move(source)),
cache_prefix_(cache_prefix) {}
MetaInfo& info() override {
return source_->info;
}
const MetaInfo& info() const override {
return source_->info;
}
dmlc::DataIter<RowBatch>* RowIterator() override {
dmlc::DataIter<RowBatch>* iter = source_.get();
iter->BeforeFirst();
return iter;
}
bool HaveColAccess() const override {
return col_iter_.get() != nullptr;
}
const std::vector<bst_uint>& buffered_rowset() const override {
return buffered_rowset_;
}
size_t GetColSize(size_t cidx) const {
return col_size_[cidx];
}
float GetColDensity(size_t cidx) const override {
size_t nmiss = buffered_rowset_.size() - col_size_[cidx];
return 1.0f - (static_cast<float>(nmiss)) / buffered_rowset_.size();
}
bool SingleColBlock() const override {
return false;
}
dmlc::DataIter<ColBatch>* ColIterator() override;
dmlc::DataIter<ColBatch>* ColIterator(const std::vector<bst_uint>& fset) override;
void InitColAccess(const std::vector<bool>& enabled,
float subsample,
size_t max_row_perbatch) override;
/*! \brief page size 256 MB */
static const size_t kPageSize = 256UL << 20UL;
/*! \brief Maximum number of rows per batch. */
static const size_t kMaxRowPerBatch = 32UL << 10UL;
private:
// declare the column batch iter.
class ColPageIter : public dmlc::DataIter<ColBatch> {
public:
explicit ColPageIter(std::unique_ptr<dmlc::SeekStream>&& fi);
virtual ~ColPageIter();
void BeforeFirst() override {
prefetcher_.BeforeFirst();
}
const ColBatch &Value() const override {
return out_;
}
bool Next() override;
// initialize the column iterator with the specified index set.
void Init(const std::vector<bst_uint>& index_set, bool load_all);
private:
// data file pointer.
std::unique_ptr<dmlc::SeekStream> fi_;
// The index set to be loaded.
std::vector<bst_uint> index_set_;
// The index set requested by the caller.
std::vector<bst_uint> set_index_set_;
// whether to load all columns instead of only index_set_.
bool set_load_all_, load_all_;
// data prefetcher.
dmlc::ThreadedIter<SparsePage> prefetcher_;
// the temp page.
SparsePage* page_;
// temporary space for the output batch
ColBatch out_;
// the pointer data.
std::vector<SparseBatch::Inst> col_data_;
};
/*!
* \brief Try to initialize column data.
* \return true if data already exists, false if they do not.
*/
bool TryInitColData();
// source data pointer.
std::unique_ptr<DataSource> source_;
// the cache prefix
std::string cache_prefix_;
/*! \brief list of row index that are buffered */
std::vector<bst_uint> buffered_rowset_;
// count for column data
std::vector<size_t> col_size_;
// internal column iter.
std::unique_ptr<ColPageIter> col_iter_;
};
} // namespace data
} // namespace xgboost
#endif // XGBOOST_SPARSE_PAGE_DMATRIX_H_

View File

@@ -0,0 +1,157 @@
/*!
* Copyright 2015 by Contributors
* \file sparse_page_source.cc
*/
#include <dmlc/base.h>
#include <dmlc/timer.h>
#include <xgboost/logging.h>
#include <memory>
#include "./sparse_page_source.h"
namespace xgboost {
namespace data {
SparsePageSource::SparsePageSource(const std::string& cache_prefix)
: base_rowid_(0), page_(nullptr) {
// read in the info files.
{
std::string name_info = cache_prefix;
std::unique_ptr<dmlc::Stream> finfo(dmlc::Stream::Create(name_info.c_str(), "r"));
int tmagic;
CHECK_EQ(finfo->Read(&tmagic, sizeof(tmagic)), sizeof(tmagic));
this->info.LoadBinary(finfo.get());
}
// read in the cache files.
std::string name_row = cache_prefix + ".row.page";
fi_.reset(dmlc::SeekStream::CreateForRead(name_row.c_str()));
prefetcher_.Init([this] (SparsePage** dptr) {
if (*dptr == nullptr) {
*dptr = new SparsePage();
}
return (*dptr)->Load(fi_.get());
}, [this] () { fi_->Seek(0); });
}
SparsePageSource::~SparsePageSource() {
delete page_;
}
bool SparsePageSource::Next() {
if (page_ != nullptr) {
prefetcher_.Recycle(&page_);
}
if (prefetcher_.Next(&page_)) {
batch_ = page_->GetRowBatch(base_rowid_);
base_rowid_ += batch_.size;
return true;
} else {
return false;
}
}
void SparsePageSource::BeforeFirst() {
base_rowid_ = 0;
prefetcher_.BeforeFirst();
}
const RowBatch& SparsePageSource::Value() const {
return batch_;
}
bool SparsePageSource::CacheExist(const std::string& cache_prefix) {
std::string name_info = cache_prefix;
std::string name_row = cache_prefix + ".row.page";
std::unique_ptr<dmlc::Stream> finfo(dmlc::Stream::Create(name_info.c_str(), "r", true));
std::unique_ptr<dmlc::Stream> frow(dmlc::Stream::Create(name_row.c_str(), "r", true));
return finfo.get() != nullptr && frow.get() != nullptr;
}
void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
const std::string& cache_prefix) {
// name the cache files.
std::string name_info = cache_prefix;
std::string name_row = cache_prefix + ".row.page";
std::unique_ptr<dmlc::Stream> fo(dmlc::Stream::Create(name_row.c_str(), "w"));
MetaInfo info;
SparsePage page;
size_t bytes_write = 0;
double tstart = dmlc::GetTime();
while (src->Next()) {
const dmlc::RowBlock<uint32_t>& batch = src->Value();
if (batch.label != nullptr) {
info.labels.insert(info.labels.end(), batch.label, batch.label + batch.size);
}
if (batch.weight != nullptr) {
info.weights.insert(info.weights.end(), batch.weight, batch.weight + batch.size);
}
info.num_row += batch.size;
info.num_nonzero += batch.offset[batch.size] - batch.offset[0];
for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) {
uint32_t index = batch.index[i];
info.num_col = std::max(info.num_col,
static_cast<size_t>(index + 1));
}
page.Push(batch);
if (page.MemCostBytes() >= kPageSize) {
bytes_write += page.MemCostBytes();
page.Save(fo.get());
page.Clear();
double tdiff = dmlc::GetTime() - tstart;
LOG(CONSOLE) << "Writing to " << name_row << " in "
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " MB written";
}
}
if (page.data.size() != 0) {
page.Save(fo.get());
}
fo.reset(dmlc::Stream::Create(name_info.c_str(), "w"));
int tmagic = kMagic;
fo->Write(&tmagic, sizeof(tmagic));
info.SaveBinary(fo.get());
LOG(CONSOLE) << "SparsePageSource: Finished writing to " << cache_prefix;
}
void SparsePageSource::Create(DMatrix* src,
const std::string& cache_prefix) {
// name the cache files.
std::string name_info = cache_prefix;
std::string name_row = cache_prefix + ".row.page";
std::unique_ptr<dmlc::Stream> fo(dmlc::Stream::Create(name_row.c_str(), "w"));
SparsePage page;
size_t bytes_write = 0;
double tstart = dmlc::GetTime();
dmlc::DataIter<RowBatch>* iter = src->RowIterator();
while (iter->Next()) {
page.Push(iter->Value());
if (page.MemCostBytes() >= kPageSize) {
bytes_write += page.MemCostBytes();
page.Save(fo.get());
page.Clear();
double tdiff = dmlc::GetTime() - tstart;
LOG(CONSOLE) << "Writing to " << name_row << " in "
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " MB written";
}
}
if (page.data.size() != 0) {
page.Save(fo.get());
}
fo.reset(dmlc::Stream::Create(name_info.c_str(), "w"));
int tmagic = kMagic;
fo->Write(&tmagic, sizeof(tmagic));
src->info().SaveBinary(fo.get());
LOG(CONSOLE) << "SparsePageSource: Finished writing to " << cache_prefix;
}
} // namespace data
} // namespace xgboost
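
Both Create overloads above write the same row cache layout, one from a dmlc parser on first load and one by copying an existing DMatrix. A hedged sketch of driving SparsePageSource directly, with "dtrain.cache" as an illustrative prefix; the derived file names match the ones used in this file:

#include <memory>
#include <string>
#include <dmlc/data.h>
#include "./sparse_page_source.h"

// Files produced for prefix "dtrain.cache":
//   dtrain.cache           : kMagic followed by the saved MetaInfo
//   dtrain.cache.row.page  : row pages, flushed every 32 MB (kPageSize)
// (the .col.page / .col.meta files are written later by SparsePageDMatrix)
std::unique_ptr<xgboost::data::SparsePageSource>
MakeRowSource(dmlc::Parser<uint32_t>* parser) {
  const std::string prefix = "dtrain.cache";
  if (!xgboost::data::SparsePageSource::CacheExist(prefix)) {
    xgboost::data::SparsePageSource::Create(parser, prefix);
  }
  return std::unique_ptr<xgboost::data::SparsePageSource>(
      new xgboost::data::SparsePageSource(prefix));
}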

View File

@@ -0,0 +1,83 @@
/*!
* Copyright (c) 2014 by Contributors
* \file sparse_page_source.h
* External memory data source, saved with sparse_batch_page binary format.
* \author Tianqi Chen
*/
#ifndef XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_
#define XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_
#include <xgboost/base.h>
#include <xgboost/data.h>
#include <dmlc/threadediter.h>
#include <vector>
#include <algorithm>
#include <string>
#include "./sparse_batch_page.h"
namespace xgboost {
namespace data {
/*!
* \brief External memory data source.
* \code
* std::unique_ptr<DataSource> source(new SparsePageSource(cache_prefix));
* // the source streams row pages back from the existing cache files
* DMatrix* dmat = DMatrix::Create(std::move(source), cache_prefix);
* \endcode
*/
class SparsePageSource : public DataSource {
public:
/*!
* \brief Create a source from the cache files identified by cache_prefix.
* \param cache_prefix The prefix of the cache files to load.
*/
explicit SparsePageSource(const std::string& cache_prefix) noexcept(false);
/*! \brief destructor */
virtual ~SparsePageSource();
// implement Next
bool Next() override;
// implement BeforeFirst
void BeforeFirst() override;
// implement Value
const RowBatch& Value() const override;
/*!
* \brief Create source by taking data from parser.
* \param src source parser.
* \param cache_prefix The prefix of the cache file location.
*/
static void Create(dmlc::Parser<uint32_t>* src,
const std::string& cache_prefix);
/*!
* \brief Create source cache by copy content from DMatrix.
* \param cache_prefix The prefix of the cache file location.
*/
static void Create(DMatrix* src,
const std::string& cache_prefix);
/*!
* \brief Check if the cache file already exists.
* \param cache_prefix The cache prefix of files.
* \return Whether cache file already exists.
*/
static bool CacheExist(const std::string& cache_prefix);
/*! \brief page size 32 MB */
static const size_t kPageSize = 32UL << 20UL;
/*! \brief magic number used to identify Page */
static const int kMagic = 0xffffab02;
private:
/*! \brief number of rows */
size_t base_rowid_;
/*! \brief temp data. */
RowBatch batch_;
/*! \brief page currently on hold. */
SparsePage *page_;
/*! \brief The cache prefix of the dataset. */
std::string cache_prefix_;
/*! \brief file pointer to the row blob file. */
std::unique_ptr<dmlc::SeekStream> fi_;
/*! \brief internal prefetcher. */
dmlc::ThreadedIter<SparsePage> prefetcher_;
};
} // namespace data
} // namespace xgboost
#endif // XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_