From 2230f1273f2e966db80de5c64965e205f436746c Mon Sep 17 00:00:00 2001 From: tqchen Date: Tue, 19 Jan 2016 13:39:10 -0800 Subject: [PATCH 01/16] [DISK] Add shard option to disk --- dmlc-core | 2 +- src/common/common.h | 31 ++++++ src/data/sparse_batch_page.h | 55 +++++++++++ src/data/sparse_page_dmatrix.cc | 133 ++++++++++++++----------- src/data/sparse_page_dmatrix.h | 27 ++--- src/data/sparse_page_source.cc | 170 +++++++++++++++++++------------- src/data/sparse_page_source.h | 10 +- src/data/sparse_page_writer.cc | 72 ++++++++++++++ 8 files changed, 356 insertions(+), 144 deletions(-) create mode 100644 src/common/common.h create mode 100644 src/data/sparse_page_writer.cc diff --git a/dmlc-core b/dmlc-core index ad2ddde8b..257b09a0b 160000 --- a/dmlc-core +++ b/dmlc-core @@ -1 +1 @@ -Subproject commit ad2ddde8b6624abf3007a71b2923c3925530cc81 +Subproject commit 257b09a0ba18625a9fcf3b9471a9b1c35a767b7b diff --git a/src/common/common.h b/src/common/common.h new file mode 100644 index 000000000..d8adadae3 --- /dev/null +++ b/src/common/common.h @@ -0,0 +1,31 @@ +/*! + * Copyright 2015 by Contributors + * \file common.h + * \brief Common utilities + */ +#ifndef XGBOOST_COMMON_COMMON_H_ +#define XGBOOST_COMMON_COMMON_H_ + +#include +#include +#include + +namespace xgboost { +namespace common { +/*! + * \brief Split a string by delimiter + * \param s String to be splitted. + * \param delim The delimiter. + */ +inline std::vector Split(const std::string& s, char delim) { + std::string item; + std::istringstream is(s); + std::vector ret; + while (std::getline(is, item, delim)) { + ret.push_back(item); + } + return ret; +} +} // namespace common +} // namespace xgboost +#endif // XGBOOST_COMMON_COMMON_H_ diff --git a/src/data/sparse_batch_page.h b/src/data/sparse_batch_page.h index 41893e6b5..81dbe1368 100644 --- a/src/data/sparse_batch_page.h +++ b/src/data/sparse_batch_page.h @@ -16,6 +16,12 @@ #include #include #include +#include + +#if DMLC_ENABLE_STD_THREAD +#include +#include +#endif namespace xgboost { namespace data { @@ -26,6 +32,8 @@ class SparsePage { public: /*! \brief Format of the sparse page. */ class Format; + /*! \brief Writer to write the sparse page to files. */ + class Writer; /*! \brief minimum index of all index, used as hint for compression. */ bst_uint min_index; /*! \brief offset of the segments */ @@ -171,6 +179,53 @@ class SparsePage::Format { static std::pair DecideFormat(const std::string& cache_prefix); }; +#if DMLC_ENABLE_STD_THREAD +/*! + * \brief A threaded writer to write sparse batch page to sharded files. + */ +class SparsePage::Writer { + public: + /*! + * \brief constructor + * \param name_shards name of shard files. + * \param format_shards format of each shard. + * \param extra_buffer_capacity Extra buffer capacity before block. + */ + explicit Writer( + const std::vector& name_shards, + const std::vector& format_shards, + size_t extra_buffer_capacity); + /*! \brief destructor, will close the files automatically */ + ~Writer(); + /*! + * \brief Push a write job to the writer. + * This function won't block, + * writing is done by another thread inside writer. + * \param page The page to be wriiten + */ + void PushWrite(std::unique_ptr&& page); + /*! + * \brief Allocate a page to store results. + * This function can block when the writer is too slow and buffer pages + * have not yet been recycled. + * \param out_page Used to store the allocated pages. + */ + void Alloc(std::unique_ptr* out_page); + + private: + /*! \brief number of allocated pages */ + size_t num_free_buffer_; + /*! \brief clock_pointer */ + size_t clock_ptr_; + /*! \brief writer threads */ + std::vector > workers_; + /*! \brief recycler queue */ + dmlc::ConcurrentBlockingQueue > qrecycle_; + /*! \brief worker threads */ + std::vector > > qworkers_; +}; +#endif // DMLC_ENABLE_STD_THREAD + /*! * \brief Registry entry for sparse page format. */ diff --git a/src/data/sparse_page_dmatrix.cc b/src/data/sparse_page_dmatrix.cc index 0cbf27e5d..eb3ef3ca0 100644 --- a/src/data/sparse_page_dmatrix.cc +++ b/src/data/sparse_page_dmatrix.cc @@ -12,34 +12,42 @@ #if DMLC_ENABLE_STD_THREAD #include "./sparse_page_dmatrix.h" #include "../common/random.h" +#include "../common/common.h" #include "../common/group_data.h" namespace xgboost { namespace data { -SparsePageDMatrix::ColPageIter::ColPageIter(std::unique_ptr&& fi) - : fi_(std::move(fi)), page_(nullptr) { +SparsePageDMatrix::ColPageIter::ColPageIter( + std::vector >&& files) + : page_(nullptr), clock_ptr_(0), files_(std::move(files)) { load_all_ = false; + formats_.resize(files_.size()); + prefetchers_.resize(files_.size()); - std::string format; - CHECK(fi_->Read(&format)) << "Invalid page format"; - format_.reset(SparsePage::Format::Create(format)); - size_t fbegin = fi_->Tell(); - - prefetcher_.Init([this](SparsePage** dptr) { - if (*dptr == nullptr) { - *dptr = new SparsePage(); - } - if (load_all_) { - return format_->Read(*dptr, fi_.get()); - } else { - return format_->Read(*dptr, fi_.get(), index_set_); - } - }, [this, fbegin] () { - fi_->Seek(fbegin); - index_set_ = set_index_set_; - load_all_ = set_load_all_; - }); + for (size_t i = 0; i < files_.size(); ++i) { + dmlc::SeekStream* fi = files_[i].get(); + std::string format; + CHECK(fi->Read(&format)) << "Invalid page format"; + formats_[i].reset(SparsePage::Format::Create(format)); + SparsePage::Format* fmt = formats_[i].get(); + size_t fbegin = fi->Tell(); + prefetchers_[i].reset(new dmlc::ThreadedIter(4)); + prefetchers_[i]->Init([this, fi, fmt] (SparsePage** dptr) { + if (*dptr == nullptr) { + *dptr = new SparsePage(); + } + if (load_all_) { + return fmt->Read(*dptr, fi); + } else { + return fmt->Read(*dptr, fi, index_set_); + } + }, [this, fi, fbegin] () { + fi->Seek(fbegin); + index_set_ = set_index_set_; + load_all_ = set_load_all_; + }); + } } SparsePageDMatrix::ColPageIter::~ColPageIter() { @@ -47,10 +55,12 @@ SparsePageDMatrix::ColPageIter::~ColPageIter() { } bool SparsePageDMatrix::ColPageIter::Next() { + // doing clock rotation over shards. if (page_ != nullptr) { - prefetcher_.Recycle(&page_); + size_t n = prefetchers_.size(); + prefetchers_[(clock_ptr_ + n - 1) % n]->Recycle(&page_); } - if (prefetcher_.Next(&page_)) { + if (prefetchers_[clock_ptr_]->Next(&page_)) { out_.col_index = dmlc::BeginPtr(index_set_); col_data_.resize(page_->offset.size() - 1, SparseBatch::Inst(nullptr, 0)); for (size_t i = 0; i < col_data_.size(); ++i) { @@ -60,18 +70,26 @@ bool SparsePageDMatrix::ColPageIter::Next() { } out_.col_data = dmlc::BeginPtr(col_data_); out_.size = col_data_.size(); + // advance clock + clock_ptr_ = (clock_ptr_ + 1) % prefetchers_.size(); return true; } else { return false; } } +void SparsePageDMatrix::ColPageIter::BeforeFirst() { + clock_ptr_ = 0; + for (auto& p : prefetchers_) { + p->BeforeFirst(); + } +} + void SparsePageDMatrix::ColPageIter::Init(const std::vector& index_set, bool load_all) { set_index_set_ = index_set; set_load_all_ = load_all; std::sort(set_index_set_.begin(), set_index_set_.end()); - this->BeforeFirst(); } @@ -103,8 +121,9 @@ ColIterator(const std::vector& fset) { bool SparsePageDMatrix::TryInitColData() { // load meta data. + std::vector cache_shards = common::Split(cache_info_, ':'); { - std::string col_meta_name = cache_prefix_ + ".col.meta"; + std::string col_meta_name = cache_shards[0] + ".col.meta"; std::unique_ptr fmeta( dmlc::Stream::Create(col_meta_name.c_str(), "r", true)); if (fmeta.get() == nullptr) return false; @@ -112,13 +131,15 @@ bool SparsePageDMatrix::TryInitColData() { CHECK(fmeta->Read(&col_size_)) << "invalid col.meta file"; } // load real data - { - std::string col_data_name = cache_prefix_ + ".col.page"; + std::vector > files; + for (const std::string& prefix : cache_shards) { + std::string col_data_name = prefix + ".col.page"; std::unique_ptr fdata( dmlc::SeekStream::CreateForRead(col_data_name.c_str(), true)); if (fdata.get() == nullptr) return false; - col_iter_.reset(new ColPageIter(std::move(fdata))); + files.push_back(std::move(fdata)); } + col_iter_.reset(new ColPageIter(std::move(files))); return true; } @@ -135,24 +156,17 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, buffered_rowset_.clear(); col_size_.resize(info.num_col); std::fill(col_size_.begin(), col_size_.end(), 0); - // make the sparse page. - dmlc::ThreadedIter cmaker; - SparsePage tmp; - size_t batch_ptr = 0, batch_top = 0; dmlc::DataIter* iter = this->RowIterator(); std::bernoulli_distribution coin_flip(pkeep); - + size_t batch_ptr = 0, batch_top = 0; + SparsePage tmp; auto& rnd = common::GlobalRandom(); // function to create the page. auto make_col_batch = [&] ( const SparsePage& prow, const bst_uint* ridx, - SparsePage **dptr) { - if (*dptr == nullptr) { - *dptr = new SparsePage(); - } - SparsePage* pcol = *dptr; + SparsePage *pcol) { pcol->Clear(); pcol->min_index = ridx[0]; int nthread; @@ -199,7 +213,7 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, } }; - auto make_next_col = [&] (SparsePage** dptr) { + auto make_next_col = [&] (SparsePage* dptr) { tmp.Clear(); size_t btop = buffered_rowset_.size(); @@ -236,41 +250,44 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, } }; - cmaker.Init(make_next_col, []() {}); - - std::string col_data_name = cache_prefix_ + ".col.page"; - std::unique_ptr fo(dmlc::Stream::Create(col_data_name.c_str(), "w")); - // find format. - std::string name_format = SparsePage::Format::DecideFormat(cache_prefix_).second; - fo->Write(name_format); - std::unique_ptr format(SparsePage::Format::Create(name_format)); + std::vector cache_shards = common::Split(cache_info_, ':'); + std::vector name_shards, format_shards; + for (const std::string& prefix : cache_shards) { + name_shards.push_back(prefix + ".col.page"); + format_shards.push_back(SparsePage::Format::DecideFormat(prefix).second); + } + SparsePage::Writer writer(name_shards, format_shards, 6); + std::unique_ptr page; + writer.Alloc(&page); page->Clear(); double tstart = dmlc::GetTime(); size_t bytes_write = 0; // print every 4 sec. const double kStep = 4.0; size_t tick_expected = kStep; - SparsePage* pcol = nullptr; - while (cmaker.Next(&pcol)) { - for (size_t i = 0; i < pcol->Size(); ++i) { - col_size_[i] += pcol->offset[i + 1] - pcol->offset[i]; + while (make_next_col(page.get())) { + for (size_t i = 0; i < page->Size(); ++i) { + col_size_[i] += page->offset[i + 1] - page->offset[i]; } - format->Write(*pcol, fo.get()); - size_t spage = pcol->MemCostBytes(); - bytes_write += spage; + + bytes_write += page->MemCostBytes(); + writer.PushWrite(std::move(page)); + writer.Alloc(&page); + page->Clear(); + double tdiff = dmlc::GetTime() - tstart; if (tdiff >= tick_expected) { - LOG(CONSOLE) << "Writing to " << col_data_name + LOG(CONSOLE) << "Writing col.page file to " << cache_info_ << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " << (bytes_write >> 20UL) << " MB writen"; tick_expected += kStep; } - cmaker.Recycle(&pcol); } // save meta data - std::string col_meta_name = cache_prefix_ + ".col.meta"; - fo.reset(dmlc::Stream::Create(col_meta_name.c_str(), "w")); + std::string col_meta_name = cache_shards[0] + ".col.meta"; + std::unique_ptr fo( + dmlc::Stream::Create(col_meta_name.c_str(), "w")); fo->Write(buffered_rowset_); fo->Write(col_size_); fo.reset(nullptr); diff --git a/src/data/sparse_page_dmatrix.h b/src/data/sparse_page_dmatrix.h index e4aebee9c..129d1f016 100644 --- a/src/data/sparse_page_dmatrix.h +++ b/src/data/sparse_page_dmatrix.h @@ -14,6 +14,7 @@ #include #include #include "./sparse_batch_page.h" +#include "../common/common.h" namespace xgboost { namespace data { @@ -21,9 +22,9 @@ namespace data { class SparsePageDMatrix : public DMatrix { public: explicit SparsePageDMatrix(std::unique_ptr&& source, - const std::string& cache_prefix) - : source_(std::move(source)), - cache_prefix_(cache_prefix) {} + const std::string& cache_info) + : source_(std::move(source)), cache_info_(cache_info) { + } MetaInfo& info() override { return source_->info; @@ -77,11 +78,9 @@ class SparsePageDMatrix : public DMatrix { // declare the column batch iter. class ColPageIter : public dmlc::DataIter { public: - explicit ColPageIter(std::unique_ptr&& fi); + explicit ColPageIter(std::vector >&& files); virtual ~ColPageIter(); - void BeforeFirst() override { - prefetcher_.BeforeFirst(); - } + void BeforeFirst() override; const ColBatch &Value() const override { return out_; } @@ -90,20 +89,22 @@ class SparsePageDMatrix : public DMatrix { void Init(const std::vector& index_set, bool load_all); private: - // data file pointer. - std::unique_ptr fi_; // the temp page. SparsePage* page_; + // internal clock ptr. + size_t clock_ptr_; + // data file pointer. + std::vector > files_; // page format. - std::unique_ptr format_; + std::vector > formats_; + /*! \brief internal prefetcher. */ + std::vector > > prefetchers_; // The index set to be loaded. std::vector index_set_; // The index set by the outsiders std::vector set_index_set_; // whether to load data dataset. bool set_load_all_, load_all_; - // data prefetcher. - dmlc::ThreadedIter prefetcher_; // temporal space for batch ColBatch out_; // the pointer data. @@ -117,7 +118,7 @@ class SparsePageDMatrix : public DMatrix { // source data pointer. std::unique_ptr source_; // the cache prefix - std::string cache_prefix_; + std::string cache_info_; /*! \brief list of row index that are buffered */ std::vector buffered_rowset_; // count for column data diff --git a/src/data/sparse_page_source.cc b/src/data/sparse_page_source.cc index 5730da9b5..3f499739f 100644 --- a/src/data/sparse_page_source.cc +++ b/src/data/sparse_page_source.cc @@ -9,35 +9,45 @@ #if DMLC_ENABLE_STD_THREAD #include "./sparse_page_source.h" +#include "../common/common.h" namespace xgboost { namespace data { -SparsePageSource::SparsePageSource(const std::string& cache_prefix) - : base_rowid_(0), page_(nullptr) { - // read in the info files. +SparsePageSource::SparsePageSource(const std::string& cache_info) + : base_rowid_(0), page_(nullptr), clock_ptr_(0) { + // read in the info files + std::vector cache_shards = common::Split(cache_info, ':'); + CHECK_NE(cache_shards.size(), 0); { - std::string name_info = cache_prefix; + std::string name_info = cache_shards[0]; std::unique_ptr finfo(dmlc::Stream::Create(name_info.c_str(), "r")); int tmagic; CHECK_EQ(finfo->Read(&tmagic, sizeof(tmagic)), sizeof(tmagic)); this->info.LoadBinary(finfo.get()); } + files_.resize(cache_shards.size()); + formats_.resize(cache_shards.size()); + prefetchers_.resize(cache_shards.size()); + // read in the cache files. - std::string name_row = cache_prefix + ".row.page"; - fi_.reset(dmlc::SeekStream::CreateForRead(name_row.c_str())); - - std::string format; - CHECK(fi_->Read(&format)) << "Invalid page format"; - format_.reset(SparsePage::Format::Create(format)); - size_t fbegin = fi_->Tell(); - - prefetcher_.Init([this] (SparsePage** dptr) { - if (*dptr == nullptr) { - *dptr = new SparsePage(); - } - return format_->Read(*dptr, fi_.get()); - }, [this, fbegin] () { fi_->Seek(fbegin); }); + for (size_t i = 0; i < cache_shards.size(); ++i) { + std::string name_row = cache_shards[i] + ".row.page"; + files_[i].reset(dmlc::SeekStream::CreateForRead(name_row.c_str())); + dmlc::SeekStream* fi = files_[i].get(); + std::string format; + CHECK(fi->Read(&format)) << "Invalid page format"; + formats_[i].reset(SparsePage::Format::Create(format)); + SparsePage::Format* fmt = formats_[i].get(); + size_t fbegin = fi->Tell(); + prefetchers_[i].reset(new dmlc::ThreadedIter(4)); + prefetchers_[i]->Init([fi, fmt] (SparsePage** dptr) { + if (*dptr == nullptr) { + *dptr = new SparsePage(); + } + return fmt->Read(*dptr, fi); + }, [fi, fbegin] () { fi->Seek(fbegin); }); + } } SparsePageSource::~SparsePageSource() { @@ -45,12 +55,16 @@ SparsePageSource::~SparsePageSource() { } bool SparsePageSource::Next() { + // doing clock rotation over shards. if (page_ != nullptr) { - prefetcher_.Recycle(&page_); + size_t n = prefetchers_.size(); + prefetchers_[(clock_ptr_ + n - 1) % n]->Recycle(&page_); } - if (prefetcher_.Next(&page_)) { + if (prefetchers_[clock_ptr_]->Next(&page_)) { batch_ = page_->GetRowBatch(base_rowid_); base_rowid_ += batch_.size; + // advance clock + clock_ptr_ = (clock_ptr_ + 1) % prefetchers_.size(); return true; } else { return false; @@ -59,33 +73,48 @@ bool SparsePageSource::Next() { void SparsePageSource::BeforeFirst() { base_rowid_ = 0; - prefetcher_.BeforeFirst(); + clock_ptr_ = 0; + for (auto& p : prefetchers_) { + p->BeforeFirst(); + } } const RowBatch& SparsePageSource::Value() const { return batch_; } -bool SparsePageSource::CacheExist(const std::string& cache_prefix) { - std::string name_info = cache_prefix; - std::string name_row = cache_prefix + ".row.page"; - std::unique_ptr finfo(dmlc::Stream::Create(name_info.c_str(), "r", true)); - std::unique_ptr frow(dmlc::Stream::Create(name_row.c_str(), "r", true)); - return finfo.get() != nullptr && frow.get() != nullptr; +bool SparsePageSource::CacheExist(const std::string& cache_info) { + std::vector cache_shards = common::Split(cache_info, ':'); + CHECK_NE(cache_shards.size(), 0); + { + std::string name_info = cache_shards[0]; + std::unique_ptr finfo(dmlc::Stream::Create(name_info.c_str(), "r", true)); + if (finfo.get() == nullptr) return false; + } + for (const std::string& prefix : cache_shards) { + std::string name_row = prefix + ".row.page"; + std::unique_ptr frow(dmlc::Stream::Create(name_row.c_str(), "r", true)); + if (frow.get() == nullptr) return false; + } + return true; } void SparsePageSource::Create(dmlc::Parser* src, - const std::string& cache_prefix) { + const std::string& cache_info) { + std::vector cache_shards = common::Split(cache_info, ':'); + CHECK_NE(cache_shards.size(), 0); // read in the info files. - std::string name_info = cache_prefix; - std::string name_row = cache_prefix + ".row.page"; - std::unique_ptr fo(dmlc::Stream::Create(name_row.c_str(), "w")); - std::string name_format = SparsePage::Format::DecideFormat(cache_prefix).first; - fo->Write(name_format); - std::unique_ptr format(SparsePage::Format::Create(name_format)); + std::string name_info = cache_shards[0]; + std::vector name_shards, format_shards; + for (const std::string& prefix : cache_shards) { + name_shards.push_back(prefix + ".row.page"); + format_shards.push_back(SparsePage::Format::DecideFormat(prefix).first); + } + SparsePage::Writer writer(name_shards, format_shards, 6); + std::unique_ptr page; + writer.Alloc(&page); page->Clear(); MetaInfo info; - SparsePage page; size_t bytes_write = 0; double tstart = dmlc::GetTime(); // print every 4 sec. @@ -107,14 +136,16 @@ void SparsePageSource::Create(dmlc::Parser* src, info.num_col = std::max(info.num_col, static_cast(index + 1)); } - page.Push(batch); - if (page.MemCostBytes() >= kPageSize) { - bytes_write += page.MemCostBytes(); - format->Write(page, fo.get()); - page.Clear(); + page->Push(batch); + if (page->MemCostBytes() >= kPageSize) { + bytes_write += page->MemCostBytes(); + writer.PushWrite(std::move(page)); + writer.Alloc(&page); + page->Clear(); + double tdiff = dmlc::GetTime() - tstart; if (tdiff >= tick_expected) { - LOG(CONSOLE) << "Writing to " << name_row << " in " + LOG(CONSOLE) << "Writing row.page to " << cache_info << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " << (bytes_write >> 20UL) << " written"; tick_expected += kStep; @@ -122,57 +153,62 @@ void SparsePageSource::Create(dmlc::Parser* src, } } - if (page.data.size() != 0) { - format->Write(page, fo.get()); + if (page->data.size() != 0) { + writer.PushWrite(std::move(page)); } - fo.reset(dmlc::Stream::Create(name_info.c_str(), "w")); + std::unique_ptr fo( + dmlc::Stream::Create(name_info.c_str(), "w")); int tmagic = kMagic; fo->Write(&tmagic, sizeof(tmagic)); info.SaveBinary(fo.get()); - - LOG(CONSOLE) << "SparsePageSource: Finished writing to " << cache_prefix; + LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info; } void SparsePageSource::Create(DMatrix* src, - const std::string& cache_prefix) { + const std::string& cache_info) { + std::vector cache_shards = common::Split(cache_info, ':'); + CHECK_NE(cache_shards.size(), 0); // read in the info files. - std::string name_info = cache_prefix; - std::string name_row = cache_prefix + ".row.page"; - std::unique_ptr fo(dmlc::Stream::Create(name_row.c_str(), "w")); - // find format. - std::string name_format = SparsePage::Format::DecideFormat(cache_prefix).first; - fo->Write(name_format); - std::unique_ptr format(SparsePage::Format::Create(name_format)); + std::string name_info = cache_shards[0]; + std::vector name_shards, format_shards; + for (const std::string& prefix : cache_shards) { + name_shards.push_back(prefix + ".row.page"); + format_shards.push_back(SparsePage::Format::DecideFormat(prefix).first); + } + SparsePage::Writer writer(name_shards, format_shards, 6); + std::unique_ptr page; + writer.Alloc(&page); page->Clear(); - SparsePage page; + MetaInfo info; size_t bytes_write = 0; double tstart = dmlc::GetTime(); dmlc::DataIter* iter = src->RowIterator(); while (iter->Next()) { - page.Push(iter->Value()); - if (page.MemCostBytes() >= kPageSize) { - bytes_write += page.MemCostBytes(); - format->Write(page, fo.get()); - page.Clear(); + page->Push(iter->Value()); + if (page->MemCostBytes() >= kPageSize) { + bytes_write += page->MemCostBytes(); + writer.PushWrite(std::move(page)); + writer.Alloc(&page); + page->Clear(); double tdiff = dmlc::GetTime() - tstart; - LOG(CONSOLE) << "Writing to " << name_row << " in " + LOG(CONSOLE) << "Writing to " << cache_info << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " << (bytes_write >> 20UL) << " written"; } } - if (page.data.size() != 0) { - format->Write(page, fo.get()); + if (page->data.size() != 0) { + writer.PushWrite(std::move(page)); } - fo.reset(dmlc::Stream::Create(name_info.c_str(), "w")); + std::unique_ptr fo( + dmlc::Stream::Create(name_info.c_str(), "w")); int tmagic = kMagic; fo->Write(&tmagic, sizeof(tmagic)); - src->info().SaveBinary(fo.get()); - - LOG(CONSOLE) << "SparsePageSource: Finished writing to " << cache_prefix; + info.SaveBinary(fo.get()); + LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info; } } // namespace data diff --git a/src/data/sparse_page_source.h b/src/data/sparse_page_source.h index 79c55b4ba..02a3445ec 100644 --- a/src/data/sparse_page_source.h +++ b/src/data/sparse_page_source.h @@ -71,14 +71,14 @@ class SparsePageSource : public DataSource { RowBatch batch_; /*! \brief page currently on hold. */ SparsePage *page_; - /*! \brief The cache predix of the dataset. */ - std::string cache_prefix_; + /*! \brief internal clock ptr */ + size_t clock_ptr_; /*! \brief file pointer to the row blob file. */ - std::unique_ptr fi_; + std::vector > files_; /*! \brief Sparse page format file. */ - std::unique_ptr format_; + std::vector > formats_; /*! \brief internal prefetcher. */ - dmlc::ThreadedIter prefetcher_; + std::vector > > prefetchers_; }; } // namespace data } // namespace xgboost diff --git a/src/data/sparse_page_writer.cc b/src/data/sparse_page_writer.cc new file mode 100644 index 000000000..33f9172d6 --- /dev/null +++ b/src/data/sparse_page_writer.cc @@ -0,0 +1,72 @@ +/*! + * Copyright (c) 2015 by Contributors + * \file sparse_batch_writer.cc + * \param Writer class sparse page. + */ +#include +#include +#include "./sparse_batch_page.h" + +#if DMLC_ENABLE_STD_THREAD +namespace xgboost { +namespace data { + +SparsePage::Writer::Writer( + const std::vector& name_shards, + const std::vector& format_shards, + size_t extra_buffer_capacity) + : num_free_buffer_(extra_buffer_capacity + name_shards.size()), + clock_ptr_(0), + workers_(name_shards.size()), + qworkers_(name_shards.size()) { + CHECK_EQ(name_shards.size(), format_shards.size()); + // start writer threads + for (size_t i = 0; i < name_shards.size(); ++i) { + std::string name_shard = name_shards[i]; + std::string format_shard = format_shards[i]; + auto* wqueue = &qworkers_[i]; + workers_[i].reset(new std::thread( + [this, name_shard, format_shard, wqueue] () { + std::unique_ptr fo( + dmlc::Stream::Create(name_shard.c_str(), "w")); + std::unique_ptr fmt( + SparsePage::Format::Create(format_shard)); + fo->Write(format_shard); + std::unique_ptr page; + while (wqueue->Pop(&page)) { + fmt->Write(*page, fo.get()); + qrecycle_.Push(std::move(page)); + } + fo.reset(nullptr); + LOG(CONSOLE) << "SparsePage::Writer Finished writing to " << name_shard; + })); + } +} + +SparsePage::Writer::~Writer() { + for (auto& queue : qworkers_) { + queue.SignalForKill(); + } + for (auto& thread : workers_) { + thread->join(); + } +} + +void SparsePage::Writer::PushWrite(std::unique_ptr&& page) { + qworkers_[clock_ptr_].Push(std::move(page)); + clock_ptr_ = (clock_ptr_ + 1) % workers_.size(); +} + +void SparsePage::Writer::Alloc(std::unique_ptr* out_page) { + CHECK(out_page->get() == nullptr); + if (num_free_buffer_ != 0) { + out_page->reset(new SparsePage()); + --num_free_buffer_; + } else { + CHECK(qrecycle_.Pop(out_page)); + } +} +} // namespace data +} // namespace xgboost + +#endif // DMLC_ENABLE_STD_THREAD From 88447ca32e2ee6ec54175ed1a05337fc157a2386 Mon Sep 17 00:00:00 2001 From: tqchen Date: Tue, 19 Jan 2016 16:40:07 -0800 Subject: [PATCH 02/16] [MEM] Add rowset struct to save memory with billion level rows --- include/xgboost/data.h | 76 +++++++++++++++++++++++++++++++- src/data/simple_dmatrix.cc | 17 +++---- src/data/simple_dmatrix.h | 6 +-- src/data/sparse_page_dmatrix.cc | 10 ++--- src/data/sparse_page_dmatrix.h | 4 +- src/gbm/gblinear.cc | 2 +- src/gbm/gbtree.cc | 2 +- src/tree/updater_basemaker-inl.h | 4 +- src/tree/updater_colmaker.cc | 10 ++--- 9 files changed, 101 insertions(+), 30 deletions(-) diff --git a/include/xgboost/data.h b/include/xgboost/data.h index 65e7ff0f6..770e982b7 100644 --- a/include/xgboost/data.h +++ b/include/xgboost/data.h @@ -183,6 +183,41 @@ class DataSource : public dmlc::DataIter { MetaInfo info; }; +/*! + * \brief A vector-like structure to represent set of rows. + * But saves the memory when all rows are in the set (common case in xgb) + */ +struct RowSet { + public: + /*! \return i-th row index */ + inline bst_uint operator[](size_t i) const; + /*! \return the size of the set. */ + inline size_t size() const; + /*! \brief push the index back to the set */ + inline void push_back(bst_uint i); + /*! \brief clear the set */ + inline void clear(); + /*! + * \brief save rowset to file. + * \param fo The file to be saved. + */ + inline void Save(dmlc::Stream* fo) const; + /*! + * \brief Load rowset from file. + * \param fi The file to be loaded. + * \return if read is successful. + */ + inline bool Load(dmlc::Stream* fi); + /*! \brief constructor */ + RowSet() : size_(0) {} + + private: + /*! \brief The internal data structure of size */ + uint64_t size_; + /*! \brief The internal data structure of row set if not all*/ + std::vector rows_; +}; + /*! * \brief Internal data structured used by XGBoost during training. * There are two ways to create a customized DMatrix that reads in user defined-format. @@ -235,7 +270,7 @@ class DMatrix { /*! \brief get column density */ virtual float GetColDensity(size_t cidx) const = 0; /*! \return reference of buffered rowset, in column access */ - virtual const std::vector& buffered_rowset() const = 0; + virtual const RowSet& buffered_rowset() const = 0; /*! \brief virtual destructor */ virtual ~DMatrix() {} /*! @@ -290,9 +325,48 @@ class DMatrix { LearnerImpl* cache_learner_ptr_; }; +// implementation of inline functions +inline bst_uint RowSet::operator[](size_t i) const { + return rows_.size() == 0 ? i : rows_[i]; +} + +inline size_t RowSet::size() const { + return size_; +} + +inline void RowSet::clear() { + rows_.clear(); size_ = 0; +} + +inline void RowSet::push_back(bst_uint i) { + if (rows_.size() == 0) { + if (i == size_) { + ++size_; return; + } else { + rows_.resize(size_); + for (size_t i = 0; i < size_; ++i) { + rows_[i] = static_cast(i); + } + } + } + rows_.push_back(i); + ++size_; +} + +inline void RowSet::Save(dmlc::Stream* fo) const { + fo->Write(rows_); + fo->Write(&size_, sizeof(size_)); +} + +inline bool RowSet::Load(dmlc::Stream* fi) { + if (!fi->Read(&rows_)) return false; + if (rows_.size() != 0) return true; + return fi->Read(&size_, sizeof(size_)) == sizeof(size_); +} } // namespace xgboost namespace dmlc { DMLC_DECLARE_TRAITS(is_pod, xgboost::SparseBatch::Entry, true); +DMLC_DECLARE_TRAITS(has_saveload, xgboost::RowSet, true); } #endif // XGBOOST_DATA_H_ diff --git a/src/data/simple_dmatrix.cc b/src/data/simple_dmatrix.cc index 69700f45b..ae78e3864 100644 --- a/src/data/simple_dmatrix.cc +++ b/src/data/simple_dmatrix.cc @@ -184,9 +184,7 @@ void SimpleDMatrix::MakeManyBatch(const std::vector& enabled, } if (tmp.Size() >= max_row_perbatch) { std::unique_ptr page(new SparsePage()); - this->MakeColPage(tmp.GetRowBatch(0), - dmlc::BeginPtr(buffered_rowset_) + btop, - enabled, page.get()); + this->MakeColPage(tmp.GetRowBatch(0), btop, enabled, page.get()); col_iter_.cpages_.push_back(std::move(page)); btop = buffered_rowset_.size(); tmp.Clear(); @@ -196,16 +194,14 @@ void SimpleDMatrix::MakeManyBatch(const std::vector& enabled, if (tmp.Size() != 0) { std::unique_ptr page(new SparsePage()); - this->MakeColPage(tmp.GetRowBatch(0), - dmlc::BeginPtr(buffered_rowset_) + btop, - enabled, page.get()); + this->MakeColPage(tmp.GetRowBatch(0), btop, enabled, page.get()); col_iter_.cpages_.push_back(std::move(page)); } } // make column page from subset of rowbatchs void SimpleDMatrix::MakeColPage(const RowBatch& batch, - const bst_uint* ridx, + size_t buffer_begin, const std::vector& enabled, SparsePage* pcol) { int nthread; @@ -240,9 +236,10 @@ void SimpleDMatrix::MakeColPage(const RowBatch& batch, RowBatch::Inst inst = batch[i]; for (bst_uint j = 0; j < inst.length; ++j) { const SparseBatch::Entry &e = inst[j]; - builder.Push(e.index, - SparseBatch::Entry(ridx[i], e.fvalue), - tid); + builder.Push( + e.index, + SparseBatch::Entry(buffered_rowset_[i + buffer_begin], e.fvalue), + tid); } } CHECK_EQ(pcol->Size(), info().num_col); diff --git a/src/data/simple_dmatrix.h b/src/data/simple_dmatrix.h index 3b63e1e97..d31578254 100644 --- a/src/data/simple_dmatrix.h +++ b/src/data/simple_dmatrix.h @@ -40,7 +40,7 @@ class SimpleDMatrix : public DMatrix { return col_size_.size() != 0; } - const std::vector& buffered_rowset() const override { + const RowSet& buffered_rowset() const override { return buffered_rowset_; } @@ -96,7 +96,7 @@ class SimpleDMatrix : public DMatrix { // column iterator ColBatchIter col_iter_; // list of row index that are buffered. - std::vector buffered_rowset_; + RowSet buffered_rowset_; /*! \brief sizeof column data */ std::vector col_size_; @@ -110,7 +110,7 @@ class SimpleDMatrix : public DMatrix { size_t max_row_perbatch); void MakeColPage(const RowBatch& batch, - const bst_uint* ridx, + size_t buffer_begin, const std::vector& enabled, SparsePage* pcol); }; diff --git a/src/data/sparse_page_dmatrix.cc b/src/data/sparse_page_dmatrix.cc index eb3ef3ca0..d25e06492 100644 --- a/src/data/sparse_page_dmatrix.cc +++ b/src/data/sparse_page_dmatrix.cc @@ -165,10 +165,10 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, // function to create the page. auto make_col_batch = [&] ( const SparsePage& prow, - const bst_uint* ridx, + size_t begin, SparsePage *pcol) { pcol->Clear(); - pcol->min_index = ridx[0]; + pcol->min_index = buffered_rowset_[begin]; int nthread; #pragma omp parallel { @@ -196,7 +196,7 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, for (size_t j = prow.offset[i]; j < prow.offset[i+1]; ++j) { const SparseBatch::Entry &e = prow.data[j]; builder.Push(e.index, - SparseBatch::Entry(ridx[i], e.fvalue), + SparseBatch::Entry(buffered_rowset_[i + begin], e.fvalue), tid); } } @@ -230,7 +230,7 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, if (tmp.Size() >= max_row_perbatch || tmp.MemCostBytes() >= kPageSize) { - make_col_batch(tmp, dmlc::BeginPtr(buffered_rowset_) + btop, dptr); + make_col_batch(tmp, btop, dptr); batch_ptr = i + 1; return true; } @@ -243,7 +243,7 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, } if (tmp.Size() != 0) { - make_col_batch(tmp, dmlc::BeginPtr(buffered_rowset_) + btop, dptr); + make_col_batch(tmp, btop, dptr); return true; } else { return false; diff --git a/src/data/sparse_page_dmatrix.h b/src/data/sparse_page_dmatrix.h index 129d1f016..a1f9f77b4 100644 --- a/src/data/sparse_page_dmatrix.h +++ b/src/data/sparse_page_dmatrix.h @@ -44,7 +44,7 @@ class SparsePageDMatrix : public DMatrix { return col_iter_.get() != nullptr; } - const std::vector& buffered_rowset() const override { + const RowSet& buffered_rowset() const override { return buffered_rowset_; } @@ -120,7 +120,7 @@ class SparsePageDMatrix : public DMatrix { // the cache prefix std::string cache_info_; /*! \brief list of row index that are buffered */ - std::vector buffered_rowset_; + RowSet buffered_rowset_; // count for column data std::vector col_size_; // internal column iter. diff --git a/src/gbm/gblinear.cc b/src/gbm/gblinear.cc index f4d235e1b..307e24f4c 100644 --- a/src/gbm/gblinear.cc +++ b/src/gbm/gblinear.cc @@ -109,7 +109,7 @@ class GBLinear : public GradientBooster { std::vector &gpair = *in_gpair; const int ngroup = model.param.num_output_group; - const std::vector &rowset = p_fmat->buffered_rowset(); + const RowSet &rowset = p_fmat->buffered_rowset(); // for all the output group for (int gid = 0; gid < ngroup; ++gid) { double sum_grad = 0.0, sum_hess = 0.0; diff --git a/src/gbm/gbtree.cc b/src/gbm/gbtree.cc index 6618cd503..f74ffc177 100644 --- a/src/gbm/gbtree.cc +++ b/src/gbm/gbtree.cc @@ -325,7 +325,7 @@ class GBTree : public GradientBooster { int bst_group, const RegTree &new_tree, const int* leaf_position) { - const std::vector& rowset = p_fmat->buffered_rowset(); + const RowSet& rowset = p_fmat->buffered_rowset(); const bst_omp_uint ndata = static_cast(rowset.size()); #pragma omp parallel for schedule(static) for (bst_omp_uint i = 0; i < ndata; ++i) { diff --git a/src/tree/updater_basemaker-inl.h b/src/tree/updater_basemaker-inl.h index 25faaae4e..cad3ec811 100644 --- a/src/tree/updater_basemaker-inl.h +++ b/src/tree/updater_basemaker-inl.h @@ -207,7 +207,7 @@ class BaseMaker: public TreeUpdater { // set the positions in the nondefault this->SetNonDefaultPositionCol(nodes, p_fmat, tree); // set rest of instances to default position - const std::vector &rowset = p_fmat->buffered_rowset(); + const RowSet &rowset = p_fmat->buffered_rowset(); // set default direct nodes to default // for leaf nodes that are not fresh, mark then to ~nid, // so that they are ignored in future statistics collection @@ -297,7 +297,7 @@ class BaseMaker: public TreeUpdater { thread_temp[tid][nid].Clear(); } } - const std::vector &rowset = fmat.buffered_rowset(); + const RowSet &rowset = fmat.buffered_rowset(); // setup position const bst_omp_uint ndata = static_cast(rowset.size()); #pragma omp parallel for schedule(static) diff --git a/src/tree/updater_colmaker.cc b/src/tree/updater_colmaker.cc index d034fa566..ded1dcdfe 100644 --- a/src/tree/updater_colmaker.cc +++ b/src/tree/updater_colmaker.cc @@ -117,7 +117,7 @@ class ColMaker: public TreeUpdater { CHECK_EQ(tree.param.num_nodes, tree.param.num_roots) << "ColMaker: can only grow new tree"; const std::vector& root_index = fmat.info().root_index; - const std::vector& rowset = fmat.buffered_rowset(); + const RowSet& rowset = fmat.buffered_rowset(); { // setup position position.resize(gpair.size()); @@ -200,7 +200,7 @@ class ColMaker: public TreeUpdater { } snode.resize(tree.param.num_nodes, NodeEntry(param)); } - const std::vector &rowset = fmat.buffered_rowset(); + const RowSet &rowset = fmat.buffered_rowset(); const MetaInfo& info = fmat.info(); // setup position const bst_omp_uint ndata = static_cast(rowset.size()); @@ -620,7 +620,7 @@ class ColMaker: public TreeUpdater { // set the positions in the nondefault this->SetNonDefaultPosition(qexpand, p_fmat, tree); // set rest of instances to default position - const std::vector &rowset = p_fmat->buffered_rowset(); + const RowSet &rowset = p_fmat->buffered_rowset(); // set default direct nodes to default // for leaf nodes that are not fresh, mark then to ~nid, // so that they are ignored in future statistics collection @@ -761,7 +761,7 @@ class DistColMaker : public ColMaker { : ColMaker::Builder(param) { } inline void UpdatePosition(DMatrix* p_fmat, const RegTree &tree) { - const std::vector &rowset = p_fmat->buffered_rowset(); + const RowSet &rowset = p_fmat->buffered_rowset(); const bst_omp_uint ndata = static_cast(rowset.size()); #pragma omp parallel for schedule(static) for (bst_omp_uint i = 0; i < ndata; ++i) { @@ -831,7 +831,7 @@ class DistColMaker : public ColMaker { bitmap.InitFromBool(boolmap); // communicate bitmap rabit::Allreduce(dmlc::BeginPtr(bitmap.data), bitmap.data.size()); - const std::vector &rowset = p_fmat->buffered_rowset(); + const RowSet &rowset = p_fmat->buffered_rowset(); // get the new position const bst_omp_uint ndata = static_cast(rowset.size()); #pragma omp parallel for schedule(static) From 468bc7725a133e598ec36bfeb92c2e34fec17b57 Mon Sep 17 00:00:00 2001 From: tqchen Date: Tue, 19 Jan 2016 16:42:33 -0800 Subject: [PATCH 03/16] [METRIC] change metric accumulator to double --- src/metric/multiclass_metric.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/metric/multiclass_metric.cc b/src/metric/multiclass_metric.cc index d51379c64..428554976 100644 --- a/src/metric/multiclass_metric.cc +++ b/src/metric/multiclass_metric.cc @@ -31,7 +31,7 @@ struct EvalMClassBase : public Metric { << "mlogloss and merror are only used for multi-class classification," << " use logloss for binary classification"; const bst_omp_uint ndata = static_cast(info.labels.size()); - float sum = 0.0, wsum = 0.0; + double sum = 0.0, wsum = 0.0; int label_error = 0; #pragma omp parallel for reduction(+: sum, wsum) schedule(static) for (bst_omp_uint i = 0; i < ndata; ++i) { @@ -50,7 +50,7 @@ struct EvalMClassBase : public Metric { << "MultiClassEvaluation: label must be in [0, num_class)," << " num_class=" << nclass << " but found " << label_error << " in label"; - float dat[2]; dat[0] = sum, dat[1] = wsum; + double dat[2]; dat[0] = sum, dat[1] = wsum; if (distributed) { rabit::Allreduce(dat, 2); } From 52227a8920aa0927345304b4ec119c73d5668a94 Mon Sep 17 00:00:00 2001 From: tqchen Date: Tue, 19 Jan 2016 20:44:29 -0800 Subject: [PATCH 04/16] [TREE] Refactor histmaker --- src/tree/updater_basemaker-inl.h | 38 +++++++++++++++++++++++++------- src/tree/updater_histmaker.cc | 21 +++++++----------- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/src/tree/updater_basemaker-inl.h b/src/tree/updater_basemaker-inl.h index cad3ec811..271aa4ae2 100644 --- a/src/tree/updater_basemaker-inl.h +++ b/src/tree/updater_basemaker-inl.h @@ -206,6 +206,16 @@ class BaseMaker: public TreeUpdater { const RegTree &tree) { // set the positions in the nondefault this->SetNonDefaultPositionCol(nodes, p_fmat, tree); + this->SetDefaultPostion(p_fmat, tree); + } + /*! + * \brief helper function to set the non-leaf positions to default direction. + * This function can be applied multiple times and will get the same result. + * \param p_fmat feature matrix needed for tree construction + * \param tree the regression tree structure + */ + inline void SetDefaultPostion(DMatrix *p_fmat, + const RegTree &tree) { // set rest of instances to default position const RowSet &rowset = p_fmat->buffered_rowset(); // set default direct nodes to default @@ -222,7 +232,7 @@ class BaseMaker: public TreeUpdater { if (tree[nid].cright() == -1) { position[ridx] = ~nid; } - } else { + } else { // push to default branch if (tree[nid].default_left()) { this->SetEncodePosition(ridx, tree[nid].cleft()); @@ -234,16 +244,16 @@ class BaseMaker: public TreeUpdater { } /*! * \brief this is helper function uses column based data structure, - * update all positions into nondefault branch, if any, ignore the default branch * \param nodes the set of nodes that contains the split to be used - * \param p_fmat feature matrix needed for tree construction * \param tree the regression tree structure + * \param out_split_set The split index set */ - virtual void SetNonDefaultPositionCol(const std::vector &nodes, - DMatrix *p_fmat, - const RegTree &tree) { + inline void GetSplitSet(const std::vector &nodes, + const RegTree &tree, + std::vector* out_split_set) { + std::vector& fsplits = *out_split_set; + fsplits.clear(); // step 1, classify the non-default data into right places - std::vector fsplits; for (size_t i = 0; i < nodes.size(); ++i) { const int nid = nodes[i]; if (!tree[nid].is_leaf()) { @@ -252,7 +262,19 @@ class BaseMaker: public TreeUpdater { } std::sort(fsplits.begin(), fsplits.end()); fsplits.resize(std::unique(fsplits.begin(), fsplits.end()) - fsplits.begin()); - + } + /*! + * \brief this is helper function uses column based data structure, + * update all positions into nondefault branch, if any, ignore the default branch + * \param nodes the set of nodes that contains the split to be used + * \param p_fmat feature matrix needed for tree construction + * \param tree the regression tree structure + */ + virtual void SetNonDefaultPositionCol(const std::vector &nodes, + DMatrix *p_fmat, + const RegTree &tree) { + std::vector fsplits; + this->GetSplitSet(nodes, tree, &fsplits); dmlc::DataIter *iter = p_fmat->ColIterator(fsplits); while (iter->Next()) { const ColBatch &batch = iter->Value(); diff --git a/src/tree/updater_histmaker.cc b/src/tree/updater_histmaker.cc index c6d53b270..e7254307b 100644 --- a/src/tree/updater_histmaker.cc +++ b/src/tree/updater_histmaker.cc @@ -355,8 +355,9 @@ class CQHistMaker: public HistMaker { #endif } void ResetPositionAfterSplit(DMatrix *p_fmat, - const RegTree &tree) override { + const RegTree &tree) override { this->ResetPositionCol(this->qexpand, p_fmat, tree); + this->GetSplitSet(this->qexpand, tree, &fsplit_set); } void ResetPosAndPropose(const std::vector &gpair, DMatrix *p_fmat, @@ -388,14 +389,10 @@ class CQHistMaker: public HistMaker { for (size_t i = 0; i < sketchs.size(); ++i) { summary_array[i].Reserve(max_size); } - // if it is C++11, use lazy evaluation for Allreduce -#if __cplusplus >= 201103L - auto lazy_get_summary = [&]() -#endif - { + { // get smmary thread_sketch.resize(this->get_nthread()); - // number of rows in + // number of rows in data const size_t nrows = p_fmat->buffered_rowset().size(); // start accumulating statistics dmlc::DataIter *iter = p_fmat->ColIterator(freal_set); @@ -422,15 +419,10 @@ class CQHistMaker: public HistMaker { summary_array[i].SetPrune(out, max_size); } CHECK_EQ(summary_array.size(), sketchs.size()); - }; + } if (summary_array.size() != 0) { size_t nbytes = WXQSketch::SummaryContainer::CalcMemCost(max_size); -#if __cplusplus >= 201103L - sreducer.Allreduce(dmlc::BeginPtr(summary_array), - nbytes, summary_array.size(), lazy_get_summary); -#else sreducer.Allreduce(dmlc::BeginPtr(summary_array), nbytes, summary_array.size()); -#endif } // now we get the final result of sketch, setup the cut this->wspace.cut.clear(); @@ -617,6 +609,8 @@ class CQHistMaker: public HistMaker { std::vector feat2workindex; // set of index from fset that are real std::vector freal_set; + // set of index from that are split candidates. + std::vector fsplit_set; // thread temp data std::vector > thread_sketch; // used to hold statistics @@ -633,6 +627,7 @@ class CQHistMaker: public HistMaker { std::vector > sketchs; }; + template class QuantileHistMaker: public HistMaker { protected: From 523afcbcd25b02437c4b77f36cd503dfe8c0b3ae Mon Sep 17 00:00:00 2001 From: tqchen Date: Tue, 19 Jan 2016 21:53:52 -0800 Subject: [PATCH 05/16] [TREE] Cleanup some functions, add utility function for two pass --- src/tree/updater_basemaker-inl.h | 39 +++++++++++++++++++++ src/tree/updater_histmaker.cc | 58 +++++++++++++++----------------- 2 files changed, 66 insertions(+), 31 deletions(-) diff --git a/src/tree/updater_basemaker-inl.h b/src/tree/updater_basemaker-inl.h index 271aa4ae2..b6dbacd6c 100644 --- a/src/tree/updater_basemaker-inl.h +++ b/src/tree/updater_basemaker-inl.h @@ -242,6 +242,45 @@ class BaseMaker: public TreeUpdater { } } } + /*! + * \brief this is helper function uses column based data structure, + * to CORRECT the positions of non-default directions that WAS set to default + * before calling this function. + * \param batch The column batch + * \param sorted_split_set The set of index that contains split solutions. + * \param tree the regression tree structure + */ + inline void CorrectNonDefaultPositionByBatch( + const ColBatch& batch, + const std::vector &sorted_split_set, + const RegTree &tree) { + for (size_t i = 0; i < batch.size; ++i) { + ColBatch::Inst col = batch[i]; + const bst_uint fid = batch.col_index[i]; + auto it = std::lower_bound(sorted_split_set.begin(), sorted_split_set.end(), fid); + + if (it != sorted_split_set.end() && *it == fid) { + const bst_omp_uint ndata = static_cast(col.length); + #pragma omp parallel for schedule(static) + for (bst_omp_uint j = 0; j < ndata; ++j) { + const bst_uint ridx = col[j].index; + const float fvalue = col[j].fvalue; + const int nid = this->DecodePosition(ridx); + CHECK(tree[nid].is_leaf()); + int pid = tree[nid].parent(); + + // go back to parent, correct those who are not default + if (!tree[nid].is_root() && tree[pid].split_index() == fid) { + if (fvalue < tree[pid].split_cond()) { + this->SetEncodePosition(ridx, tree[pid].cleft()); + } else { + this->SetEncodePosition(ridx, tree[pid].cright()); + } + } + } + } + } + } /*! * \brief this is helper function uses column based data structure, * \param nodes the set of nodes that contains the split to be used diff --git a/src/tree/updater_histmaker.cc b/src/tree/updater_histmaker.cc index e7254307b..40089c26d 100644 --- a/src/tree/updater_histmaker.cc +++ b/src/tree/updater_histmaker.cc @@ -127,6 +127,11 @@ class HistMaker: public BaseMaker { RegTree *p_tree) { this->InitData(gpair, *p_fmat, *p_tree); this->InitWorkSet(p_fmat, *p_tree, &fwork_set); + // mark root node as fresh. + for (int i = 0; i < p_tree->param.num_roots; ++i) { + (*p_tree)[i].set_leaf(0.0f, 0); + } + for (int depth = 0; depth < param.max_depth; ++depth) { // reset and propose candidate split this->ResetPosAndPropose(gpair, p_fmat, fwork_set, *p_tree); @@ -356,8 +361,8 @@ class CQHistMaker: public HistMaker { } void ResetPositionAfterSplit(DMatrix *p_fmat, const RegTree &tree) override { + // remove this reset and do two pass reset on ResetPosAndPropose this->ResetPositionCol(this->qexpand, p_fmat, tree); - this->GetSplitSet(this->qexpand, tree, &fsplit_set); } void ResetPosAndPropose(const std::vector &gpair, DMatrix *p_fmat, @@ -367,18 +372,18 @@ class CQHistMaker: public HistMaker { // fill in reverse map feat2workindex.resize(tree.param.num_feature); std::fill(feat2workindex.begin(), feat2workindex.end(), -1); - freal_set.clear(); + work_set.clear(); for (size_t i = 0; i < fset.size(); ++i) { if (feat_helper.Type(fset[i]) == 2) { - feat2workindex[fset[i]] = static_cast(freal_set.size()); - freal_set.push_back(fset[i]); + feat2workindex[fset[i]] = static_cast(work_set.size()); + work_set.push_back(fset[i]); } else { feat2workindex[fset[i]] = -2; } } - this->GetNodeStats(gpair, *p_fmat, tree, - &thread_stats, &node_stats); - sketchs.resize(this->qexpand.size() * freal_set.size()); + const size_t work_set_size = work_set.size(); + + sketchs.resize(this->qexpand.size() * work_set_size); for (size_t i = 0; i < sketchs.size(); ++i) { sketchs[i].Init(info.num_row, this->param.sketch_eps); } @@ -392,10 +397,9 @@ class CQHistMaker: public HistMaker { { // get smmary thread_sketch.resize(this->get_nthread()); - // number of rows in data - const size_t nrows = p_fmat->buffered_rowset().size(); + // start accumulating statistics - dmlc::DataIter *iter = p_fmat->ColIterator(freal_set); + dmlc::DataIter *iter = p_fmat->ColIterator(work_set); iter->BeforeFirst(); while (iter->Next()) { const ColBatch &batch = iter->Value(); @@ -406,9 +410,7 @@ class CQHistMaker: public HistMaker { int offset = feat2workindex[batch.col_index[i]]; if (offset >= 0) { this->UpdateSketchCol(gpair, batch[i], tree, - node_stats, - freal_set, offset, - batch[i].length == nrows, + work_set_size, offset, &thread_sketch[omp_get_thread_num()]); } } @@ -419,11 +421,14 @@ class CQHistMaker: public HistMaker { summary_array[i].SetPrune(out, max_size); } CHECK_EQ(summary_array.size(), sketchs.size()); - } + } if (summary_array.size() != 0) { size_t nbytes = WXQSketch::SummaryContainer::CalcMemCost(max_size); sreducer.Allreduce(dmlc::BeginPtr(summary_array), nbytes, summary_array.size()); } + // update node statistics. + this->GetNodeStats(gpair, *p_fmat, tree, + &thread_stats, &node_stats); // now we get the final result of sketch, setup the cut this->wspace.cut.clear(); this->wspace.rptr.clear(); @@ -432,7 +437,7 @@ class CQHistMaker: public HistMaker { for (size_t i = 0; i < fset.size(); ++i) { int offset = feat2workindex[fset[i]]; if (offset >= 0) { - const WXQSketch::Summary &a = summary_array[wid * freal_set.size() + offset]; + const WXQSketch::Summary &a = summary_array[wid * work_set_size + offset]; for (size_t i = 1; i < a.size; ++i) { bst_float cpt = a.data[i].value - rt_eps; if (i == 1 || cpt > this->wspace.cut.back()) { @@ -518,10 +523,8 @@ class CQHistMaker: public HistMaker { inline void UpdateSketchCol(const std::vector &gpair, const ColBatch::Inst &c, const RegTree &tree, - const std::vector &nstats, - const std::vector &frealset, + size_t work_set_size, bst_uint offset, - bool col_full, std::vector *p_temp) { if (c.length == 0) return; // initialize sbuilder for use @@ -531,22 +534,15 @@ class CQHistMaker: public HistMaker { const unsigned nid = this->qexpand[i]; const unsigned wid = this->node2workindex[nid]; sbuilder[nid].sum_total = 0.0f; - sbuilder[nid].sketch = &sketchs[wid * frealset.size() + offset]; + sbuilder[nid].sketch = &sketchs[wid * work_set_size + offset]; } - if (!col_full) { - // first pass, get sum of weight, TODO, optimization to skip first pass - for (bst_uint j = 0; j < c.length; ++j) { + // first pass, get sum of weight, TODO, optimization to skip first pass + for (bst_uint j = 0; j < c.length; ++j) { const bst_uint ridx = c[j].index; const int nid = this->position[ridx]; if (nid >= 0) { - sbuilder[nid].sum_total += gpair[ridx].hess; - } - } - } else { - for (size_t i = 0; i < this->qexpand.size(); ++i) { - const unsigned nid = this->qexpand[i]; - sbuilder[nid].sum_total = static_cast(nstats[nid].sum_hess); + sbuilder[nid].sum_total += gpair[ridx].hess; } } // if only one value, no need to do second pass @@ -607,8 +603,8 @@ class CQHistMaker: public HistMaker { BaseMaker::FMetaHelper feat_helper; // temp space to map feature id to working index std::vector feat2workindex; - // set of index from fset that are real - std::vector freal_set; + // set of index from fset that are current work set + std::vector work_set; // set of index from that are split candidates. std::vector fsplit_set; // thread temp data From a500fbc9b0afcebf7801199be007373344745144 Mon Sep 17 00:00:00 2001 From: tqchen Date: Tue, 19 Jan 2016 21:55:33 -0800 Subject: [PATCH 06/16] [TREE] switch to two pass --- src/tree/updater_histmaker.cc | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/tree/updater_histmaker.cc b/src/tree/updater_histmaker.cc index 40089c26d..0f4b93c3b 100644 --- a/src/tree/updater_histmaker.cc +++ b/src/tree/updater_histmaker.cc @@ -361,8 +361,7 @@ class CQHistMaker: public HistMaker { } void ResetPositionAfterSplit(DMatrix *p_fmat, const RegTree &tree) override { - // remove this reset and do two pass reset on ResetPosAndPropose - this->ResetPositionCol(this->qexpand, p_fmat, tree); + this->GetSplitSet(this->qexpand, tree, &fsplit_set); } void ResetPosAndPropose(const std::vector &gpair, DMatrix *p_fmat, @@ -398,11 +397,20 @@ class CQHistMaker: public HistMaker { // get smmary thread_sketch.resize(this->get_nthread()); + // TWOPASS: use the real set + split set in the column iteration. + this->SetDefaultPostion(p_fmat, tree); + work_set.insert(work_set.end(), fsplit_set.begin(), fsplit_set.end()); + std::sort(work_set.begin(), work_set.end()); + work_set.resize(std::unique(work_set.begin(), work_set.end()) - work_set.begin()); + // start accumulating statistics dmlc::DataIter *iter = p_fmat->ColIterator(work_set); iter->BeforeFirst(); while (iter->Next()) { const ColBatch &batch = iter->Value(); + // TWOPASS: use the real set + split set in the column iteration. + this->CorrectNonDefaultPositionByBatch(batch, fsplit_set, tree); + // start enumeration const bst_omp_uint nsize = static_cast(batch.size); #pragma omp parallel for schedule(dynamic, 1) From 88e362732ff2c77b93777207b51774b1e64dcffc Mon Sep 17 00:00:00 2001 From: tqchen Date: Mon, 25 Jan 2016 09:23:26 -0800 Subject: [PATCH 07/16] [DMLC] Update dmlc --- dmlc-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dmlc-core b/dmlc-core index 257b09a0b..c66d2ab2d 160000 --- a/dmlc-core +++ b/dmlc-core @@ -1 +1 @@ -Subproject commit 257b09a0ba18625a9fcf3b9471a9b1c35a767b7b +Subproject commit c66d2ab2d30f55303b65b5ed9dc1f9ee04260f7e From b27b51f60ebdcc564978e904cfb067b8f1a0cfa5 Mon Sep 17 00:00:00 2001 From: tqchen Date: Mon, 25 Jan 2016 11:56:16 -0800 Subject: [PATCH 08/16] [PLUGIN] Add densify parser --- dmlc-core | 2 +- make/travis.mk | 1 + plugin/dense_parser/dense_libsvm.cc | 85 +++++++++++++++++++++++++++++ plugin/dense_parser/plugin.mk | 2 + src/data/data.cc | 2 +- 5 files changed, 90 insertions(+), 2 deletions(-) create mode 100644 plugin/dense_parser/dense_libsvm.cc create mode 100644 plugin/dense_parser/plugin.mk diff --git a/dmlc-core b/dmlc-core index c66d2ab2d..e0a18eb45 160000 --- a/dmlc-core +++ b/dmlc-core @@ -1 +1 @@ -Subproject commit c66d2ab2d30f55303b65b5ed9dc1f9ee04260f7e +Subproject commit e0a18eb45cb9c6e7314dbd3328dda158e3a3486f diff --git a/make/travis.mk b/make/travis.mk index 82a9696bd..85f53ca32 100644 --- a/make/travis.mk +++ b/make/travis.mk @@ -31,3 +31,4 @@ LIBJVM=$(JAVA_HOME)/jre/lib/amd64/server # XGB_PLUGINS += plugin/example/plugin.mk XGB_PLUGINS += plugin/lz4/plugin.mk +XGB_PLUGINS += plugin/dense_libsvm/plugin.mk diff --git a/plugin/dense_parser/dense_libsvm.cc b/plugin/dense_parser/dense_libsvm.cc new file mode 100644 index 000000000..45153c64b --- /dev/null +++ b/plugin/dense_parser/dense_libsvm.cc @@ -0,0 +1,85 @@ +/*! + * Copyright 2015 by Contributors + * \file dense_libsvm.cc + * \brief Plugin to load in libsvm, but fill all the missing entries with zeros. + * This plugin is mainly used for benchmark purposes and do not need to be included. + */ +#include +#include + +namespace dmlc { +namespace data { + +template +class DensifyParser : public dmlc::Parser { + public: + DensifyParser(dmlc::Parser* parser, uint32_t num_col) + : parser_(parser), num_col_(num_col) { + } + + void BeforeFirst() override { + parser_->BeforeFirst(); + } + + bool Next() override { + if (!parser_->Next()) return false; + const RowBlock& batch = parser_->Value(); + LOG(INFO) << batch.size; + dense_index_.resize(num_col_ * batch.size); + dense_value_.resize(num_col_ * batch.size); + std::fill(dense_value_.begin(), dense_value_.end(), 0.0f); + offset_.resize(batch.size + 1); + offset_[0] = 0; + + for (size_t i = 0; i < batch.size; ++i) { + offset_[i + 1] = (i + 1) * num_col_; + Row row = batch[i]; + for (uint32_t j = 0; j < num_col_; ++j) { + dense_index_[i * num_col_ + j] = j; + } + for (unsigned k = 0; k < row.length; ++k) { + uint32_t index = row.get_index(k); + CHECK_LT(index, num_col_) + << "Featuere index larger than num_col"; + dense_value_[i * num_col_ + index] = row.get_value(k); + } + } + out_ = batch; + out_.index = dmlc::BeginPtr(dense_index_); + out_.value = dmlc::BeginPtr(dense_value_); + out_.offset = dmlc::BeginPtr(offset_); + return true; + } + + const dmlc::RowBlock& Value() const override { + return out_; + } + + size_t BytesRead() const override { + return parser_->BytesRead(); + } + + private: + RowBlock out_; + std::unique_ptr > parser_; + uint32_t num_col_; + std::vector offset_; + std::vector dense_index_; + std::vector dense_value_; +}; + +template +Parser * +CreateDenseLibSVMParser(const std::string& path, + const std::map& args, + unsigned part_index, + unsigned num_parts) { + CHECK_NE(args.count("num_col"), 0) << "expect num_col in dense_libsvm"; + return new DensifyParser( + Parser::Create(path.c_str(), part_index, num_parts, "libsvm"), + uint32_t(atoi(args.at("num_col").c_str()))); +} +} // namespace data + +DMLC_REGISTER_DATA_PARSER(uint32_t, dense_libsvm, data::CreateDenseLibSVMParser); +} // namespace dmlc diff --git a/plugin/dense_parser/plugin.mk b/plugin/dense_parser/plugin.mk new file mode 100644 index 000000000..027cc42f8 --- /dev/null +++ b/plugin/dense_parser/plugin.mk @@ -0,0 +1,2 @@ +PLUGIN_OBJS += build_plugin/dense_parser/dense_libsvm.o +PLUGIN_LDFLAGS += diff --git a/src/data/data.cc b/src/data/data.cc index 9c63f8aa2..02b972d83 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -181,7 +181,7 @@ DMatrix* DMatrix::Load(const std::string& uri, std::string ftype = file_format; if (file_format == "auto") ftype = "libsvm"; std::unique_ptr > parser( - dmlc::Parser::Create(fname.c_str(), partid, npart, ftype.c_str())); + dmlc::Parser::Create(fname.c_str(), partid, npart, file_format.c_str())); DMatrix* dmat = DMatrix::Create(parser.get(), cache_file); if (!silent) { LOG(CONSOLE) << dmat->info().num_row << 'x' << dmat->info().num_col << " matrix with " From 52184387167aecdf0dc837392a7dec1f95c0f798 Mon Sep 17 00:00:00 2001 From: tqchen Date: Wed, 27 Jan 2016 10:57:00 -0800 Subject: [PATCH 09/16] [DMLC] update dmlccore --- dmlc-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dmlc-core b/dmlc-core index e0a18eb45..53ab0280f 160000 --- a/dmlc-core +++ b/dmlc-core @@ -1 +1 @@ -Subproject commit e0a18eb45cb9c6e7314dbd3328dda158e3a3486f +Subproject commit 53ab0280f4c1da4fd6f9cee22ce4716f04b3e712 From 46be6181b5a936381c6e9a7c55e8b2b22a72eebe Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 27 Jan 2016 21:21:00 +0000 Subject: [PATCH 10/16] [DIST] fix distirbuted setting --- src/cli_main.cc | 2 +- src/data/data.cc | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/cli_main.cc b/src/cli_main.cc index a08e3fd6d..720a3b185 100644 --- a/src/cli_main.cc +++ b/src/cli_main.cc @@ -318,6 +318,7 @@ int CLIRunTask(int argc, char *argv[]) { printf("Usage: \n"); return 0; } + rabit::Init(argc, argv); std::vector > cfg; cfg.push_back(std::make_pair("seed", "0")); @@ -336,7 +337,6 @@ int CLIRunTask(int argc, char *argv[]) { CLIParam param; param.Configure(cfg); - rabit::Init(argc, argv); switch (param.task) { case kTrain: CLITrain(param); break; case kDump2Text: CLIDump2Text(param); break; diff --git a/src/data/data.cc b/src/data/data.cc index 02b972d83..b4753bcb4 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -141,7 +141,9 @@ DMatrix* DMatrix::Load(const std::string& uri, << "Only one `#` is allowed in file path for cache file specification."; if (load_row_split) { std::ostringstream os; - os << cache_file << ".r" << rabit::GetRank(); + os << "r" << rabit::GetRank() + << "-" << rabit::GetWorldSize() + << "." << cache_file; cache_file = os.str(); } } else { @@ -154,9 +156,11 @@ DMatrix* DMatrix::Load(const std::string& uri, } else { // test option to load in part npart = dmlc::GetEnv("XGBOOST_TEST_NPART", 1); - if (npart != 1) { - LOG(CONSOLE) << "Partial load option on npart=" << npart; - } + } + + if (npart != 1) { + LOG(CONSOLE) << "Load part of data " << partid + << " of " << npart << " parts"; } // legacy handling of binary data loading if (file_format == "auto" && !load_row_split) { From 724eda24358cda0f1ae40905e8fdfdf1d152ba42 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 27 Jan 2016 21:50:45 +0000 Subject: [PATCH 11/16] remove reserve for more aggressive memory generation --- src/data/simple_csr_source.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/data/simple_csr_source.cc b/src/data/simple_csr_source.cc index db23bb9d7..78f14f360 100644 --- a/src/data/simple_csr_source.cc +++ b/src/data/simple_csr_source.cc @@ -41,7 +41,6 @@ void SimpleCSRSource::CopyFrom(dmlc::Parser* parser) { if (batch.weight != nullptr) { info.weights.insert(info.weights.end(), batch.weight, batch.weight + batch.size); } - row_data_.reserve(row_data_.size() + batch.offset[batch.size] - batch.offset[0]); CHECK(batch.index != nullptr); // update information this->info.num_row += batch.size; @@ -54,9 +53,8 @@ void SimpleCSRSource::CopyFrom(dmlc::Parser* parser) { static_cast(index + 1)); } size_t top = row_ptr_.size(); - row_ptr_.resize(top + batch.size); for (size_t i = 0; i < batch.size; ++i) { - row_ptr_[top + i] = row_ptr_[top - 1] + batch.offset[i + 1] - batch.offset[0]; + row_ptr_.push_back(row_ptr_[top - 1] + batch.offset[i + 1] - batch.offset[0]); } } this->info.num_nonzero = static_cast(row_data_.size()); From c36195795a7f996c92ba345d9851101237d31c6e Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 28 Jan 2016 00:58:48 +0000 Subject: [PATCH 12/16] increase shard --- src/data/data.cc | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/data/data.cc b/src/data/data.cc index b4753bcb4..11a36ffdf 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -138,12 +138,24 @@ DMatrix* DMatrix::Load(const std::string& uri, cache_file = uri.substr(dlm_pos + 1, uri.length()); fname = uri.substr(0, dlm_pos); CHECK_EQ(cache_file.find('#'), std::string::npos) - << "Only one `#` is allowed in file path for cache file specification."; + << "Only one `#` is allowed in file path for cache file specification."; if (load_row_split) { std::ostringstream os; - os << "r" << rabit::GetRank() - << "-" << rabit::GetWorldSize() - << "." << cache_file; + std::vector cache_shards = common::Split(cache_file, ':'); + for (size_t i = 0; i < cache_shards.size(); ++i) { + size_t pos = cache_shards[i].rfind('.'); + if (pos == std::string::npos) { + os << cache_shards[i] + << ".r" << rabit::GetRank() + << "-" << rabit::GetWorldSize(); + } else { + os << cache_shards[i].substr(0, pos) + << ".r" << rabit::GetRank() + << "-" << rabit::GetWorldSize() + << cache_shards[i].substr(pos, cache_shards[i].length()); + } + if (i + 1 != cache_shards.size()) os << ':'; + } cache_file = os.str(); } } else { From 2f2080a337deef9f46fb5966cb25a00ed6086bae Mon Sep 17 00:00:00 2001 From: tqchen Date: Thu, 28 Jan 2016 11:26:37 -0800 Subject: [PATCH 13/16] [TREE] Remove gap constraint, make tree construction more robust --- include/xgboost/base.h | 4 +--- src/tree/updater_colmaker.cc | 8 ++++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/include/xgboost/base.h b/include/xgboost/base.h index 3674c6117..a3b8c31f5 100644 --- a/include/xgboost/base.h +++ b/include/xgboost/base.h @@ -62,9 +62,7 @@ struct bst_gpair { }; /*! \brief small eps gap for minimum split decision. */ -const float rt_eps = 1e-5f; -/*! \brief min gap between feature values to allow a split happen */ -const float rt_2eps = rt_eps * 2.0f; +const float rt_eps = 1e-6f; /*! \brief define unsigned long for openmp loop */ typedef dmlc::omp_ulong omp_ulong; diff --git a/src/tree/updater_colmaker.cc b/src/tree/updater_colmaker.cc index ded1dcdfe..79c013e29 100644 --- a/src/tree/updater_colmaker.cc +++ b/src/tree/updater_colmaker.cc @@ -291,7 +291,7 @@ class ColMaker: public TreeUpdater { ThreadEntry &e = stemp[tid][nid]; float fsplit; if (tid != 0) { - if (std::abs(stemp[tid - 1][nid].last_fvalue - e.first_fvalue) > rt_2eps) { + if (stemp[tid - 1][nid].last_fvalue != e.first_fvalue) { fsplit = (stemp[tid - 1][nid].last_fvalue + e.first_fvalue) * 0.5f; } else { continue; @@ -352,7 +352,7 @@ class ColMaker: public TreeUpdater { e.first_fvalue = fvalue; } else { // forward default right - if (std::abs(fvalue - e.first_fvalue) > rt_2eps) { + if (fvalue != e.first_fvalue) { if (need_forward) { c.SetSubstract(snode[nid].stats, e.stats); if (c.sum_hess >= param.min_child_weight && @@ -393,7 +393,7 @@ class ColMaker: public TreeUpdater { e.last_fvalue = fvalue; } else { // try to find a split - if (std::abs(fvalue - e.last_fvalue) > rt_2eps && + if (fvalue != e.last_fvalue && e.stats.sum_hess >= param.min_child_weight) { c.SetSubstract(snode[nid].stats, e.stats); if (c.sum_hess >= param.min_child_weight) { @@ -511,7 +511,7 @@ class ColMaker: public TreeUpdater { e.last_fvalue = fvalue; } else { // try to find a split - if (std::abs(fvalue - e.last_fvalue) > rt_2eps && + if (fvalue != e.last_fvalue && e.stats.sum_hess >= param.min_child_weight) { c.SetSubstract(snode[nid].stats, e.stats); if (c.sum_hess >= param.min_child_weight) { From ce4d59ed69a57ee4a0c93f9a80b5a985595e3e06 Mon Sep 17 00:00:00 2001 From: tqchen Date: Thu, 28 Jan 2016 17:16:38 -0800 Subject: [PATCH 14/16] [TREE] Enable global proposal for faster speed --- src/data/data.cc | 28 +++---- src/tree/updater_histmaker.cc | 135 ++++++++++++++++++++++++++++++++-- 2 files changed, 141 insertions(+), 22 deletions(-) diff --git a/src/data/data.cc b/src/data/data.cc index 11a36ffdf..39fa260d1 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -138,23 +138,23 @@ DMatrix* DMatrix::Load(const std::string& uri, cache_file = uri.substr(dlm_pos + 1, uri.length()); fname = uri.substr(0, dlm_pos); CHECK_EQ(cache_file.find('#'), std::string::npos) - << "Only one `#` is allowed in file path for cache file specification."; + << "Only one `#` is allowed in file path for cache file specification."; if (load_row_split) { std::ostringstream os; std::vector cache_shards = common::Split(cache_file, ':'); for (size_t i = 0; i < cache_shards.size(); ++i) { - size_t pos = cache_shards[i].rfind('.'); - if (pos == std::string::npos) { - os << cache_shards[i] - << ".r" << rabit::GetRank() - << "-" << rabit::GetWorldSize(); - } else { - os << cache_shards[i].substr(0, pos) - << ".r" << rabit::GetRank() - << "-" << rabit::GetWorldSize() - << cache_shards[i].substr(pos, cache_shards[i].length()); - } - if (i + 1 != cache_shards.size()) os << ':'; + size_t pos = cache_shards[i].rfind('.'); + if (pos == std::string::npos) { + os << cache_shards[i] + << ".r" << rabit::GetRank() + << "-" << rabit::GetWorldSize(); + } else { + os << cache_shards[i].substr(0, pos) + << ".r" << rabit::GetRank() + << "-" << rabit::GetWorldSize() + << cache_shards[i].substr(pos, cache_shards[i].length()); + } + if (i + 1 != cache_shards.size()) os << ':'; } cache_file = os.str(); } @@ -172,7 +172,7 @@ DMatrix* DMatrix::Load(const std::string& uri, if (npart != 1) { LOG(CONSOLE) << "Load part of data " << partid - << " of " << npart << " parts"; + << " of " << npart << " parts"; } // legacy handling of binary data loading if (file_format == "auto" && !load_row_split) { diff --git a/src/tree/updater_histmaker.cc b/src/tree/updater_histmaker.cc index 0f4b93c3b..6af7c8117 100644 --- a/src/tree/updater_histmaker.cc +++ b/src/tree/updater_histmaker.cc @@ -268,6 +268,10 @@ class HistMaker: public BaseMaker { template class CQHistMaker: public HistMaker { + public: + CQHistMaker() : cache_dmatrix_(nullptr) { + } + protected: struct HistEntry { typename HistMaker::HistUnit hist; @@ -290,9 +294,13 @@ class CQHistMaker: public HistMaker { */ inline void Add(bst_float fv, bst_gpair gstats) { - while (istart < hist.size && !(fv < hist.cut[istart])) ++istart; - CHECK_NE(istart, hist.size); - hist.data[istart].Add(gstats); + if (fv < hist.cut[istart]) { + hist.data[istart].Add(gstats); + } else { + while (istart < hist.size && !(fv < hist.cut[istart])) ++istart; + CHECK_NE(istart, hist.size); + hist.data[istart].Add(gstats); + } } }; // sketch type used for this @@ -301,7 +309,10 @@ class CQHistMaker: public HistMaker { void InitWorkSet(DMatrix *p_fmat, const RegTree &tree, std::vector *p_fset) override { - feat_helper.InitByCol(p_fmat, tree); + if (p_fmat != cache_dmatrix_) { + feat_helper.InitByCol(p_fmat, tree); + cache_dmatrix_ = p_fmat; + } feat_helper.SampleCol(this->param.colsample_bytree, p_fset); } // code to create histogram @@ -342,6 +353,9 @@ class CQHistMaker: public HistMaker { } } } + // update node statistics. + this->GetNodeStats(gpair, *p_fmat, tree, + &thread_stats, &node_stats); for (size_t i = 0; i < this->qexpand.size(); ++i) { const int nid = this->qexpand[i]; const int wid = this->node2workindex[nid]; @@ -434,9 +448,6 @@ class CQHistMaker: public HistMaker { size_t nbytes = WXQSketch::SummaryContainer::CalcMemCost(max_size); sreducer.Allreduce(dmlc::BeginPtr(summary_array), nbytes, summary_array.size()); } - // update node statistics. - this->GetNodeStats(gpair, *p_fmat, tree, - &thread_stats, &node_stats); // now we get the final result of sketch, setup the cut this->wspace.cut.clear(); this->wspace.rptr.clear(); @@ -475,7 +486,6 @@ class CQHistMaker: public HistMaker { (fset.size() + 1) * this->qexpand.size() + 1); } - private: inline void UpdateHistCol(const std::vector &gpair, const ColBatch::Inst &c, const MetaInfo &info, @@ -607,6 +617,8 @@ class CQHistMaker: public HistMaker { sbuilder[nid].Finalize(max_size); } } + // cached dmatrix where we initialized the feature on. + const DMatrix* cache_dmatrix_; // feature helper BaseMaker::FMetaHelper feat_helper; // temp space to map feature id to working index @@ -631,6 +643,107 @@ class CQHistMaker: public HistMaker { std::vector > sketchs; }; +// global proposal +template +class GlobalProposalHistMaker: public CQHistMaker { + protected: + void ResetPosAndPropose(const std::vector &gpair, + DMatrix *p_fmat, + const std::vector &fset, + const RegTree &tree) override { + if (this->qexpand.size() == 1 && !this->param.cache_global_proposal) { + cached_rptr_.clear(); + cached_cut_.clear(); + } + if (cached_rptr_.size() == 0) { + CHECK_EQ(this->qexpand.size(), 1); + CQHistMaker::ResetPosAndPropose(gpair, p_fmat, fset, tree); + cached_rptr_ = this->wspace.rptr; + cached_cut_ = this->wspace.cut; + } else { + this->wspace.cut.clear(); + this->wspace.rptr.clear(); + this->wspace.rptr.push_back(0); + for (size_t i = 0; i < this->qexpand.size(); ++i) { + for (size_t j = 0; j < cached_rptr_.size() - 1; ++j) { + this->wspace.rptr.push_back( + this->wspace.rptr.back() + cached_rptr_[j + 1] - cached_rptr_[j]); + } + this->wspace.cut.insert(this->wspace.cut.end(), cached_cut_.begin(), cached_cut_.end()); + } + CHECK_EQ(this->wspace.rptr.size(), + (fset.size() + 1) * this->qexpand.size() + 1); + CHECK_EQ(this->wspace.rptr.back(), this->wspace.cut.size()); + } + } + + // code to create histogram + void CreateHist(const std::vector &gpair, + DMatrix *p_fmat, + const std::vector &fset, + const RegTree &tree) override { + const MetaInfo &info = p_fmat->info(); + // fill in reverse map + this->feat2workindex.resize(tree.param.num_feature); + this->work_set = fset; + std::fill(this->feat2workindex.begin(), this->feat2workindex.end(), -1); + for (size_t i = 0; i < fset.size(); ++i) { + this->feat2workindex[fset[i]] = static_cast(i); + } + // start to work + this->wspace.Init(this->param, 1); + // to gain speedup in recovery + { + this->thread_hist.resize(this->get_nthread()); + + // TWOPASS: use the real set + split set in the column iteration. + this->SetDefaultPostion(p_fmat, tree); + this->work_set.insert(this->work_set.end(), this->fsplit_set.begin(), this->fsplit_set.end()); + std::sort(this->work_set.begin(), this->work_set.end()); + this->work_set.resize( + std::unique(this->work_set.begin(), this->work_set.end()) - this->work_set.begin()); + + // start accumulating statistics + dmlc::DataIter *iter = p_fmat->ColIterator(this->work_set); + iter->BeforeFirst(); + while (iter->Next()) { + const ColBatch &batch = iter->Value(); + // TWOPASS: use the real set + split set in the column iteration. + this->CorrectNonDefaultPositionByBatch(batch, this->fsplit_set, tree); + + // start enumeration + const bst_omp_uint nsize = static_cast(batch.size); + #pragma omp parallel for schedule(dynamic, 1) + for (bst_omp_uint i = 0; i < nsize; ++i) { + int offset = this->feat2workindex[batch.col_index[i]]; + if (offset >= 0) { + this->UpdateHistCol(gpair, batch[i], info, tree, + fset, offset, + &this->thread_hist[omp_get_thread_num()]); + } + } + } + + // update node statistics. + this->GetNodeStats(gpair, *p_fmat, tree, + &(this->thread_stats), &(this->node_stats)); + for (size_t i = 0; i < this->qexpand.size(); ++i) { + const int nid = this->qexpand[i]; + const int wid = this->node2workindex[nid]; + this->wspace.hset[0][fset.size() + wid * (fset.size()+1)] + .data[0] = this->node_stats[nid]; + } + } + this->histred.Allreduce(dmlc::BeginPtr(this->wspace.hset[0].data), + this->wspace.hset[0].data.size()); + } + + // cached unit pointer + std::vector cached_rptr_; + // cached cut value. + std::vector cached_cut_; +}; + template class QuantileHistMaker: public HistMaker { @@ -763,5 +876,11 @@ XGBOOST_REGISTER_TREE_UPDATER(HistMaker, "grow_histmaker") .set_body([]() { return new CQHistMaker(); }); + +XGBOOST_REGISTER_TREE_UPDATER(GlobalHistMaker, "grow_global_histmaker") +.describe("Tree constructor that uses approximate global proposal of histogram construction.") +.set_body([]() { + return new GlobalProposalHistMaker(); + }); } // namespace tree } // namespace xgboost From 63c4ad76170dd40cddc7aab7f6d2c49d0f94f6d4 Mon Sep 17 00:00:00 2001 From: tqchen Date: Wed, 10 Feb 2016 11:15:44 -0800 Subject: [PATCH 15/16] [APPROX] Make global proposal default, add group ptr solution --- make/travis.mk | 2 +- src/data/data.cc | 8 ++++++++ src/tree/param.h | 2 +- src/tree/updater_histmaker.cc | 8 +++++++- 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/make/travis.mk b/make/travis.mk index 85f53ca32..0c9fd5b7d 100644 --- a/make/travis.mk +++ b/make/travis.mk @@ -31,4 +31,4 @@ LIBJVM=$(JAVA_HOME)/jre/lib/amd64/server # XGB_PLUGINS += plugin/example/plugin.mk XGB_PLUGINS += plugin/lz4/plugin.mk -XGB_PLUGINS += plugin/dense_libsvm/plugin.mk +XGB_PLUGINS += plugin/dense_parser/plugin.mk diff --git a/src/data/data.cc b/src/data/data.cc index 39fa260d1..cea87c09f 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -124,6 +124,14 @@ void MetaInfo::SetInfo(const char* key, const void* dptr, DataType dtype, size_t base_margin.resize(num); DISPATCH_CONST_PTR(dtype, dptr, cast_dptr, std::copy(cast_dptr, cast_dptr + num, base_margin.begin())); + } else if (!std::strcmp(key, "group")) { + group_ptr.resize(num + 1); + DISPATCH_CONST_PTR(dtype, dptr, cast_dptr, + std::copy(cast_dptr, cast_dptr + num, group_ptr.begin() + 1)); + group_ptr[0] = 0; + for (size_t i = 1; i < group_ptr.size(); ++i) { + group_ptr[i] = group_ptr[i - 1] + group_ptr[i]; + } } } diff --git a/src/tree/param.h b/src/tree/param.h index b6ac89aef..e9de3ac14 100644 --- a/src/tree/param.h +++ b/src/tree/param.h @@ -87,7 +87,7 @@ struct TrainParam : public dmlc::Parameter { .describe("Subsample ratio of columns, resample on each tree construction."); DMLC_DECLARE_FIELD(opt_dense_col).set_range(0.0f, 1.0f).set_default(1.0f) .describe("EXP Param: speed optimization for dense column."); - DMLC_DECLARE_FIELD(sketch_eps).set_range(0.0f, 1.0f).set_default(0.1f) + DMLC_DECLARE_FIELD(sketch_eps).set_range(0.0f, 1.0f).set_default(0.03f) .describe("EXP Param: Sketch accuracy of approximate algorithm."); DMLC_DECLARE_FIELD(sketch_ratio).set_lower_bound(0.0f).set_default(2.0f) .describe("EXP Param: Sketch accuracy related parameter of approximate algorithm."); diff --git a/src/tree/updater_histmaker.cc b/src/tree/updater_histmaker.cc index 6af7c8117..fce76808a 100644 --- a/src/tree/updater_histmaker.cc +++ b/src/tree/updater_histmaker.cc @@ -871,7 +871,7 @@ class QuantileHistMaker: public HistMaker { std::vector > sketchs; }; -XGBOOST_REGISTER_TREE_UPDATER(HistMaker, "grow_histmaker") +XGBOOST_REGISTER_TREE_UPDATER(LocalHistMaker, "grow_local_histmaker") .describe("Tree constructor that uses approximate histogram construction.") .set_body([]() { return new CQHistMaker(); @@ -879,6 +879,12 @@ XGBOOST_REGISTER_TREE_UPDATER(HistMaker, "grow_histmaker") XGBOOST_REGISTER_TREE_UPDATER(GlobalHistMaker, "grow_global_histmaker") .describe("Tree constructor that uses approximate global proposal of histogram construction.") +.set_body([]() { + return new GlobalProposalHistMaker(); + }); + +XGBOOST_REGISTER_TREE_UPDATER(HistMaker, "grow_histmaker") +.describe("Tree constructor that uses approximate global of histogram construction.") .set_body([]() { return new GlobalProposalHistMaker(); }); From 413f119c7e452def77538d121bd269f4b84a602f Mon Sep 17 00:00:00 2001 From: tqchen Date: Wed, 10 Feb 2016 11:54:49 -0800 Subject: [PATCH 16/16] Update dmlc-core --- dmlc-core | 2 +- plugin/dense_parser/dense_libsvm.cc | 1 + src/data/data.cc | 1 + src/tree/updater_histmaker.cc | 2 +- 4 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dmlc-core b/dmlc-core index 53ab0280f..0f8fd38bf 160000 --- a/dmlc-core +++ b/dmlc-core @@ -1 +1 @@ -Subproject commit 53ab0280f4c1da4fd6f9cee22ce4716f04b3e712 +Subproject commit 0f8fd38bf94e6666aa367be80195b1f2da87428c diff --git a/plugin/dense_parser/dense_libsvm.cc b/plugin/dense_parser/dense_libsvm.cc index 45153c64b..31d374af0 100644 --- a/plugin/dense_parser/dense_libsvm.cc +++ b/plugin/dense_parser/dense_libsvm.cc @@ -4,6 +4,7 @@ * \brief Plugin to load in libsvm, but fill all the missing entries with zeros. * This plugin is mainly used for benchmark purposes and do not need to be included. */ +#include #include #include diff --git a/src/data/data.cc b/src/data/data.cc index cea87c09f..65efa0f8f 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -9,6 +9,7 @@ #include "./sparse_batch_page.h" #include "./simple_dmatrix.h" #include "./simple_csr_source.h" +#include "../common/common.h" #include "../common/io.h" #if DMLC_ENABLE_STD_THREAD diff --git a/src/tree/updater_histmaker.cc b/src/tree/updater_histmaker.cc index fce76808a..bf3f2571e 100644 --- a/src/tree/updater_histmaker.cc +++ b/src/tree/updater_histmaker.cc @@ -651,7 +651,7 @@ class GlobalProposalHistMaker: public CQHistMaker { DMatrix *p_fmat, const std::vector &fset, const RegTree &tree) override { - if (this->qexpand.size() == 1 && !this->param.cache_global_proposal) { + if (this->qexpand.size() == 1) { cached_rptr_.clear(); cached_cut_.clear(); }