[DISK] Add shard option to disk

This commit is contained in:
tqchen 2016-01-19 13:39:10 -08:00
parent 72961d914b
commit 2230f1273f
8 changed files with 356 additions and 144 deletions

@ -1 +1 @@
Subproject commit ad2ddde8b6624abf3007a71b2923c3925530cc81 Subproject commit 257b09a0ba18625a9fcf3b9471a9b1c35a767b7b

31
src/common/common.h Normal file
View File

@ -0,0 +1,31 @@
/*!
* Copyright 2015 by Contributors
* \file common.h
* \brief Common utilities
*/
#ifndef XGBOOST_COMMON_COMMON_H_
#define XGBOOST_COMMON_COMMON_H_
#include <vector>
#include <string>
#include <sstream>
namespace xgboost {
namespace common {
/*!
* \brief Split a string by delimiter
* \param s String to be splitted.
* \param delim The delimiter.
*/
inline std::vector<std::string> Split(const std::string& s, char delim) {
std::string item;
std::istringstream is(s);
std::vector<std::string> ret;
while (std::getline(is, item, delim)) {
ret.push_back(item);
}
return ret;
}
} // namespace common
} // namespace xgboost
#endif // XGBOOST_COMMON_COMMON_H_

View File

@ -16,6 +16,12 @@
#include <cstring> #include <cstring>
#include <string> #include <string>
#include <utility> #include <utility>
#include <memory>
#if DMLC_ENABLE_STD_THREAD
#include <dmlc/concurrency.h>
#include <thread>
#endif
namespace xgboost { namespace xgboost {
namespace data { namespace data {
@ -26,6 +32,8 @@ class SparsePage {
public: public:
/*! \brief Format of the sparse page. */ /*! \brief Format of the sparse page. */
class Format; class Format;
/*! \brief Writer to write the sparse page to files. */
class Writer;
/*! \brief minimum index of all index, used as hint for compression. */ /*! \brief minimum index of all index, used as hint for compression. */
bst_uint min_index; bst_uint min_index;
/*! \brief offset of the segments */ /*! \brief offset of the segments */
@ -171,6 +179,53 @@ class SparsePage::Format {
static std::pair<std::string, std::string> DecideFormat(const std::string& cache_prefix); static std::pair<std::string, std::string> DecideFormat(const std::string& cache_prefix);
}; };
#if DMLC_ENABLE_STD_THREAD
/*!
* \brief A threaded writer to write sparse batch page to sharded files.
*/
class SparsePage::Writer {
public:
/*!
* \brief constructor
* \param name_shards name of shard files.
* \param format_shards format of each shard.
* \param extra_buffer_capacity Extra buffer capacity before block.
*/
explicit Writer(
const std::vector<std::string>& name_shards,
const std::vector<std::string>& format_shards,
size_t extra_buffer_capacity);
/*! \brief destructor, will close the files automatically */
~Writer();
/*!
* \brief Push a write job to the writer.
* This function won't block,
* writing is done by another thread inside writer.
* \param page The page to be wriiten
*/
void PushWrite(std::unique_ptr<SparsePage>&& page);
/*!
* \brief Allocate a page to store results.
* This function can block when the writer is too slow and buffer pages
* have not yet been recycled.
* \param out_page Used to store the allocated pages.
*/
void Alloc(std::unique_ptr<SparsePage>* out_page);
private:
/*! \brief number of allocated pages */
size_t num_free_buffer_;
/*! \brief clock_pointer */
size_t clock_ptr_;
/*! \brief writer threads */
std::vector<std::unique_ptr<std::thread> > workers_;
/*! \brief recycler queue */
dmlc::ConcurrentBlockingQueue<std::unique_ptr<SparsePage> > qrecycle_;
/*! \brief worker threads */
std::vector<dmlc::ConcurrentBlockingQueue<std::unique_ptr<SparsePage> > > qworkers_;
};
#endif // DMLC_ENABLE_STD_THREAD
/*! /*!
* \brief Registry entry for sparse page format. * \brief Registry entry for sparse page format.
*/ */

View File

@ -12,34 +12,42 @@
#if DMLC_ENABLE_STD_THREAD #if DMLC_ENABLE_STD_THREAD
#include "./sparse_page_dmatrix.h" #include "./sparse_page_dmatrix.h"
#include "../common/random.h" #include "../common/random.h"
#include "../common/common.h"
#include "../common/group_data.h" #include "../common/group_data.h"
namespace xgboost { namespace xgboost {
namespace data { namespace data {
SparsePageDMatrix::ColPageIter::ColPageIter(std::unique_ptr<dmlc::SeekStream>&& fi) SparsePageDMatrix::ColPageIter::ColPageIter(
: fi_(std::move(fi)), page_(nullptr) { std::vector<std::unique_ptr<dmlc::SeekStream> >&& files)
: page_(nullptr), clock_ptr_(0), files_(std::move(files)) {
load_all_ = false; load_all_ = false;
formats_.resize(files_.size());
prefetchers_.resize(files_.size());
for (size_t i = 0; i < files_.size(); ++i) {
dmlc::SeekStream* fi = files_[i].get();
std::string format; std::string format;
CHECK(fi_->Read(&format)) << "Invalid page format"; CHECK(fi->Read(&format)) << "Invalid page format";
format_.reset(SparsePage::Format::Create(format)); formats_[i].reset(SparsePage::Format::Create(format));
size_t fbegin = fi_->Tell(); SparsePage::Format* fmt = formats_[i].get();
size_t fbegin = fi->Tell();
prefetcher_.Init([this](SparsePage** dptr) { prefetchers_[i].reset(new dmlc::ThreadedIter<SparsePage>(4));
prefetchers_[i]->Init([this, fi, fmt] (SparsePage** dptr) {
if (*dptr == nullptr) { if (*dptr == nullptr) {
*dptr = new SparsePage(); *dptr = new SparsePage();
} }
if (load_all_) { if (load_all_) {
return format_->Read(*dptr, fi_.get()); return fmt->Read(*dptr, fi);
} else { } else {
return format_->Read(*dptr, fi_.get(), index_set_); return fmt->Read(*dptr, fi, index_set_);
} }
}, [this, fbegin] () { }, [this, fi, fbegin] () {
fi_->Seek(fbegin); fi->Seek(fbegin);
index_set_ = set_index_set_; index_set_ = set_index_set_;
load_all_ = set_load_all_; load_all_ = set_load_all_;
}); });
}
} }
SparsePageDMatrix::ColPageIter::~ColPageIter() { SparsePageDMatrix::ColPageIter::~ColPageIter() {
@ -47,10 +55,12 @@ SparsePageDMatrix::ColPageIter::~ColPageIter() {
} }
bool SparsePageDMatrix::ColPageIter::Next() { bool SparsePageDMatrix::ColPageIter::Next() {
// doing clock rotation over shards.
if (page_ != nullptr) { if (page_ != nullptr) {
prefetcher_.Recycle(&page_); size_t n = prefetchers_.size();
prefetchers_[(clock_ptr_ + n - 1) % n]->Recycle(&page_);
} }
if (prefetcher_.Next(&page_)) { if (prefetchers_[clock_ptr_]->Next(&page_)) {
out_.col_index = dmlc::BeginPtr(index_set_); out_.col_index = dmlc::BeginPtr(index_set_);
col_data_.resize(page_->offset.size() - 1, SparseBatch::Inst(nullptr, 0)); col_data_.resize(page_->offset.size() - 1, SparseBatch::Inst(nullptr, 0));
for (size_t i = 0; i < col_data_.size(); ++i) { for (size_t i = 0; i < col_data_.size(); ++i) {
@ -60,18 +70,26 @@ bool SparsePageDMatrix::ColPageIter::Next() {
} }
out_.col_data = dmlc::BeginPtr(col_data_); out_.col_data = dmlc::BeginPtr(col_data_);
out_.size = col_data_.size(); out_.size = col_data_.size();
// advance clock
clock_ptr_ = (clock_ptr_ + 1) % prefetchers_.size();
return true; return true;
} else { } else {
return false; return false;
} }
} }
void SparsePageDMatrix::ColPageIter::BeforeFirst() {
clock_ptr_ = 0;
for (auto& p : prefetchers_) {
p->BeforeFirst();
}
}
void SparsePageDMatrix::ColPageIter::Init(const std::vector<bst_uint>& index_set, void SparsePageDMatrix::ColPageIter::Init(const std::vector<bst_uint>& index_set,
bool load_all) { bool load_all) {
set_index_set_ = index_set; set_index_set_ = index_set;
set_load_all_ = load_all; set_load_all_ = load_all;
std::sort(set_index_set_.begin(), set_index_set_.end()); std::sort(set_index_set_.begin(), set_index_set_.end());
this->BeforeFirst(); this->BeforeFirst();
} }
@ -103,8 +121,9 @@ ColIterator(const std::vector<bst_uint>& fset) {
bool SparsePageDMatrix::TryInitColData() { bool SparsePageDMatrix::TryInitColData() {
// load meta data. // load meta data.
std::vector<std::string> cache_shards = common::Split(cache_info_, ':');
{ {
std::string col_meta_name = cache_prefix_ + ".col.meta"; std::string col_meta_name = cache_shards[0] + ".col.meta";
std::unique_ptr<dmlc::Stream> fmeta( std::unique_ptr<dmlc::Stream> fmeta(
dmlc::Stream::Create(col_meta_name.c_str(), "r", true)); dmlc::Stream::Create(col_meta_name.c_str(), "r", true));
if (fmeta.get() == nullptr) return false; if (fmeta.get() == nullptr) return false;
@ -112,13 +131,15 @@ bool SparsePageDMatrix::TryInitColData() {
CHECK(fmeta->Read(&col_size_)) << "invalid col.meta file"; CHECK(fmeta->Read(&col_size_)) << "invalid col.meta file";
} }
// load real data // load real data
{ std::vector<std::unique_ptr<dmlc::SeekStream> > files;
std::string col_data_name = cache_prefix_ + ".col.page"; for (const std::string& prefix : cache_shards) {
std::string col_data_name = prefix + ".col.page";
std::unique_ptr<dmlc::SeekStream> fdata( std::unique_ptr<dmlc::SeekStream> fdata(
dmlc::SeekStream::CreateForRead(col_data_name.c_str(), true)); dmlc::SeekStream::CreateForRead(col_data_name.c_str(), true));
if (fdata.get() == nullptr) return false; if (fdata.get() == nullptr) return false;
col_iter_.reset(new ColPageIter(std::move(fdata))); files.push_back(std::move(fdata));
} }
col_iter_.reset(new ColPageIter(std::move(files)));
return true; return true;
} }
@ -135,24 +156,17 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
buffered_rowset_.clear(); buffered_rowset_.clear();
col_size_.resize(info.num_col); col_size_.resize(info.num_col);
std::fill(col_size_.begin(), col_size_.end(), 0); std::fill(col_size_.begin(), col_size_.end(), 0);
// make the sparse page.
dmlc::ThreadedIter<SparsePage> cmaker;
SparsePage tmp;
size_t batch_ptr = 0, batch_top = 0;
dmlc::DataIter<RowBatch>* iter = this->RowIterator(); dmlc::DataIter<RowBatch>* iter = this->RowIterator();
std::bernoulli_distribution coin_flip(pkeep); std::bernoulli_distribution coin_flip(pkeep);
size_t batch_ptr = 0, batch_top = 0;
SparsePage tmp;
auto& rnd = common::GlobalRandom(); auto& rnd = common::GlobalRandom();
// function to create the page. // function to create the page.
auto make_col_batch = [&] ( auto make_col_batch = [&] (
const SparsePage& prow, const SparsePage& prow,
const bst_uint* ridx, const bst_uint* ridx,
SparsePage **dptr) { SparsePage *pcol) {
if (*dptr == nullptr) {
*dptr = new SparsePage();
}
SparsePage* pcol = *dptr;
pcol->Clear(); pcol->Clear();
pcol->min_index = ridx[0]; pcol->min_index = ridx[0];
int nthread; int nthread;
@ -199,7 +213,7 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
} }
}; };
auto make_next_col = [&] (SparsePage** dptr) { auto make_next_col = [&] (SparsePage* dptr) {
tmp.Clear(); tmp.Clear();
size_t btop = buffered_rowset_.size(); size_t btop = buffered_rowset_.size();
@ -236,41 +250,44 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
} }
}; };
cmaker.Init(make_next_col, []() {}); std::vector<std::string> cache_shards = common::Split(cache_info_, ':');
std::vector<std::string> name_shards, format_shards;
std::string col_data_name = cache_prefix_ + ".col.page"; for (const std::string& prefix : cache_shards) {
std::unique_ptr<dmlc::Stream> fo(dmlc::Stream::Create(col_data_name.c_str(), "w")); name_shards.push_back(prefix + ".col.page");
// find format. format_shards.push_back(SparsePage::Format::DecideFormat(prefix).second);
std::string name_format = SparsePage::Format::DecideFormat(cache_prefix_).second; }
fo->Write(name_format); SparsePage::Writer writer(name_shards, format_shards, 6);
std::unique_ptr<SparsePage::Format> format(SparsePage::Format::Create(name_format)); std::unique_ptr<SparsePage> page;
writer.Alloc(&page); page->Clear();
double tstart = dmlc::GetTime(); double tstart = dmlc::GetTime();
size_t bytes_write = 0; size_t bytes_write = 0;
// print every 4 sec. // print every 4 sec.
const double kStep = 4.0; const double kStep = 4.0;
size_t tick_expected = kStep; size_t tick_expected = kStep;
SparsePage* pcol = nullptr;
while (cmaker.Next(&pcol)) { while (make_next_col(page.get())) {
for (size_t i = 0; i < pcol->Size(); ++i) { for (size_t i = 0; i < page->Size(); ++i) {
col_size_[i] += pcol->offset[i + 1] - pcol->offset[i]; col_size_[i] += page->offset[i + 1] - page->offset[i];
} }
format->Write(*pcol, fo.get());
size_t spage = pcol->MemCostBytes(); bytes_write += page->MemCostBytes();
bytes_write += spage; writer.PushWrite(std::move(page));
writer.Alloc(&page);
page->Clear();
double tdiff = dmlc::GetTime() - tstart; double tdiff = dmlc::GetTime() - tstart;
if (tdiff >= tick_expected) { if (tdiff >= tick_expected) {
LOG(CONSOLE) << "Writing to " << col_data_name LOG(CONSOLE) << "Writing col.page file to " << cache_info_
<< " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " MB writen"; << (bytes_write >> 20UL) << " MB writen";
tick_expected += kStep; tick_expected += kStep;
} }
cmaker.Recycle(&pcol);
} }
// save meta data // save meta data
std::string col_meta_name = cache_prefix_ + ".col.meta"; std::string col_meta_name = cache_shards[0] + ".col.meta";
fo.reset(dmlc::Stream::Create(col_meta_name.c_str(), "w")); std::unique_ptr<dmlc::Stream> fo(
dmlc::Stream::Create(col_meta_name.c_str(), "w"));
fo->Write(buffered_rowset_); fo->Write(buffered_rowset_);
fo->Write(col_size_); fo->Write(col_size_);
fo.reset(nullptr); fo.reset(nullptr);

View File

@ -14,6 +14,7 @@
#include <algorithm> #include <algorithm>
#include <string> #include <string>
#include "./sparse_batch_page.h" #include "./sparse_batch_page.h"
#include "../common/common.h"
namespace xgboost { namespace xgboost {
namespace data { namespace data {
@ -21,9 +22,9 @@ namespace data {
class SparsePageDMatrix : public DMatrix { class SparsePageDMatrix : public DMatrix {
public: public:
explicit SparsePageDMatrix(std::unique_ptr<DataSource>&& source, explicit SparsePageDMatrix(std::unique_ptr<DataSource>&& source,
const std::string& cache_prefix) const std::string& cache_info)
: source_(std::move(source)), : source_(std::move(source)), cache_info_(cache_info) {
cache_prefix_(cache_prefix) {} }
MetaInfo& info() override { MetaInfo& info() override {
return source_->info; return source_->info;
@ -77,11 +78,9 @@ class SparsePageDMatrix : public DMatrix {
// declare the column batch iter. // declare the column batch iter.
class ColPageIter : public dmlc::DataIter<ColBatch> { class ColPageIter : public dmlc::DataIter<ColBatch> {
public: public:
explicit ColPageIter(std::unique_ptr<dmlc::SeekStream>&& fi); explicit ColPageIter(std::vector<std::unique_ptr<dmlc::SeekStream> >&& files);
virtual ~ColPageIter(); virtual ~ColPageIter();
void BeforeFirst() override { void BeforeFirst() override;
prefetcher_.BeforeFirst();
}
const ColBatch &Value() const override { const ColBatch &Value() const override {
return out_; return out_;
} }
@ -90,20 +89,22 @@ class SparsePageDMatrix : public DMatrix {
void Init(const std::vector<bst_uint>& index_set, bool load_all); void Init(const std::vector<bst_uint>& index_set, bool load_all);
private: private:
// data file pointer.
std::unique_ptr<dmlc::SeekStream> fi_;
// the temp page. // the temp page.
SparsePage* page_; SparsePage* page_;
// internal clock ptr.
size_t clock_ptr_;
// data file pointer.
std::vector<std::unique_ptr<dmlc::SeekStream> > files_;
// page format. // page format.
std::unique_ptr<SparsePage::Format> format_; std::vector<std::unique_ptr<SparsePage::Format> > formats_;
/*! \brief internal prefetcher. */
std::vector<std::unique_ptr<dmlc::ThreadedIter<SparsePage> > > prefetchers_;
// The index set to be loaded. // The index set to be loaded.
std::vector<bst_uint> index_set_; std::vector<bst_uint> index_set_;
// The index set by the outsiders // The index set by the outsiders
std::vector<bst_uint> set_index_set_; std::vector<bst_uint> set_index_set_;
// whether to load data dataset. // whether to load data dataset.
bool set_load_all_, load_all_; bool set_load_all_, load_all_;
// data prefetcher.
dmlc::ThreadedIter<SparsePage> prefetcher_;
// temporal space for batch // temporal space for batch
ColBatch out_; ColBatch out_;
// the pointer data. // the pointer data.
@ -117,7 +118,7 @@ class SparsePageDMatrix : public DMatrix {
// source data pointer. // source data pointer.
std::unique_ptr<DataSource> source_; std::unique_ptr<DataSource> source_;
// the cache prefix // the cache prefix
std::string cache_prefix_; std::string cache_info_;
/*! \brief list of row index that are buffered */ /*! \brief list of row index that are buffered */
std::vector<bst_uint> buffered_rowset_; std::vector<bst_uint> buffered_rowset_;
// count for column data // count for column data

View File

@ -9,35 +9,45 @@
#if DMLC_ENABLE_STD_THREAD #if DMLC_ENABLE_STD_THREAD
#include "./sparse_page_source.h" #include "./sparse_page_source.h"
#include "../common/common.h"
namespace xgboost { namespace xgboost {
namespace data { namespace data {
SparsePageSource::SparsePageSource(const std::string& cache_prefix) SparsePageSource::SparsePageSource(const std::string& cache_info)
: base_rowid_(0), page_(nullptr) { : base_rowid_(0), page_(nullptr), clock_ptr_(0) {
// read in the info files. // read in the info files
std::vector<std::string> cache_shards = common::Split(cache_info, ':');
CHECK_NE(cache_shards.size(), 0);
{ {
std::string name_info = cache_prefix; std::string name_info = cache_shards[0];
std::unique_ptr<dmlc::Stream> finfo(dmlc::Stream::Create(name_info.c_str(), "r")); std::unique_ptr<dmlc::Stream> finfo(dmlc::Stream::Create(name_info.c_str(), "r"));
int tmagic; int tmagic;
CHECK_EQ(finfo->Read(&tmagic, sizeof(tmagic)), sizeof(tmagic)); CHECK_EQ(finfo->Read(&tmagic, sizeof(tmagic)), sizeof(tmagic));
this->info.LoadBinary(finfo.get()); this->info.LoadBinary(finfo.get());
} }
files_.resize(cache_shards.size());
formats_.resize(cache_shards.size());
prefetchers_.resize(cache_shards.size());
// read in the cache files. // read in the cache files.
std::string name_row = cache_prefix + ".row.page"; for (size_t i = 0; i < cache_shards.size(); ++i) {
fi_.reset(dmlc::SeekStream::CreateForRead(name_row.c_str())); std::string name_row = cache_shards[i] + ".row.page";
files_[i].reset(dmlc::SeekStream::CreateForRead(name_row.c_str()));
dmlc::SeekStream* fi = files_[i].get();
std::string format; std::string format;
CHECK(fi_->Read(&format)) << "Invalid page format"; CHECK(fi->Read(&format)) << "Invalid page format";
format_.reset(SparsePage::Format::Create(format)); formats_[i].reset(SparsePage::Format::Create(format));
size_t fbegin = fi_->Tell(); SparsePage::Format* fmt = formats_[i].get();
size_t fbegin = fi->Tell();
prefetcher_.Init([this] (SparsePage** dptr) { prefetchers_[i].reset(new dmlc::ThreadedIter<SparsePage>(4));
prefetchers_[i]->Init([fi, fmt] (SparsePage** dptr) {
if (*dptr == nullptr) { if (*dptr == nullptr) {
*dptr = new SparsePage(); *dptr = new SparsePage();
} }
return format_->Read(*dptr, fi_.get()); return fmt->Read(*dptr, fi);
}, [this, fbegin] () { fi_->Seek(fbegin); }); }, [fi, fbegin] () { fi->Seek(fbegin); });
}
} }
SparsePageSource::~SparsePageSource() { SparsePageSource::~SparsePageSource() {
@ -45,12 +55,16 @@ SparsePageSource::~SparsePageSource() {
} }
bool SparsePageSource::Next() { bool SparsePageSource::Next() {
// doing clock rotation over shards.
if (page_ != nullptr) { if (page_ != nullptr) {
prefetcher_.Recycle(&page_); size_t n = prefetchers_.size();
prefetchers_[(clock_ptr_ + n - 1) % n]->Recycle(&page_);
} }
if (prefetcher_.Next(&page_)) { if (prefetchers_[clock_ptr_]->Next(&page_)) {
batch_ = page_->GetRowBatch(base_rowid_); batch_ = page_->GetRowBatch(base_rowid_);
base_rowid_ += batch_.size; base_rowid_ += batch_.size;
// advance clock
clock_ptr_ = (clock_ptr_ + 1) % prefetchers_.size();
return true; return true;
} else { } else {
return false; return false;
@ -59,33 +73,48 @@ bool SparsePageSource::Next() {
void SparsePageSource::BeforeFirst() { void SparsePageSource::BeforeFirst() {
base_rowid_ = 0; base_rowid_ = 0;
prefetcher_.BeforeFirst(); clock_ptr_ = 0;
for (auto& p : prefetchers_) {
p->BeforeFirst();
}
} }
const RowBatch& SparsePageSource::Value() const { const RowBatch& SparsePageSource::Value() const {
return batch_; return batch_;
} }
bool SparsePageSource::CacheExist(const std::string& cache_prefix) { bool SparsePageSource::CacheExist(const std::string& cache_info) {
std::string name_info = cache_prefix; std::vector<std::string> cache_shards = common::Split(cache_info, ':');
std::string name_row = cache_prefix + ".row.page"; CHECK_NE(cache_shards.size(), 0);
{
std::string name_info = cache_shards[0];
std::unique_ptr<dmlc::Stream> finfo(dmlc::Stream::Create(name_info.c_str(), "r", true)); std::unique_ptr<dmlc::Stream> finfo(dmlc::Stream::Create(name_info.c_str(), "r", true));
if (finfo.get() == nullptr) return false;
}
for (const std::string& prefix : cache_shards) {
std::string name_row = prefix + ".row.page";
std::unique_ptr<dmlc::Stream> frow(dmlc::Stream::Create(name_row.c_str(), "r", true)); std::unique_ptr<dmlc::Stream> frow(dmlc::Stream::Create(name_row.c_str(), "r", true));
return finfo.get() != nullptr && frow.get() != nullptr; if (frow.get() == nullptr) return false;
}
return true;
} }
void SparsePageSource::Create(dmlc::Parser<uint32_t>* src, void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
const std::string& cache_prefix) { const std::string& cache_info) {
std::vector<std::string> cache_shards = common::Split(cache_info, ':');
CHECK_NE(cache_shards.size(), 0);
// read in the info files. // read in the info files.
std::string name_info = cache_prefix; std::string name_info = cache_shards[0];
std::string name_row = cache_prefix + ".row.page"; std::vector<std::string> name_shards, format_shards;
std::unique_ptr<dmlc::Stream> fo(dmlc::Stream::Create(name_row.c_str(), "w")); for (const std::string& prefix : cache_shards) {
std::string name_format = SparsePage::Format::DecideFormat(cache_prefix).first; name_shards.push_back(prefix + ".row.page");
fo->Write(name_format); format_shards.push_back(SparsePage::Format::DecideFormat(prefix).first);
std::unique_ptr<SparsePage::Format> format(SparsePage::Format::Create(name_format)); }
SparsePage::Writer writer(name_shards, format_shards, 6);
std::unique_ptr<SparsePage> page;
writer.Alloc(&page); page->Clear();
MetaInfo info; MetaInfo info;
SparsePage page;
size_t bytes_write = 0; size_t bytes_write = 0;
double tstart = dmlc::GetTime(); double tstart = dmlc::GetTime();
// print every 4 sec. // print every 4 sec.
@ -107,14 +136,16 @@ void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
info.num_col = std::max(info.num_col, info.num_col = std::max(info.num_col,
static_cast<uint64_t>(index + 1)); static_cast<uint64_t>(index + 1));
} }
page.Push(batch); page->Push(batch);
if (page.MemCostBytes() >= kPageSize) { if (page->MemCostBytes() >= kPageSize) {
bytes_write += page.MemCostBytes(); bytes_write += page->MemCostBytes();
format->Write(page, fo.get()); writer.PushWrite(std::move(page));
page.Clear(); writer.Alloc(&page);
page->Clear();
double tdiff = dmlc::GetTime() - tstart; double tdiff = dmlc::GetTime() - tstart;
if (tdiff >= tick_expected) { if (tdiff >= tick_expected) {
LOG(CONSOLE) << "Writing to " << name_row << " in " LOG(CONSOLE) << "Writing row.page to " << cache_info << " in "
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, " << ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " written"; << (bytes_write >> 20UL) << " written";
tick_expected += kStep; tick_expected += kStep;
@ -122,57 +153,62 @@ void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
} }
} }
if (page.data.size() != 0) { if (page->data.size() != 0) {
format->Write(page, fo.get()); writer.PushWrite(std::move(page));
} }
fo.reset(dmlc::Stream::Create(name_info.c_str(), "w")); std::unique_ptr<dmlc::Stream> fo(
dmlc::Stream::Create(name_info.c_str(), "w"));
int tmagic = kMagic; int tmagic = kMagic;
fo->Write(&tmagic, sizeof(tmagic)); fo->Write(&tmagic, sizeof(tmagic));
info.SaveBinary(fo.get()); info.SaveBinary(fo.get());
LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info;
LOG(CONSOLE) << "SparsePageSource: Finished writing to " << cache_prefix;
} }
void SparsePageSource::Create(DMatrix* src, void SparsePageSource::Create(DMatrix* src,
const std::string& cache_prefix) { const std::string& cache_info) {
std::vector<std::string> cache_shards = common::Split(cache_info, ':');
CHECK_NE(cache_shards.size(), 0);
// read in the info files. // read in the info files.
std::string name_info = cache_prefix; std::string name_info = cache_shards[0];
std::string name_row = cache_prefix + ".row.page"; std::vector<std::string> name_shards, format_shards;
std::unique_ptr<dmlc::Stream> fo(dmlc::Stream::Create(name_row.c_str(), "w")); for (const std::string& prefix : cache_shards) {
// find format. name_shards.push_back(prefix + ".row.page");
std::string name_format = SparsePage::Format::DecideFormat(cache_prefix).first; format_shards.push_back(SparsePage::Format::DecideFormat(prefix).first);
fo->Write(name_format); }
std::unique_ptr<SparsePage::Format> format(SparsePage::Format::Create(name_format)); SparsePage::Writer writer(name_shards, format_shards, 6);
std::unique_ptr<SparsePage> page;
writer.Alloc(&page); page->Clear();
SparsePage page; MetaInfo info;
size_t bytes_write = 0; size_t bytes_write = 0;
double tstart = dmlc::GetTime(); double tstart = dmlc::GetTime();
dmlc::DataIter<RowBatch>* iter = src->RowIterator(); dmlc::DataIter<RowBatch>* iter = src->RowIterator();
while (iter->Next()) { while (iter->Next()) {
page.Push(iter->Value()); page->Push(iter->Value());
if (page.MemCostBytes() >= kPageSize) { if (page->MemCostBytes() >= kPageSize) {
bytes_write += page.MemCostBytes(); bytes_write += page->MemCostBytes();
format->Write(page, fo.get()); writer.PushWrite(std::move(page));
page.Clear(); writer.Alloc(&page);
page->Clear();
double tdiff = dmlc::GetTime() - tstart; double tdiff = dmlc::GetTime() - tstart;
LOG(CONSOLE) << "Writing to " << name_row << " in " LOG(CONSOLE) << "Writing to " << cache_info << " in "
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, " << ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " written"; << (bytes_write >> 20UL) << " written";
} }
} }
if (page.data.size() != 0) { if (page->data.size() != 0) {
format->Write(page, fo.get()); writer.PushWrite(std::move(page));
} }
fo.reset(dmlc::Stream::Create(name_info.c_str(), "w")); std::unique_ptr<dmlc::Stream> fo(
dmlc::Stream::Create(name_info.c_str(), "w"));
int tmagic = kMagic; int tmagic = kMagic;
fo->Write(&tmagic, sizeof(tmagic)); fo->Write(&tmagic, sizeof(tmagic));
src->info().SaveBinary(fo.get()); info.SaveBinary(fo.get());
LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info;
LOG(CONSOLE) << "SparsePageSource: Finished writing to " << cache_prefix;
} }
} // namespace data } // namespace data

View File

@ -71,14 +71,14 @@ class SparsePageSource : public DataSource {
RowBatch batch_; RowBatch batch_;
/*! \brief page currently on hold. */ /*! \brief page currently on hold. */
SparsePage *page_; SparsePage *page_;
/*! \brief The cache predix of the dataset. */ /*! \brief internal clock ptr */
std::string cache_prefix_; size_t clock_ptr_;
/*! \brief file pointer to the row blob file. */ /*! \brief file pointer to the row blob file. */
std::unique_ptr<dmlc::SeekStream> fi_; std::vector<std::unique_ptr<dmlc::SeekStream> > files_;
/*! \brief Sparse page format file. */ /*! \brief Sparse page format file. */
std::unique_ptr<SparsePage::Format> format_; std::vector<std::unique_ptr<SparsePage::Format> > formats_;
/*! \brief internal prefetcher. */ /*! \brief internal prefetcher. */
dmlc::ThreadedIter<SparsePage> prefetcher_; std::vector<std::unique_ptr<dmlc::ThreadedIter<SparsePage> > > prefetchers_;
}; };
} // namespace data } // namespace data
} // namespace xgboost } // namespace xgboost

View File

@ -0,0 +1,72 @@
/*!
* Copyright (c) 2015 by Contributors
* \file sparse_batch_writer.cc
* \param Writer class sparse page.
*/
#include <xgboost/base.h>
#include <xgboost/logging.h>
#include "./sparse_batch_page.h"
#if DMLC_ENABLE_STD_THREAD
namespace xgboost {
namespace data {
SparsePage::Writer::Writer(
const std::vector<std::string>& name_shards,
const std::vector<std::string>& format_shards,
size_t extra_buffer_capacity)
: num_free_buffer_(extra_buffer_capacity + name_shards.size()),
clock_ptr_(0),
workers_(name_shards.size()),
qworkers_(name_shards.size()) {
CHECK_EQ(name_shards.size(), format_shards.size());
// start writer threads
for (size_t i = 0; i < name_shards.size(); ++i) {
std::string name_shard = name_shards[i];
std::string format_shard = format_shards[i];
auto* wqueue = &qworkers_[i];
workers_[i].reset(new std::thread(
[this, name_shard, format_shard, wqueue] () {
std::unique_ptr<dmlc::Stream> fo(
dmlc::Stream::Create(name_shard.c_str(), "w"));
std::unique_ptr<SparsePage::Format> fmt(
SparsePage::Format::Create(format_shard));
fo->Write(format_shard);
std::unique_ptr<SparsePage> page;
while (wqueue->Pop(&page)) {
fmt->Write(*page, fo.get());
qrecycle_.Push(std::move(page));
}
fo.reset(nullptr);
LOG(CONSOLE) << "SparsePage::Writer Finished writing to " << name_shard;
}));
}
}
SparsePage::Writer::~Writer() {
for (auto& queue : qworkers_) {
queue.SignalForKill();
}
for (auto& thread : workers_) {
thread->join();
}
}
void SparsePage::Writer::PushWrite(std::unique_ptr<SparsePage>&& page) {
qworkers_[clock_ptr_].Push(std::move(page));
clock_ptr_ = (clock_ptr_ + 1) % workers_.size();
}
void SparsePage::Writer::Alloc(std::unique_ptr<SparsePage>* out_page) {
CHECK(out_page->get() == nullptr);
if (num_free_buffer_ != 0) {
out_page->reset(new SparsePage());
--num_free_buffer_;
} else {
CHECK(qrecycle_.Pop(out_page));
}
}
} // namespace data
} // namespace xgboost
#endif // DMLC_ENABLE_STD_THREAD