diff --git a/dmlc-core b/dmlc-core index ad2ddde8b..257b09a0b 160000 --- a/dmlc-core +++ b/dmlc-core @@ -1 +1 @@ -Subproject commit ad2ddde8b6624abf3007a71b2923c3925530cc81 +Subproject commit 257b09a0ba18625a9fcf3b9471a9b1c35a767b7b diff --git a/src/common/common.h b/src/common/common.h new file mode 100644 index 000000000..d8adadae3 --- /dev/null +++ b/src/common/common.h @@ -0,0 +1,31 @@ +/*! + * Copyright 2015 by Contributors + * \file common.h + * \brief Common utilities + */ +#ifndef XGBOOST_COMMON_COMMON_H_ +#define XGBOOST_COMMON_COMMON_H_ + +#include +#include +#include + +namespace xgboost { +namespace common { +/*! + * \brief Split a string by delimiter + * \param s String to be splitted. + * \param delim The delimiter. + */ +inline std::vector Split(const std::string& s, char delim) { + std::string item; + std::istringstream is(s); + std::vector ret; + while (std::getline(is, item, delim)) { + ret.push_back(item); + } + return ret; +} +} // namespace common +} // namespace xgboost +#endif // XGBOOST_COMMON_COMMON_H_ diff --git a/src/data/sparse_batch_page.h b/src/data/sparse_batch_page.h index 41893e6b5..81dbe1368 100644 --- a/src/data/sparse_batch_page.h +++ b/src/data/sparse_batch_page.h @@ -16,6 +16,12 @@ #include #include #include +#include + +#if DMLC_ENABLE_STD_THREAD +#include +#include +#endif namespace xgboost { namespace data { @@ -26,6 +32,8 @@ class SparsePage { public: /*! \brief Format of the sparse page. */ class Format; + /*! \brief Writer to write the sparse page to files. */ + class Writer; /*! \brief minimum index of all index, used as hint for compression. */ bst_uint min_index; /*! \brief offset of the segments */ @@ -171,6 +179,53 @@ class SparsePage::Format { static std::pair DecideFormat(const std::string& cache_prefix); }; +#if DMLC_ENABLE_STD_THREAD +/*! + * \brief A threaded writer to write sparse batch page to sharded files. + */ +class SparsePage::Writer { + public: + /*! + * \brief constructor + * \param name_shards name of shard files. + * \param format_shards format of each shard. + * \param extra_buffer_capacity Extra buffer capacity before block. + */ + explicit Writer( + const std::vector& name_shards, + const std::vector& format_shards, + size_t extra_buffer_capacity); + /*! \brief destructor, will close the files automatically */ + ~Writer(); + /*! + * \brief Push a write job to the writer. + * This function won't block, + * writing is done by another thread inside writer. + * \param page The page to be wriiten + */ + void PushWrite(std::unique_ptr&& page); + /*! + * \brief Allocate a page to store results. + * This function can block when the writer is too slow and buffer pages + * have not yet been recycled. + * \param out_page Used to store the allocated pages. + */ + void Alloc(std::unique_ptr* out_page); + + private: + /*! \brief number of allocated pages */ + size_t num_free_buffer_; + /*! \brief clock_pointer */ + size_t clock_ptr_; + /*! \brief writer threads */ + std::vector > workers_; + /*! \brief recycler queue */ + dmlc::ConcurrentBlockingQueue > qrecycle_; + /*! \brief worker threads */ + std::vector > > qworkers_; +}; +#endif // DMLC_ENABLE_STD_THREAD + /*! * \brief Registry entry for sparse page format. */ diff --git a/src/data/sparse_page_dmatrix.cc b/src/data/sparse_page_dmatrix.cc index 0cbf27e5d..eb3ef3ca0 100644 --- a/src/data/sparse_page_dmatrix.cc +++ b/src/data/sparse_page_dmatrix.cc @@ -12,34 +12,42 @@ #if DMLC_ENABLE_STD_THREAD #include "./sparse_page_dmatrix.h" #include "../common/random.h" +#include "../common/common.h" #include "../common/group_data.h" namespace xgboost { namespace data { -SparsePageDMatrix::ColPageIter::ColPageIter(std::unique_ptr&& fi) - : fi_(std::move(fi)), page_(nullptr) { +SparsePageDMatrix::ColPageIter::ColPageIter( + std::vector >&& files) + : page_(nullptr), clock_ptr_(0), files_(std::move(files)) { load_all_ = false; + formats_.resize(files_.size()); + prefetchers_.resize(files_.size()); - std::string format; - CHECK(fi_->Read(&format)) << "Invalid page format"; - format_.reset(SparsePage::Format::Create(format)); - size_t fbegin = fi_->Tell(); - - prefetcher_.Init([this](SparsePage** dptr) { - if (*dptr == nullptr) { - *dptr = new SparsePage(); - } - if (load_all_) { - return format_->Read(*dptr, fi_.get()); - } else { - return format_->Read(*dptr, fi_.get(), index_set_); - } - }, [this, fbegin] () { - fi_->Seek(fbegin); - index_set_ = set_index_set_; - load_all_ = set_load_all_; - }); + for (size_t i = 0; i < files_.size(); ++i) { + dmlc::SeekStream* fi = files_[i].get(); + std::string format; + CHECK(fi->Read(&format)) << "Invalid page format"; + formats_[i].reset(SparsePage::Format::Create(format)); + SparsePage::Format* fmt = formats_[i].get(); + size_t fbegin = fi->Tell(); + prefetchers_[i].reset(new dmlc::ThreadedIter(4)); + prefetchers_[i]->Init([this, fi, fmt] (SparsePage** dptr) { + if (*dptr == nullptr) { + *dptr = new SparsePage(); + } + if (load_all_) { + return fmt->Read(*dptr, fi); + } else { + return fmt->Read(*dptr, fi, index_set_); + } + }, [this, fi, fbegin] () { + fi->Seek(fbegin); + index_set_ = set_index_set_; + load_all_ = set_load_all_; + }); + } } SparsePageDMatrix::ColPageIter::~ColPageIter() { @@ -47,10 +55,12 @@ SparsePageDMatrix::ColPageIter::~ColPageIter() { } bool SparsePageDMatrix::ColPageIter::Next() { + // doing clock rotation over shards. if (page_ != nullptr) { - prefetcher_.Recycle(&page_); + size_t n = prefetchers_.size(); + prefetchers_[(clock_ptr_ + n - 1) % n]->Recycle(&page_); } - if (prefetcher_.Next(&page_)) { + if (prefetchers_[clock_ptr_]->Next(&page_)) { out_.col_index = dmlc::BeginPtr(index_set_); col_data_.resize(page_->offset.size() - 1, SparseBatch::Inst(nullptr, 0)); for (size_t i = 0; i < col_data_.size(); ++i) { @@ -60,18 +70,26 @@ bool SparsePageDMatrix::ColPageIter::Next() { } out_.col_data = dmlc::BeginPtr(col_data_); out_.size = col_data_.size(); + // advance clock + clock_ptr_ = (clock_ptr_ + 1) % prefetchers_.size(); return true; } else { return false; } } +void SparsePageDMatrix::ColPageIter::BeforeFirst() { + clock_ptr_ = 0; + for (auto& p : prefetchers_) { + p->BeforeFirst(); + } +} + void SparsePageDMatrix::ColPageIter::Init(const std::vector& index_set, bool load_all) { set_index_set_ = index_set; set_load_all_ = load_all; std::sort(set_index_set_.begin(), set_index_set_.end()); - this->BeforeFirst(); } @@ -103,8 +121,9 @@ ColIterator(const std::vector& fset) { bool SparsePageDMatrix::TryInitColData() { // load meta data. + std::vector cache_shards = common::Split(cache_info_, ':'); { - std::string col_meta_name = cache_prefix_ + ".col.meta"; + std::string col_meta_name = cache_shards[0] + ".col.meta"; std::unique_ptr fmeta( dmlc::Stream::Create(col_meta_name.c_str(), "r", true)); if (fmeta.get() == nullptr) return false; @@ -112,13 +131,15 @@ bool SparsePageDMatrix::TryInitColData() { CHECK(fmeta->Read(&col_size_)) << "invalid col.meta file"; } // load real data - { - std::string col_data_name = cache_prefix_ + ".col.page"; + std::vector > files; + for (const std::string& prefix : cache_shards) { + std::string col_data_name = prefix + ".col.page"; std::unique_ptr fdata( dmlc::SeekStream::CreateForRead(col_data_name.c_str(), true)); if (fdata.get() == nullptr) return false; - col_iter_.reset(new ColPageIter(std::move(fdata))); + files.push_back(std::move(fdata)); } + col_iter_.reset(new ColPageIter(std::move(files))); return true; } @@ -135,24 +156,17 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, buffered_rowset_.clear(); col_size_.resize(info.num_col); std::fill(col_size_.begin(), col_size_.end(), 0); - // make the sparse page. - dmlc::ThreadedIter cmaker; - SparsePage tmp; - size_t batch_ptr = 0, batch_top = 0; dmlc::DataIter* iter = this->RowIterator(); std::bernoulli_distribution coin_flip(pkeep); - + size_t batch_ptr = 0, batch_top = 0; + SparsePage tmp; auto& rnd = common::GlobalRandom(); // function to create the page. auto make_col_batch = [&] ( const SparsePage& prow, const bst_uint* ridx, - SparsePage **dptr) { - if (*dptr == nullptr) { - *dptr = new SparsePage(); - } - SparsePage* pcol = *dptr; + SparsePage *pcol) { pcol->Clear(); pcol->min_index = ridx[0]; int nthread; @@ -199,7 +213,7 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, } }; - auto make_next_col = [&] (SparsePage** dptr) { + auto make_next_col = [&] (SparsePage* dptr) { tmp.Clear(); size_t btop = buffered_rowset_.size(); @@ -236,41 +250,44 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, } }; - cmaker.Init(make_next_col, []() {}); - - std::string col_data_name = cache_prefix_ + ".col.page"; - std::unique_ptr fo(dmlc::Stream::Create(col_data_name.c_str(), "w")); - // find format. - std::string name_format = SparsePage::Format::DecideFormat(cache_prefix_).second; - fo->Write(name_format); - std::unique_ptr format(SparsePage::Format::Create(name_format)); + std::vector cache_shards = common::Split(cache_info_, ':'); + std::vector name_shards, format_shards; + for (const std::string& prefix : cache_shards) { + name_shards.push_back(prefix + ".col.page"); + format_shards.push_back(SparsePage::Format::DecideFormat(prefix).second); + } + SparsePage::Writer writer(name_shards, format_shards, 6); + std::unique_ptr page; + writer.Alloc(&page); page->Clear(); double tstart = dmlc::GetTime(); size_t bytes_write = 0; // print every 4 sec. const double kStep = 4.0; size_t tick_expected = kStep; - SparsePage* pcol = nullptr; - while (cmaker.Next(&pcol)) { - for (size_t i = 0; i < pcol->Size(); ++i) { - col_size_[i] += pcol->offset[i + 1] - pcol->offset[i]; + while (make_next_col(page.get())) { + for (size_t i = 0; i < page->Size(); ++i) { + col_size_[i] += page->offset[i + 1] - page->offset[i]; } - format->Write(*pcol, fo.get()); - size_t spage = pcol->MemCostBytes(); - bytes_write += spage; + + bytes_write += page->MemCostBytes(); + writer.PushWrite(std::move(page)); + writer.Alloc(&page); + page->Clear(); + double tdiff = dmlc::GetTime() - tstart; if (tdiff >= tick_expected) { - LOG(CONSOLE) << "Writing to " << col_data_name + LOG(CONSOLE) << "Writing col.page file to " << cache_info_ << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " << (bytes_write >> 20UL) << " MB writen"; tick_expected += kStep; } - cmaker.Recycle(&pcol); } // save meta data - std::string col_meta_name = cache_prefix_ + ".col.meta"; - fo.reset(dmlc::Stream::Create(col_meta_name.c_str(), "w")); + std::string col_meta_name = cache_shards[0] + ".col.meta"; + std::unique_ptr fo( + dmlc::Stream::Create(col_meta_name.c_str(), "w")); fo->Write(buffered_rowset_); fo->Write(col_size_); fo.reset(nullptr); diff --git a/src/data/sparse_page_dmatrix.h b/src/data/sparse_page_dmatrix.h index e4aebee9c..129d1f016 100644 --- a/src/data/sparse_page_dmatrix.h +++ b/src/data/sparse_page_dmatrix.h @@ -14,6 +14,7 @@ #include #include #include "./sparse_batch_page.h" +#include "../common/common.h" namespace xgboost { namespace data { @@ -21,9 +22,9 @@ namespace data { class SparsePageDMatrix : public DMatrix { public: explicit SparsePageDMatrix(std::unique_ptr&& source, - const std::string& cache_prefix) - : source_(std::move(source)), - cache_prefix_(cache_prefix) {} + const std::string& cache_info) + : source_(std::move(source)), cache_info_(cache_info) { + } MetaInfo& info() override { return source_->info; @@ -77,11 +78,9 @@ class SparsePageDMatrix : public DMatrix { // declare the column batch iter. class ColPageIter : public dmlc::DataIter { public: - explicit ColPageIter(std::unique_ptr&& fi); + explicit ColPageIter(std::vector >&& files); virtual ~ColPageIter(); - void BeforeFirst() override { - prefetcher_.BeforeFirst(); - } + void BeforeFirst() override; const ColBatch &Value() const override { return out_; } @@ -90,20 +89,22 @@ class SparsePageDMatrix : public DMatrix { void Init(const std::vector& index_set, bool load_all); private: - // data file pointer. - std::unique_ptr fi_; // the temp page. SparsePage* page_; + // internal clock ptr. + size_t clock_ptr_; + // data file pointer. + std::vector > files_; // page format. - std::unique_ptr format_; + std::vector > formats_; + /*! \brief internal prefetcher. */ + std::vector > > prefetchers_; // The index set to be loaded. std::vector index_set_; // The index set by the outsiders std::vector set_index_set_; // whether to load data dataset. bool set_load_all_, load_all_; - // data prefetcher. - dmlc::ThreadedIter prefetcher_; // temporal space for batch ColBatch out_; // the pointer data. @@ -117,7 +118,7 @@ class SparsePageDMatrix : public DMatrix { // source data pointer. std::unique_ptr source_; // the cache prefix - std::string cache_prefix_; + std::string cache_info_; /*! \brief list of row index that are buffered */ std::vector buffered_rowset_; // count for column data diff --git a/src/data/sparse_page_source.cc b/src/data/sparse_page_source.cc index 5730da9b5..3f499739f 100644 --- a/src/data/sparse_page_source.cc +++ b/src/data/sparse_page_source.cc @@ -9,35 +9,45 @@ #if DMLC_ENABLE_STD_THREAD #include "./sparse_page_source.h" +#include "../common/common.h" namespace xgboost { namespace data { -SparsePageSource::SparsePageSource(const std::string& cache_prefix) - : base_rowid_(0), page_(nullptr) { - // read in the info files. +SparsePageSource::SparsePageSource(const std::string& cache_info) + : base_rowid_(0), page_(nullptr), clock_ptr_(0) { + // read in the info files + std::vector cache_shards = common::Split(cache_info, ':'); + CHECK_NE(cache_shards.size(), 0); { - std::string name_info = cache_prefix; + std::string name_info = cache_shards[0]; std::unique_ptr finfo(dmlc::Stream::Create(name_info.c_str(), "r")); int tmagic; CHECK_EQ(finfo->Read(&tmagic, sizeof(tmagic)), sizeof(tmagic)); this->info.LoadBinary(finfo.get()); } + files_.resize(cache_shards.size()); + formats_.resize(cache_shards.size()); + prefetchers_.resize(cache_shards.size()); + // read in the cache files. - std::string name_row = cache_prefix + ".row.page"; - fi_.reset(dmlc::SeekStream::CreateForRead(name_row.c_str())); - - std::string format; - CHECK(fi_->Read(&format)) << "Invalid page format"; - format_.reset(SparsePage::Format::Create(format)); - size_t fbegin = fi_->Tell(); - - prefetcher_.Init([this] (SparsePage** dptr) { - if (*dptr == nullptr) { - *dptr = new SparsePage(); - } - return format_->Read(*dptr, fi_.get()); - }, [this, fbegin] () { fi_->Seek(fbegin); }); + for (size_t i = 0; i < cache_shards.size(); ++i) { + std::string name_row = cache_shards[i] + ".row.page"; + files_[i].reset(dmlc::SeekStream::CreateForRead(name_row.c_str())); + dmlc::SeekStream* fi = files_[i].get(); + std::string format; + CHECK(fi->Read(&format)) << "Invalid page format"; + formats_[i].reset(SparsePage::Format::Create(format)); + SparsePage::Format* fmt = formats_[i].get(); + size_t fbegin = fi->Tell(); + prefetchers_[i].reset(new dmlc::ThreadedIter(4)); + prefetchers_[i]->Init([fi, fmt] (SparsePage** dptr) { + if (*dptr == nullptr) { + *dptr = new SparsePage(); + } + return fmt->Read(*dptr, fi); + }, [fi, fbegin] () { fi->Seek(fbegin); }); + } } SparsePageSource::~SparsePageSource() { @@ -45,12 +55,16 @@ SparsePageSource::~SparsePageSource() { } bool SparsePageSource::Next() { + // doing clock rotation over shards. if (page_ != nullptr) { - prefetcher_.Recycle(&page_); + size_t n = prefetchers_.size(); + prefetchers_[(clock_ptr_ + n - 1) % n]->Recycle(&page_); } - if (prefetcher_.Next(&page_)) { + if (prefetchers_[clock_ptr_]->Next(&page_)) { batch_ = page_->GetRowBatch(base_rowid_); base_rowid_ += batch_.size; + // advance clock + clock_ptr_ = (clock_ptr_ + 1) % prefetchers_.size(); return true; } else { return false; @@ -59,33 +73,48 @@ bool SparsePageSource::Next() { void SparsePageSource::BeforeFirst() { base_rowid_ = 0; - prefetcher_.BeforeFirst(); + clock_ptr_ = 0; + for (auto& p : prefetchers_) { + p->BeforeFirst(); + } } const RowBatch& SparsePageSource::Value() const { return batch_; } -bool SparsePageSource::CacheExist(const std::string& cache_prefix) { - std::string name_info = cache_prefix; - std::string name_row = cache_prefix + ".row.page"; - std::unique_ptr finfo(dmlc::Stream::Create(name_info.c_str(), "r", true)); - std::unique_ptr frow(dmlc::Stream::Create(name_row.c_str(), "r", true)); - return finfo.get() != nullptr && frow.get() != nullptr; +bool SparsePageSource::CacheExist(const std::string& cache_info) { + std::vector cache_shards = common::Split(cache_info, ':'); + CHECK_NE(cache_shards.size(), 0); + { + std::string name_info = cache_shards[0]; + std::unique_ptr finfo(dmlc::Stream::Create(name_info.c_str(), "r", true)); + if (finfo.get() == nullptr) return false; + } + for (const std::string& prefix : cache_shards) { + std::string name_row = prefix + ".row.page"; + std::unique_ptr frow(dmlc::Stream::Create(name_row.c_str(), "r", true)); + if (frow.get() == nullptr) return false; + } + return true; } void SparsePageSource::Create(dmlc::Parser* src, - const std::string& cache_prefix) { + const std::string& cache_info) { + std::vector cache_shards = common::Split(cache_info, ':'); + CHECK_NE(cache_shards.size(), 0); // read in the info files. - std::string name_info = cache_prefix; - std::string name_row = cache_prefix + ".row.page"; - std::unique_ptr fo(dmlc::Stream::Create(name_row.c_str(), "w")); - std::string name_format = SparsePage::Format::DecideFormat(cache_prefix).first; - fo->Write(name_format); - std::unique_ptr format(SparsePage::Format::Create(name_format)); + std::string name_info = cache_shards[0]; + std::vector name_shards, format_shards; + for (const std::string& prefix : cache_shards) { + name_shards.push_back(prefix + ".row.page"); + format_shards.push_back(SparsePage::Format::DecideFormat(prefix).first); + } + SparsePage::Writer writer(name_shards, format_shards, 6); + std::unique_ptr page; + writer.Alloc(&page); page->Clear(); MetaInfo info; - SparsePage page; size_t bytes_write = 0; double tstart = dmlc::GetTime(); // print every 4 sec. @@ -107,14 +136,16 @@ void SparsePageSource::Create(dmlc::Parser* src, info.num_col = std::max(info.num_col, static_cast(index + 1)); } - page.Push(batch); - if (page.MemCostBytes() >= kPageSize) { - bytes_write += page.MemCostBytes(); - format->Write(page, fo.get()); - page.Clear(); + page->Push(batch); + if (page->MemCostBytes() >= kPageSize) { + bytes_write += page->MemCostBytes(); + writer.PushWrite(std::move(page)); + writer.Alloc(&page); + page->Clear(); + double tdiff = dmlc::GetTime() - tstart; if (tdiff >= tick_expected) { - LOG(CONSOLE) << "Writing to " << name_row << " in " + LOG(CONSOLE) << "Writing row.page to " << cache_info << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " << (bytes_write >> 20UL) << " written"; tick_expected += kStep; @@ -122,57 +153,62 @@ void SparsePageSource::Create(dmlc::Parser* src, } } - if (page.data.size() != 0) { - format->Write(page, fo.get()); + if (page->data.size() != 0) { + writer.PushWrite(std::move(page)); } - fo.reset(dmlc::Stream::Create(name_info.c_str(), "w")); + std::unique_ptr fo( + dmlc::Stream::Create(name_info.c_str(), "w")); int tmagic = kMagic; fo->Write(&tmagic, sizeof(tmagic)); info.SaveBinary(fo.get()); - - LOG(CONSOLE) << "SparsePageSource: Finished writing to " << cache_prefix; + LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info; } void SparsePageSource::Create(DMatrix* src, - const std::string& cache_prefix) { + const std::string& cache_info) { + std::vector cache_shards = common::Split(cache_info, ':'); + CHECK_NE(cache_shards.size(), 0); // read in the info files. - std::string name_info = cache_prefix; - std::string name_row = cache_prefix + ".row.page"; - std::unique_ptr fo(dmlc::Stream::Create(name_row.c_str(), "w")); - // find format. - std::string name_format = SparsePage::Format::DecideFormat(cache_prefix).first; - fo->Write(name_format); - std::unique_ptr format(SparsePage::Format::Create(name_format)); + std::string name_info = cache_shards[0]; + std::vector name_shards, format_shards; + for (const std::string& prefix : cache_shards) { + name_shards.push_back(prefix + ".row.page"); + format_shards.push_back(SparsePage::Format::DecideFormat(prefix).first); + } + SparsePage::Writer writer(name_shards, format_shards, 6); + std::unique_ptr page; + writer.Alloc(&page); page->Clear(); - SparsePage page; + MetaInfo info; size_t bytes_write = 0; double tstart = dmlc::GetTime(); dmlc::DataIter* iter = src->RowIterator(); while (iter->Next()) { - page.Push(iter->Value()); - if (page.MemCostBytes() >= kPageSize) { - bytes_write += page.MemCostBytes(); - format->Write(page, fo.get()); - page.Clear(); + page->Push(iter->Value()); + if (page->MemCostBytes() >= kPageSize) { + bytes_write += page->MemCostBytes(); + writer.PushWrite(std::move(page)); + writer.Alloc(&page); + page->Clear(); double tdiff = dmlc::GetTime() - tstart; - LOG(CONSOLE) << "Writing to " << name_row << " in " + LOG(CONSOLE) << "Writing to " << cache_info << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " << (bytes_write >> 20UL) << " written"; } } - if (page.data.size() != 0) { - format->Write(page, fo.get()); + if (page->data.size() != 0) { + writer.PushWrite(std::move(page)); } - fo.reset(dmlc::Stream::Create(name_info.c_str(), "w")); + std::unique_ptr fo( + dmlc::Stream::Create(name_info.c_str(), "w")); int tmagic = kMagic; fo->Write(&tmagic, sizeof(tmagic)); - src->info().SaveBinary(fo.get()); - - LOG(CONSOLE) << "SparsePageSource: Finished writing to " << cache_prefix; + info.SaveBinary(fo.get()); + LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info; } } // namespace data diff --git a/src/data/sparse_page_source.h b/src/data/sparse_page_source.h index 79c55b4ba..02a3445ec 100644 --- a/src/data/sparse_page_source.h +++ b/src/data/sparse_page_source.h @@ -71,14 +71,14 @@ class SparsePageSource : public DataSource { RowBatch batch_; /*! \brief page currently on hold. */ SparsePage *page_; - /*! \brief The cache predix of the dataset. */ - std::string cache_prefix_; + /*! \brief internal clock ptr */ + size_t clock_ptr_; /*! \brief file pointer to the row blob file. */ - std::unique_ptr fi_; + std::vector > files_; /*! \brief Sparse page format file. */ - std::unique_ptr format_; + std::vector > formats_; /*! \brief internal prefetcher. */ - dmlc::ThreadedIter prefetcher_; + std::vector > > prefetchers_; }; } // namespace data } // namespace xgboost diff --git a/src/data/sparse_page_writer.cc b/src/data/sparse_page_writer.cc new file mode 100644 index 000000000..33f9172d6 --- /dev/null +++ b/src/data/sparse_page_writer.cc @@ -0,0 +1,72 @@ +/*! + * Copyright (c) 2015 by Contributors + * \file sparse_batch_writer.cc + * \param Writer class sparse page. + */ +#include +#include +#include "./sparse_batch_page.h" + +#if DMLC_ENABLE_STD_THREAD +namespace xgboost { +namespace data { + +SparsePage::Writer::Writer( + const std::vector& name_shards, + const std::vector& format_shards, + size_t extra_buffer_capacity) + : num_free_buffer_(extra_buffer_capacity + name_shards.size()), + clock_ptr_(0), + workers_(name_shards.size()), + qworkers_(name_shards.size()) { + CHECK_EQ(name_shards.size(), format_shards.size()); + // start writer threads + for (size_t i = 0; i < name_shards.size(); ++i) { + std::string name_shard = name_shards[i]; + std::string format_shard = format_shards[i]; + auto* wqueue = &qworkers_[i]; + workers_[i].reset(new std::thread( + [this, name_shard, format_shard, wqueue] () { + std::unique_ptr fo( + dmlc::Stream::Create(name_shard.c_str(), "w")); + std::unique_ptr fmt( + SparsePage::Format::Create(format_shard)); + fo->Write(format_shard); + std::unique_ptr page; + while (wqueue->Pop(&page)) { + fmt->Write(*page, fo.get()); + qrecycle_.Push(std::move(page)); + } + fo.reset(nullptr); + LOG(CONSOLE) << "SparsePage::Writer Finished writing to " << name_shard; + })); + } +} + +SparsePage::Writer::~Writer() { + for (auto& queue : qworkers_) { + queue.SignalForKill(); + } + for (auto& thread : workers_) { + thread->join(); + } +} + +void SparsePage::Writer::PushWrite(std::unique_ptr&& page) { + qworkers_[clock_ptr_].Push(std::move(page)); + clock_ptr_ = (clock_ptr_ + 1) % workers_.size(); +} + +void SparsePage::Writer::Alloc(std::unique_ptr* out_page) { + CHECK(out_page->get() == nullptr); + if (num_free_buffer_ != 0) { + out_page->reset(new SparsePage()); + --num_free_buffer_; + } else { + CHECK(qrecycle_.Pop(out_page)); + } +} +} // namespace data +} // namespace xgboost + +#endif // DMLC_ENABLE_STD_THREAD