From 70d208d68c3a32aaa4fcd6aa456f286a4da5912f Mon Sep 17 00:00:00 2001 From: Rory Mitchell Date: Mon, 1 Oct 2018 01:29:03 +1300 Subject: [PATCH] Dmatrix refactor stage 2 (#3395) * DMatrix refactor 2 * Remove buffered rowset usage where possible * Transition to c++11 style iterators for row access * Transition column iterators to C++ 11 --- NEWS.md | 4 + include/xgboost/data.h | 134 ++++++++-- src/common/hist_util.cc | 10 +- src/data/data.cc | 7 +- src/data/simple_csr_source.cc | 5 +- src/data/simple_dmatrix.cc | 144 +++++----- src/data/simple_dmatrix.h | 78 +----- src/data/sparse_page_dmatrix.cc | 295 +++++---------------- src/data/sparse_page_dmatrix.h | 112 ++------ src/data/sparse_page_source.cc | 56 ++-- src/data/sparse_page_source.h | 21 +- src/gbm/gblinear.cc | 16 +- src/gbm/gbtree.cc | 6 +- src/learner.cc | 36 --- src/linear/coordinate_common.h | 32 +-- src/linear/updater_gpu_coordinate.cu | 4 +- src/linear/updater_shotgun.cc | 4 +- src/predictor/cpu_predictor.cc | 17 +- src/predictor/gpu_predictor.cu | 5 +- src/tree/updater_basemaker-inl.h | 30 +-- src/tree/updater_colmaker.cc | 70 ++--- src/tree/updater_gpu.cu | 15 +- src/tree/updater_gpu_hist.cu | 11 +- src/tree/updater_histmaker.cc | 20 +- src/tree/updater_refresh.cc | 5 +- src/tree/updater_skmaker.cc | 7 +- tests/cpp/c_api/test_c_api.cc | 10 +- tests/cpp/common/test_gpu_hist_util.cu | 6 +- tests/cpp/data/test_metainfo.cc | 13 +- tests/cpp/data/test_simple_csr_source.cc | 15 +- tests/cpp/data/test_simple_dmatrix.cc | 40 +-- tests/cpp/data/test_sparse_page_dmatrix.cc | 56 ++-- tests/cpp/linear/test_linear.cc | 4 - tests/cpp/predictor/test_cpu_predictor.cc | 2 +- tests/cpp/predictor/test_gpu_predictor.cu | 2 +- tests/cpp/tree/test_gpu_hist.cu | 13 +- 36 files changed, 459 insertions(+), 846 deletions(-) diff --git a/NEWS.md b/NEWS.md index 5c7e11794..135193270 100644 --- a/NEWS.md +++ b/NEWS.md @@ -3,6 +3,10 @@ XGBoost Change Log This file records the changes in xgboost library in reverse chronological order. +## Master (2018.09.30) +* BREAKING CHANGES + - External memory page files have changed, breaking backwards compatibility for temporary storage used during external memory training. This only affects external memory users upgrading their xgboost version - we recommend clearing all *.page files before resuming training. Model serialization is unaffected. + ## v0.80 (2018.08.13) * **JVM packages received a major upgrade**: To consolidate the APIs and improve the user experience, we refactored the design of XGBoost4J-Spark in a significant manner. (#3387) - Consolidated APIs: It is now much easier to integrate XGBoost models into a Spark ML pipeline. Users can control behaviors like output leaf prediction results by setting corresponding column names. Training is now more consistent with other Estimators in Spark MLLIB: there is now one single method `fit()` to train decision trees. 
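The headline change in this patch is the switch from pull-style `dmlc::DataIter` access to C++11 range-based iteration over `BatchSet` / `BatchIterator`, added to `include/xgboost/data.h` below. A minimal sketch of the intended call pattern, assuming a constructed `DMatrix* dmat` (the `CountRowNonZeros` helper is illustrative only and not part of the patch):

#include <xgboost/data.h>

// Count non-missing entries by walking the row batches with the new interface.
size_t CountRowNonZeros(xgboost::DMatrix* dmat) {
  size_t nnz = 0;
  // Old style: auto iter = dmat->RowIterator(); iter->BeforeFirst(); while (iter->Next()) { ... }
  // New style: BatchSet provides begin()/end(), so a range-based for works directly.
  for (const auto& batch : dmat->GetRowBatches()) {
    for (size_t i = 0; i < batch.Size(); ++i) {
      nnz += batch[i].size();  // batch[i] is one row's entries (SparsePage::Inst)
    }
  }
  return nnz;
}

Column access follows the same pattern through `GetColumnBatches()` and `GetSortedColumnBatches()`, which replace `InitColAccess()`/`ColIterator()` and are built lazily from the row data.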
diff --git a/include/xgboost/data.h b/include/xgboost/data.h index 436799fe2..fbc93def0 100644 --- a/include/xgboost/data.h +++ b/include/xgboost/data.h @@ -12,10 +12,12 @@ #include #include #include +#include #include #include #include "./base.h" #include "../../src/common/span.h" +#include "../../src/common/group_data.h" #include "../../src/common/host_device_vector.h" @@ -191,6 +193,49 @@ class SparsePage { data.HostVector().clear(); } + SparsePage GetTranspose(int num_columns) const { + SparsePage transpose; + common::ParallelGroupBuilder builder(&transpose.offset.HostVector(), + &transpose.data.HostVector()); + const int nthread = omp_get_max_threads(); + builder.InitBudget(num_columns, nthread); + long batch_size = static_cast(this->Size()); // NOLINT(*) +#pragma omp parallel for schedule(static) + for (long i = 0; i < batch_size; ++i) { // NOLINT(*) + int tid = omp_get_thread_num(); + auto inst = (*this)[i]; + for (bst_uint j = 0; j < inst.size(); ++j) { + builder.AddBudget(inst[j].index, tid); + } + } + builder.InitStorage(); +#pragma omp parallel for schedule(static) + for (long i = 0; i < batch_size; ++i) { // NOLINT(*) + int tid = omp_get_thread_num(); + auto inst = (*this)[i]; + for (bst_uint j = 0; j < inst.size(); ++j) { + builder.Push( + inst[j].index, + Entry(static_cast(this->base_rowid + i), inst[j].fvalue), + tid); + } + } + return transpose; + } + + void SortRows() { + auto ncol = static_cast(this->Size()); +#pragma omp parallel for schedule(dynamic, 1) + for (bst_omp_uint i = 0; i < ncol; ++i) { + if (this->offset.HostVector()[i] < this->offset.HostVector()[i + 1]) { + std::sort( + this->data.HostVector().begin() + this->offset.HostVector()[i], + this->data.HostVector().begin() + this->offset.HostVector()[i + 1], + Entry::CmpValue); + } + } + } + /*! * \brief Push row block into the page. * \param batch the row batch. @@ -251,6 +296,62 @@ class SparsePage { size_t Size() { return offset.Size() - 1; } }; +class BatchIteratorImpl { + public: + virtual ~BatchIteratorImpl() {} + virtual BatchIteratorImpl* Clone() = 0; + virtual const SparsePage& operator*() const = 0; + virtual void operator++() = 0; + virtual bool AtEnd() const = 0; +}; + +class BatchIterator { + public: + using iterator_category = std::forward_iterator_tag; + explicit BatchIterator(BatchIteratorImpl* impl) { impl_.reset(impl); } + + BatchIterator(const BatchIterator& other) { + if (other.impl_) { + impl_.reset(other.impl_->Clone()); + } else { + impl_.reset(); + } + } + + void operator++() { + CHECK(impl_ != nullptr); + ++(*impl_); + } + + const SparsePage& operator*() const { + CHECK(impl_ != nullptr); + return *(*impl_); + } + + bool operator!=(const BatchIterator& rhs) const { + CHECK(impl_ != nullptr); + return !impl_->AtEnd(); + } + + bool AtEnd() const { + CHECK(impl_ != nullptr); + return impl_->AtEnd(); + } + + private: + std::unique_ptr impl_; +}; + +class BatchSet { + public: + explicit BatchSet(BatchIterator begin_iter) : begin_iter_(begin_iter) {} + BatchIterator begin() { return begin_iter_; } + BatchIterator end() { return BatchIterator(nullptr); } + + private: + BatchIterator begin_iter_; +}; + /*! * \brief This is data structure that user can pass to DMatrix::Create * to create a DMatrix for training, user can create this data structure @@ -320,32 +421,17 @@ class DMatrix { virtual MetaInfo& Info() = 0; /*! \brief meta information of the dataset */ virtual const MetaInfo& Info() const = 0; - /*! 
- * \brief get the row iterator, reset to beginning position - * \note Only either RowIterator or column Iterator can be active. + /** + * \brief Gets row batches. Use range based for loop over BatchSet to access individual batches. */ - virtual dmlc::DataIter* RowIterator() = 0; - /*!\brief get column iterator, reset to the beginning position */ - virtual dmlc::DataIter* ColIterator() = 0; - /*! - * \brief check if column access is supported, if not, initialize column access. - * \param max_row_perbatch auxiliary information, maximum row used in each column batch. - * this is a hint information that can be ignored by the implementation. - * \param sorted If column features should be in sorted order - * \return Number of column blocks in the column access. - */ - virtual void InitColAccess(size_t max_row_perbatch, bool sorted) = 0; + virtual BatchSet GetRowBatches() = 0; + virtual BatchSet GetSortedColumnBatches() = 0; + virtual BatchSet GetColumnBatches() = 0; // the following are column meta data, should be able to answer them fast. - /*! \return whether column access is enabled */ - virtual bool HaveColAccess(bool sorted) const = 0; /*! \return Whether the data columns single column block. */ virtual bool SingleColBlock() const = 0; - /*! \brief get number of non-missing entries in column */ - virtual size_t GetColSize(size_t cidx) const = 0; /*! \brief get column density */ - virtual float GetColDensity(size_t cidx) const = 0; - /*! \return reference of buffered rowset, in column access */ - virtual const RowSet& BufferedRowset() const = 0; + virtual float GetColDensity(size_t cidx) = 0; /*! \brief virtual destructor */ virtual ~DMatrix() = default; /*! @@ -392,12 +478,6 @@ class DMatrix { */ static DMatrix* Create(dmlc::Parser* parser, const std::string& cache_prefix = ""); - - private: - // allow learner class to access this field. - friend class LearnerImpl; - /*! \brief public field to back ref cached matrix. 
*/ - LearnerImpl* cache_learner_ptr_{nullptr}; }; // implementation of inline functions diff --git a/src/common/hist_util.cc b/src/common/hist_util.cc index 49081e59e..713f1f844 100644 --- a/src/common/hist_util.cc +++ b/src/common/hist_util.cc @@ -32,11 +32,8 @@ void HistCutMatrix::Init(DMatrix* p_fmat, uint32_t max_num_bins) { s.Init(info.num_row_, 1.0 / (max_num_bins * kFactor)); } - auto iter = p_fmat->RowIterator(); - iter->BeforeFirst(); const auto& weights = info.weights_.HostVector(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetRowBatches()) { #pragma omp parallel num_threads(nthread) { CHECK_EQ(nthread, omp_get_num_threads()); @@ -128,17 +125,14 @@ uint32_t HistCutMatrix::GetBinIdx(const Entry& e) { void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) { cut.Init(p_fmat, max_num_bins); - auto iter = p_fmat->RowIterator(); const int nthread = omp_get_max_threads(); const uint32_t nbins = cut.row_ptr.back(); hit_count.resize(nbins, 0); hit_count_tloc_.resize(nthread * nbins, 0); - iter->BeforeFirst(); row_ptr.push_back(0); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetRowBatches()) { const size_t rbegin = row_ptr.size() - 1; for (size_t i = 0; i < batch.Size(); ++i) { row_ptr.push_back(batch[i].size() + row_ptr.back()); diff --git a/src/data/data.cc b/src/data/data.cc index 4b24d5da8..24b4f1c1c 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -255,10 +255,11 @@ DMatrix* DMatrix::Create(dmlc::Parser* parser, return DMatrix::Create(std::move(source), cache_prefix); } else { #if DMLC_ENABLE_STD_THREAD - if (!data::SparsePageSource::CacheExist(cache_prefix)) { - data::SparsePageSource::Create(parser, cache_prefix); + if (!data::SparsePageSource::CacheExist(cache_prefix, ".row.page")) { + data::SparsePageSource::CreateRowPage(parser, cache_prefix); } - std::unique_ptr source(new data::SparsePageSource(cache_prefix)); + std::unique_ptr source( + new data::SparsePageSource(cache_prefix, ".row.page")); return DMatrix::Create(std::move(source), cache_prefix); #else LOG(FATAL) << "External memory is not enabled in mingw"; diff --git a/src/data/simple_csr_source.cc b/src/data/simple_csr_source.cc index c13d96d67..a20f538cc 100644 --- a/src/data/simple_csr_source.cc +++ b/src/data/simple_csr_source.cc @@ -18,10 +18,7 @@ void SimpleCSRSource::Clear() { void SimpleCSRSource::CopyFrom(DMatrix* src) { this->Clear(); this->info = src->Info(); - auto iter = src->RowIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - const auto &batch = iter->Value(); + for (const auto &batch : src->GetRowBatches()) { page_.Push(batch); } } diff --git a/src/data/simple_dmatrix.cc b/src/data/simple_dmatrix.cc index 98a293967..c956932a9 100644 --- a/src/data/simple_dmatrix.cc +++ b/src/data/simple_dmatrix.cc @@ -4,103 +4,79 @@ * \brief the input data structure for gradient boosting * \author Tianqi Chen */ -#include -#include -#include -#include #include "./simple_dmatrix.h" +#include #include "../common/random.h" -#include "../common/group_data.h" namespace xgboost { namespace data { +MetaInfo& SimpleDMatrix::Info() { return source_->info; } -bool SimpleDMatrix::ColBatchIter::Next() { - if (data_ >= 1) return false; - data_ += 1; - return true; -} +const MetaInfo& SimpleDMatrix::Info() const { return source_->info; } - dmlc::DataIter* SimpleDMatrix::ColIterator() { - col_iter_.BeforeFirst(); - return &col_iter_; -} - -void SimpleDMatrix::InitColAccess( - size_t max_row_perbatch, bool sorted) { - 
if (this->HaveColAccess(sorted)) return; - col_iter_.sorted_ = sorted; - col_iter_.column_page_.reset(new SparsePage()); - this->MakeOneBatch(col_iter_.column_page_.get(), sorted); -} - -// internal function to make one batch from row iter. -void SimpleDMatrix::MakeOneBatch(SparsePage* pcol, bool sorted) { - // clear rowset - buffered_rowset_.Clear(); - // bit map - const int nthread = omp_get_max_threads(); - pcol->Clear(); - auto& pcol_offset_vec = pcol->offset.HostVector(); - auto& pcol_data_vec = pcol->data.HostVector(); - common::ParallelGroupBuilder - builder(&pcol_offset_vec, &pcol_data_vec); - builder.InitBudget(Info().num_col_, nthread); - // start working - auto iter = this->RowIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - const auto& batch = iter->Value(); - long batch_size = static_cast(batch.Size()); // NOLINT(*) - for (long i = 0; i < batch_size; ++i) { // NOLINT(*) - auto ridx = static_cast(batch.base_rowid + i); - buffered_rowset_.PushBack(ridx); - } - #pragma omp parallel for schedule(static) - for (long i = 0; i < batch_size; ++i) { // NOLINT(*) - int tid = omp_get_thread_num(); - auto inst = batch[i]; - for (auto& ins : inst) { - builder.AddBudget(ins.index, tid); - } - } - } - builder.InitStorage(); - - iter->BeforeFirst(); - while (iter->Next()) { - auto &batch = iter->Value(); - #pragma omp parallel for schedule(static) - for (long i = 0; i < static_cast(batch.Size()); ++i) { // NOLINT(*) - int tid = omp_get_thread_num(); - auto inst = batch[i]; - for (auto& ins : inst) { - builder.Push(ins.index, - Entry(static_cast(batch.base_rowid + i), - ins.fvalue), - tid); - } - } +float SimpleDMatrix::GetColDensity(size_t cidx) { + size_t column_size = 0; + // Use whatever version of column batches already exists + if (sorted_column_page_) { + auto batch = this->GetSortedColumnBatches(); + column_size = (*batch.begin())[cidx].size(); + } else { + auto batch = this->GetColumnBatches(); + column_size = (*batch.begin())[cidx].size(); } - CHECK_EQ(pcol->Size(), Info().num_col_); + size_t nmiss = this->Info().num_row_ - column_size; + return 1.0f - (static_cast(nmiss)) / this->Info().num_row_; +} - if (sorted) { - // sort columns - auto ncol = static_cast(pcol->Size()); -#pragma omp parallel for schedule(dynamic, 1) num_threads(nthread) - for (bst_omp_uint i = 0; i < ncol; ++i) { - if (pcol_offset_vec[i] < pcol_offset_vec[i + 1]) { - std::sort(dmlc::BeginPtr(pcol_data_vec) + pcol_offset_vec[i], - dmlc::BeginPtr(pcol_data_vec) + pcol_offset_vec[i + 1], - Entry::CmpValue); - } - } +class SimpleBatchIteratorImpl : public BatchIteratorImpl { + public: + explicit SimpleBatchIteratorImpl(SparsePage* page) : page_(page) {} + const SparsePage& operator*() const override { + CHECK(page_ != nullptr); + return *page_; } + void operator++() override { page_ = nullptr; } + bool AtEnd() const override { return page_ == nullptr; } + SimpleBatchIteratorImpl* Clone() override { + return new SimpleBatchIteratorImpl(*this); + } + + private: + SparsePage* page_{nullptr}; +}; + +BatchSet SimpleDMatrix::GetRowBatches() { + auto cast = dynamic_cast(source_.get()); + auto begin_iter = BatchIterator(new SimpleBatchIteratorImpl(&(cast->page_))); + return BatchSet(begin_iter); } -bool SimpleDMatrix::SingleColBlock() const { - return true; +BatchSet SimpleDMatrix::GetColumnBatches() { + // column page doesn't exist, generate it + if (!column_page_) { + auto page = dynamic_cast(source_.get())->page_; + column_page_.reset( + new SparsePage(page.GetTranspose(source_->info.num_col_))); + } + auto 
begin_iter = + BatchIterator(new SimpleBatchIteratorImpl(column_page_.get())); + return BatchSet(begin_iter); } + +BatchSet SimpleDMatrix::GetSortedColumnBatches() { + // Sorted column page doesn't exist, generate it + if (!sorted_column_page_) { + auto page = dynamic_cast(source_.get())->page_; + sorted_column_page_.reset( + new SparsePage(page.GetTranspose(source_->info.num_col_))); + sorted_column_page_->SortRows(); + } + auto begin_iter = + BatchIterator(new SimpleBatchIteratorImpl(sorted_column_page_.get())); + return BatchSet(begin_iter); +} + +bool SimpleDMatrix::SingleColBlock() const { return true; } } // namespace data } // namespace xgboost diff --git a/src/data/simple_dmatrix.h b/src/data/simple_dmatrix.h index c1d1babdd..308baed6a 100644 --- a/src/data/simple_dmatrix.h +++ b/src/data/simple_dmatrix.h @@ -9,9 +9,10 @@ #include #include -#include #include #include +#include +#include "simple_csr_source.h" namespace xgboost { namespace data { @@ -21,79 +22,26 @@ class SimpleDMatrix : public DMatrix { explicit SimpleDMatrix(std::unique_ptr&& source) : source_(std::move(source)) {} - MetaInfo& Info() override { - return source_->info; - } + MetaInfo& Info() override; - const MetaInfo& Info() const override { - return source_->info; - } + const MetaInfo& Info() const override; - dmlc::DataIter* RowIterator() override { - auto iter = source_.get(); - iter->BeforeFirst(); - return iter; - } - - bool HaveColAccess(bool sorted) const override { - return col_iter_.sorted_ == sorted && col_iter_.column_page_!= nullptr; - } - - const RowSet& BufferedRowset() const override { - return buffered_rowset_; - } - - size_t GetColSize(size_t cidx) const override { - auto& batch = *col_iter_.column_page_; - return batch[cidx].size(); - } - - float GetColDensity(size_t cidx) const override { - size_t nmiss = buffered_rowset_.Size() - GetColSize(cidx); - return 1.0f - (static_cast(nmiss)) / buffered_rowset_.Size(); - } - - dmlc::DataIter* ColIterator() override; - - void InitColAccess( - size_t max_row_perbatch, bool sorted) override; + float GetColDensity(size_t cidx) override; bool SingleColBlock() const override; + BatchSet GetRowBatches() override; + + BatchSet GetColumnBatches() override; + + BatchSet GetSortedColumnBatches() override; + private: - // in-memory column batch iterator. - struct ColBatchIter: dmlc::DataIter { - public: - ColBatchIter() = default; - void BeforeFirst() override { - data_ = 0; - } - const SparsePage &Value() const override { - return *column_page_; - } - bool Next() override; - - private: - // allow SimpleDMatrix to access it. - friend class SimpleDMatrix; - // column sparse page - std::unique_ptr column_page_; - // data pointer - size_t data_{0}; - // Is column sorted? - bool sorted_{false}; - }; - // source data pointer. std::unique_ptr source_; - // column iterator - ColBatchIter col_iter_; - // list of row index that are buffered. - RowSet buffered_rowset_; - // internal function to make one batch from row iter. 
- void MakeOneBatch( - SparsePage *pcol, bool sorted); + std::unique_ptr sorted_column_page_; + std::unique_ptr column_page_; }; } // namespace data } // namespace xgboost diff --git a/src/data/sparse_page_dmatrix.cc b/src/data/sparse_page_dmatrix.cc index 06c195e14..3a97b08f4 100644 --- a/src/data/sparse_page_dmatrix.cc +++ b/src/data/sparse_page_dmatrix.cc @@ -12,261 +12,92 @@ #if DMLC_ENABLE_STD_THREAD #include "./sparse_page_dmatrix.h" #include "../common/random.h" -#include "../common/common.h" -#include "../common/group_data.h" namespace xgboost { namespace data { -SparsePageDMatrix::ColPageIter::ColPageIter( - std::vector >&& files) - : page_(nullptr), clock_ptr_(0), files_(std::move(files)) { - load_all_ = false; - formats_.resize(files_.size()); - prefetchers_.resize(files_.size()); - - for (size_t i = 0; i < files_.size(); ++i) { - dmlc::SeekStream* fi = files_[i].get(); - std::string format; - CHECK(fi->Read(&format)) << "Invalid page format"; - formats_[i].reset(SparsePageFormat::Create(format)); - SparsePageFormat* fmt = formats_[i].get(); - size_t fbegin = fi->Tell(); - prefetchers_[i].reset(new dmlc::ThreadedIter(4)); - prefetchers_[i]->Init([this, fi, fmt] (SparsePage** dptr) { - if (*dptr == nullptr) { - *dptr = new SparsePage(); - } - if (load_all_) { - return fmt->Read(*dptr, fi); - } else { - return fmt->Read(*dptr, fi, index_set_); - } - }, [this, fi, fbegin] () { - fi->Seek(fbegin); - index_set_ = set_index_set_; - load_all_ = set_load_all_; - }); - } +MetaInfo& SparsePageDMatrix::Info() { + return row_source_->info; } -SparsePageDMatrix::ColPageIter::~ColPageIter() { - delete page_; +const MetaInfo& SparsePageDMatrix::Info() const { + return row_source_->info; } -bool SparsePageDMatrix::ColPageIter::Next() { - // doing clock rotation over shards. 
- if (page_ != nullptr) { - size_t n = prefetchers_.size(); - prefetchers_[(clock_ptr_ + n - 1) % n]->Recycle(&page_); +class SparseBatchIteratorImpl : public BatchIteratorImpl { + public: + explicit SparseBatchIteratorImpl(SparsePageSource* source) : source_(source) { + CHECK(source_ != nullptr); } - if (prefetchers_[clock_ptr_]->Next(&page_)) { - // advance clock - clock_ptr_ = (clock_ptr_ + 1) % prefetchers_.size(); - return true; - } else { - return false; + const SparsePage& operator*() const override { return source_->Value(); } + void operator++() override { at_end_ = !source_->Next(); } + bool AtEnd() const override { return at_end_; } + SparseBatchIteratorImpl* Clone() override { + return new SparseBatchIteratorImpl(*this); } + + private: + SparsePageSource* source_{nullptr}; + bool at_end_{ false }; +}; + +BatchSet SparsePageDMatrix::GetRowBatches() { + auto cast = dynamic_cast(row_source_.get()); + cast->BeforeFirst(); + cast->Next(); + auto begin_iter = BatchIterator(new SparseBatchIteratorImpl(cast)); + return BatchSet(begin_iter); } -void SparsePageDMatrix::ColPageIter::BeforeFirst() { - clock_ptr_ = 0; - for (auto& p : prefetchers_) { - p->BeforeFirst(); +BatchSet SparsePageDMatrix::GetSortedColumnBatches() { + // Lazily instantiate + if (!sorted_column_source_) { + SparsePageSource::CreateColumnPage(this, cache_info_, true); + sorted_column_source_.reset( + new SparsePageSource(cache_info_, ".sorted.col.page")); } + sorted_column_source_->BeforeFirst(); + sorted_column_source_->Next(); + auto begin_iter = + BatchIterator(new SparseBatchIteratorImpl(sorted_column_source_.get())); + return BatchSet(begin_iter); } -void SparsePageDMatrix::ColPageIter::Init( - const std::vector& index_set) { - set_index_set_ = index_set; - set_load_all_ = true; - std::sort(set_index_set_.begin(), set_index_set_.end()); - this->BeforeFirst(); +BatchSet SparsePageDMatrix::GetColumnBatches() { + // Lazily instantiate + if (!column_source_) { + SparsePageSource::CreateColumnPage(this, cache_info_, false); + column_source_.reset(new SparsePageSource(cache_info_, ".col.page")); + } + column_source_->BeforeFirst(); + column_source_->Next(); + auto begin_iter = + BatchIterator(new SparseBatchIteratorImpl(column_source_.get())); + return BatchSet(begin_iter); } - dmlc::DataIter* SparsePageDMatrix::ColIterator() { - CHECK(col_iter_ != nullptr); - std::vector col_index; - std::iota(col_index.begin(), col_index.end(), bst_uint(0)); - col_iter_->Init(col_index); - return col_iter_.get(); -} - -bool SparsePageDMatrix::TryInitColData(bool sorted) { - // load meta data. 
- std::vector cache_shards = common::Split(cache_info_, ':'); - { - std::string col_meta_name = cache_shards[0] + ".col.meta"; - std::unique_ptr fmeta( - dmlc::Stream::Create(col_meta_name.c_str(), "r", true)); - if (fmeta == nullptr) return false; - CHECK(fmeta->Read(&buffered_rowset_)) << "invalid col.meta file"; - CHECK(fmeta->Read(&col_size_)) << "invalid col.meta file"; - } - // load real data - std::vector > files; - for (const std::string& prefix : cache_shards) { - std::string col_data_name = prefix + ".col.page"; - std::unique_ptr fdata( - dmlc::SeekStream::CreateForRead(col_data_name.c_str(), true)); - if (fdata == nullptr) return false; - files.push_back(std::move(fdata)); - } - col_iter_.reset(new ColPageIter(std::move(files))); - // warning: no attempt to check here whether the cached data was sorted - col_iter_->sorted = sorted; - return true; -} - -void SparsePageDMatrix::InitColAccess( - size_t max_row_perbatch, bool sorted) { - if (HaveColAccess(sorted)) return; - if (TryInitColData(sorted)) return; - const MetaInfo& info = this->Info(); - if (max_row_perbatch == std::numeric_limits::max()) { - max_row_perbatch = kMaxRowPerBatch; - } - buffered_rowset_.Clear(); - col_size_.resize(info.num_col_); - std::fill(col_size_.begin(), col_size_.end(), 0); - auto iter = this->RowIterator(); - size_t batch_ptr = 0, batch_top = 0; - SparsePage tmp; - - // function to create the page. - auto make_col_batch = [&] ( - const SparsePage& prow, - size_t begin, - SparsePage *pcol) { - pcol->Clear(); - pcol->base_rowid = buffered_rowset_[begin]; - const int nthread = std::max(omp_get_max_threads(), std::max(omp_get_num_procs() / 2 - 1, 1)); - auto& offset_vec = pcol->offset.HostVector(); - auto& data_vec = pcol->data.HostVector(); - common::ParallelGroupBuilder - builder(&offset_vec, &data_vec); - builder.InitBudget(info.num_col_, nthread); - bst_omp_uint ndata = static_cast(prow.Size()); - const auto& prow_offset_vec = prow.offset.HostVector(); - const auto& prow_data_vec = prow.data.HostVector(); - #pragma omp parallel for schedule(static) num_threads(nthread) - for (bst_omp_uint i = 0; i < ndata; ++i) { - int tid = omp_get_thread_num(); - for (size_t j = prow_offset_vec[i]; j < prow_offset_vec[i+1]; ++j) { - const auto e = prow_data_vec[j]; - builder.AddBudget(e.index, tid); +float SparsePageDMatrix::GetColDensity(size_t cidx) { + // Finds densities if we don't already have them + if (col_density_.empty()) { + std::vector column_size(this->Info().num_col_); + for (const auto &batch : this->GetColumnBatches()) { + for (int i = 0; i < batch.Size(); i++) { + column_size[i] += batch[i].size(); } } - builder.InitStorage(); - #pragma omp parallel for schedule(static) num_threads(nthread) - for (bst_omp_uint i = 0; i < ndata; ++i) { - int tid = omp_get_thread_num(); - for (size_t j = prow_offset_vec[i]; j < prow_offset_vec[i+1]; ++j) { - const Entry &e = prow_data_vec[j]; - builder.Push(e.index, - Entry(buffered_rowset_[i + begin], e.fvalue), - tid); - } + col_density_.resize(column_size.size()); + for (int i = 0; i < col_density_.size(); i++) { + size_t nmiss = this->Info().num_row_ - column_size[i]; + col_density_[i] = + 1.0f - (static_cast(nmiss)) / this->Info().num_row_; } - CHECK_EQ(pcol->Size(), info.num_col_); - // sort columns - if (sorted) { - auto ncol = static_cast(pcol->Size()); -#pragma omp parallel for schedule(dynamic, 1) num_threads(nthread) - for (bst_omp_uint i = 0; i < ncol; ++i) { - if (offset_vec[i] < offset_vec[i + 1]) { - std::sort(dmlc::BeginPtr(data_vec) + offset_vec[i], 
- dmlc::BeginPtr(data_vec) + offset_vec[i + 1], - Entry::CmpValue); - } - } - } - }; - - auto make_next_col = [&] (SparsePage* dptr) { - tmp.Clear(); - size_t btop = buffered_rowset_.Size(); - - while (true) { - if (batch_ptr != batch_top) { - auto &batch = iter->Value(); - CHECK_EQ(batch_top, batch.Size()); - for (size_t i = batch_ptr; i < batch_top; ++i) { - auto ridx = static_cast(batch.base_rowid + i); - buffered_rowset_.PushBack(ridx); - tmp.Push(batch[i]); - - if (tmp.Size() >= max_row_perbatch || - tmp.MemCostBytes() >= kPageSize) { - make_col_batch(tmp, btop, dptr); - batch_ptr = i + 1; - return true; - } - } - batch_ptr = batch_top; - } - if (!iter->Next()) break; - batch_ptr = 0; - batch_top = iter->Value().Size(); - } - - if (tmp.Size() != 0) { - make_col_batch(tmp, btop, dptr); - return true; - } else { - return false; - } - }; - - std::vector cache_shards = common::Split(cache_info_, ':'); - std::vector name_shards, format_shards; - for (const std::string& prefix : cache_shards) { - name_shards.push_back(prefix + ".col.page"); - format_shards.push_back(SparsePageFormat::DecideFormat(prefix).second); } - - { - SparsePageWriter writer(name_shards, format_shards, 6); - std::shared_ptr page; - writer.Alloc(&page); page->Clear(); - - double tstart = dmlc::GetTime(); - size_t bytes_write = 0; - // print every 4 sec. - constexpr double kStep = 4.0; - size_t tick_expected = kStep; - - while (make_next_col(page.get())) { - const auto& page_offset_vec = page->offset.ConstHostVector(); - for (size_t i = 0; i < page->Size(); ++i) { - col_size_[i] += page_offset_vec[i + 1] - page_offset_vec[i]; - } - - bytes_write += page->MemCostBytes(); - writer.PushWrite(std::move(page)); - writer.Alloc(&page); - page->Clear(); - - double tdiff = dmlc::GetTime() - tstart; - if (tdiff >= tick_expected) { - LOG(CONSOLE) << "Writing col.page file to " << cache_info_ - << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " - << (bytes_write >> 20UL) << " MB writen"; - tick_expected += kStep; - } - } - // save meta data - std::string col_meta_name = cache_shards[0] + ".col.meta"; - std::unique_ptr fo( - dmlc::Stream::Create(col_meta_name.c_str(), "w")); - fo->Write(buffered_rowset_); - fo->Write(col_size_); - fo.reset(nullptr); - } - // initialize column data - CHECK(TryInitColData(sorted)); + return col_density_.at(cidx); } +bool SparsePageDMatrix::SingleColBlock() const { + return false; +} } // namespace data } // namespace xgboost #endif diff --git a/src/data/sparse_page_dmatrix.h b/src/data/sparse_page_dmatrix.h index 9e1e97895..706b15fc3 100644 --- a/src/data/sparse_page_dmatrix.h +++ b/src/data/sparse_page_dmatrix.h @@ -7,15 +7,12 @@ #ifndef XGBOOST_DATA_SPARSE_PAGE_DMATRIX_H_ #define XGBOOST_DATA_SPARSE_PAGE_DMATRIX_H_ -#include #include -#include -#include -#include #include #include -#include "../common/common.h" -#include "./sparse_page_writer.h" +#include +#include +#include "sparse_page_source.h" namespace xgboost { namespace data { @@ -23,104 +20,35 @@ namespace data { class SparsePageDMatrix : public DMatrix { public: explicit SparsePageDMatrix(std::unique_ptr&& source, - std::string cache_info) - : source_(std::move(source)), cache_info_(std::move(cache_info)) { - } + std::string cache_info) + : row_source_(std::move(source)), cache_info_(std::move(cache_info)) {} - MetaInfo& Info() override { - return source_->info; - } + MetaInfo& Info() override; - const MetaInfo& Info() const override { - return source_->info; - } + const MetaInfo& Info() const override; - dmlc::DataIter* RowIterator() 
override { - auto iter = source_.get(); - iter->BeforeFirst(); - return iter; - } + BatchSet GetRowBatches() override; - bool HaveColAccess(bool sorted) const override { - return col_iter_ != nullptr && col_iter_->sorted == sorted; - } + BatchSet GetSortedColumnBatches() override; - const RowSet& BufferedRowset() const override { - return buffered_rowset_; - } + BatchSet GetColumnBatches() override; - size_t GetColSize(size_t cidx) const override { - return col_size_[cidx]; - } + float GetColDensity(size_t cidx) override; - float GetColDensity(size_t cidx) const override { - size_t nmiss = buffered_rowset_.Size() - col_size_[cidx]; - return 1.0f - (static_cast(nmiss)) / buffered_rowset_.Size(); - } - - bool SingleColBlock() const override { - return false; - } - - dmlc::DataIter* ColIterator() override; - - void InitColAccess( - size_t max_row_perbatch, bool sorted) override; - - /*! \brief page size 256 MB */ - static const size_t kPageSize = 256UL << 20UL; - /*! \brief Maximum number of rows per batch. */ - static const size_t kMaxRowPerBatch = 64UL << 10UL; + bool SingleColBlock() const override; private: - // declare the column batch iter. - class ColPageIter : public dmlc::DataIter { - public: - explicit ColPageIter(std::vector >&& files); - ~ColPageIter() override; - void BeforeFirst() override; - const SparsePage &Value() const override { - return *page_; - } - bool Next() override; - // initialize the column iterator with the specified index set. - void Init(const std::vector& index_set); - // If the column features are sorted - bool sorted; + /*! \brief page size 256 MB */ + static const size_t kPageSize = 256UL << 20UL; - private: - // the temp page. - SparsePage* page_; - // internal clock ptr. - size_t clock_ptr_; - // data file pointer. - std::vector > files_; - // page format. - std::vector > formats_; - /*! \brief internal prefetcher. */ - std::vector > > prefetchers_; - // The index set to be loaded. - std::vector index_set_; - // The index set by the outsiders - std::vector set_index_set_; - // whether to load data dataset. - bool set_load_all_, load_all_; - }; - /*! - * \brief Try to initialize column data. - * \return true if data already exists, false if they do not. - */ - bool TryInitColData(bool sorted); - // source data pointer. - std::unique_ptr source_; + // source data pointers. + std::unique_ptr row_source_; + std::unique_ptr column_source_; + std::unique_ptr sorted_column_source_; // the cache prefix std::string cache_info_; - /*! \brief list of row index that are buffered */ - RowSet buffered_rowset_; - // count for column data - std::vector col_size_; - // internal column iter. - std::unique_ptr col_iter_; + // Store column densities to avoid recalculating + std::vector col_density_; }; } // namespace data } // namespace xgboost diff --git a/src/data/sparse_page_source.cc b/src/data/sparse_page_source.cc index 7d47dedd9..517978446 100644 --- a/src/data/sparse_page_source.cc +++ b/src/data/sparse_page_source.cc @@ -14,7 +14,8 @@ namespace xgboost { namespace data { -SparsePageSource::SparsePageSource(const std::string& cache_info) +SparsePageSource::SparsePageSource(const std::string& cache_info, + const std::string& page_type) : base_rowid_(0), page_(nullptr), clock_ptr_(0) { // read in the info files std::vector cache_shards = common::Split(cache_info, ':'); @@ -32,7 +33,7 @@ SparsePageSource::SparsePageSource(const std::string& cache_info) // read in the cache files. 
for (size_t i = 0; i < cache_shards.size(); ++i) { - std::string name_row = cache_shards[i] + ".row.page"; + std::string name_row = cache_shards[i] + page_type; files_[i].reset(dmlc::SeekStream::CreateForRead(name_row.c_str())); dmlc::SeekStream* fi = files_[i].get(); std::string format; @@ -83,7 +84,8 @@ const SparsePage& SparsePageSource::Value() const { return *page_; } -bool SparsePageSource::CacheExist(const std::string& cache_info) { +bool SparsePageSource::CacheExist(const std::string& cache_info, + const std::string& page_type) { std::vector cache_shards = common::Split(cache_info, ':'); CHECK_NE(cache_shards.size(), 0U); { @@ -92,22 +94,23 @@ bool SparsePageSource::CacheExist(const std::string& cache_info) { if (finfo == nullptr) return false; } for (const std::string& prefix : cache_shards) { - std::string name_row = prefix + ".row.page"; + std::string name_row = prefix + page_type; std::unique_ptr frow(dmlc::Stream::Create(name_row.c_str(), "r", true)); if (frow == nullptr) return false; } return true; } -void SparsePageSource::Create(dmlc::Parser* src, +void SparsePageSource::CreateRowPage(dmlc::Parser* src, const std::string& cache_info) { + const std::string page_type = ".row.page"; std::vector cache_shards = common::Split(cache_info, ':'); CHECK_NE(cache_shards.size(), 0U); // read in the info files. std::string name_info = cache_shards[0]; std::vector name_shards, format_shards; for (const std::string& prefix : cache_shards) { - name_shards.push_back(prefix + ".row.page"); + name_shards.push_back(prefix + page_type); format_shards.push_back(SparsePageFormat::DecideFormat(prefix).first); } { @@ -164,8 +167,8 @@ void SparsePageSource::Create(dmlc::Parser* src, double tdiff = dmlc::GetTime() - tstart; if (tdiff >= tick_expected) { - LOG(CONSOLE) << "Writing row.page to " << cache_info << " in " - << ((bytes_write >> 20UL) / tdiff) << " MB/s, " + LOG(CONSOLE) << "Writing " << page_type << " to " << cache_info + << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " << (bytes_write >> 20UL) << " written"; tick_expected += static_cast(kStep); } @@ -192,29 +195,40 @@ void SparsePageSource::Create(dmlc::Parser* src, LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info; } -void SparsePageSource::Create(DMatrix* src, - const std::string& cache_info) { +void SparsePageSource::CreatePageFromDMatrix(DMatrix* src, + const std::string& cache_info, + const std::string& page_type) { std::vector cache_shards = common::Split(cache_info, ':'); CHECK_NE(cache_shards.size(), 0U); // read in the info files. 
std::string name_info = cache_shards[0]; std::vector name_shards, format_shards; for (const std::string& prefix : cache_shards) { - name_shards.push_back(prefix + ".row.page"); + name_shards.push_back(prefix + page_type); format_shards.push_back(SparsePageFormat::DecideFormat(prefix).first); } { SparsePageWriter writer(name_shards, format_shards, 6); std::shared_ptr page; - writer.Alloc(&page); page->Clear(); + writer.Alloc(&page); + page->Clear(); MetaInfo info = src->Info(); size_t bytes_write = 0; double tstart = dmlc::GetTime(); - auto iter = src->RowIterator(); + for (auto& batch : src->GetRowBatches()) { + if (page_type == ".row.page") { + page->Push(batch); + } else if (page_type == ".col.page") { + page->Push(batch.GetTranspose(src->Info().num_col_)); + } else if (page_type == ".sorted.col.page") { + auto tmp = batch.GetTranspose(src->Info().num_col_); + tmp.SortRows(); + page->Push(tmp); + } else { + LOG(FATAL) << "Unknown page type: " << page_type; + } - while (iter->Next()) { - page->Push(iter->Value()); if (page->MemCostBytes() >= kPageSize) { bytes_write += page->MemCostBytes(); writer.PushWrite(std::move(page)); @@ -239,6 +253,18 @@ void SparsePageSource::Create(DMatrix* src, LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info; } +void SparsePageSource::CreateRowPage(DMatrix* src, + const std::string& cache_info) { + const std::string page_type = ".row.page"; + CreatePageFromDMatrix(src, cache_info, page_type); +} + +void SparsePageSource::CreateColumnPage(DMatrix* src, + const std::string& cache_info, + bool sorted) { + const std::string page_type = sorted ? ".sorted.col.page" : ".col.page"; + CreatePageFromDMatrix(src, cache_info, page_type); +} } // namespace data } // namespace xgboost #endif diff --git a/src/data/sparse_page_source.h b/src/data/sparse_page_source.h index 9dcffb278..0e02b52da 100644 --- a/src/data/sparse_page_source.h +++ b/src/data/sparse_page_source.h @@ -31,7 +31,8 @@ class SparsePageSource : public DataSource { * \brief Create source from cache files the cache_prefix. * \param cache_prefix The prefix of cache we want to solve. */ - explicit SparsePageSource(const std::string& cache_prefix) noexcept(false); + explicit SparsePageSource(const std::string& cache_prefix, + const std::string& page_type) noexcept(false); /*! \brief destructor */ ~SparsePageSource() override; // implement Next @@ -45,26 +46,38 @@ class SparsePageSource : public DataSource { * \param src source parser. * \param cache_info The cache_info of cache file location. */ - static void Create(dmlc::Parser* src, + static void CreateRowPage(dmlc::Parser* src, const std::string& cache_info); /*! * \brief Create source cache by copy content from DMatrix. * \param cache_info The cache_info of cache file location. */ - static void Create(DMatrix* src, + static void CreateRowPage(DMatrix* src, const std::string& cache_info); + + /*! + * \brief Create source cache by copy content from DMatrix. Creates transposed column page, may be sorted or not. + * \param cache_info The cache_info of cache file location. + * \param sorted Whether columns should be pre-sorted + */ + static void CreateColumnPage(DMatrix* src, + const std::string& cache_info, bool sorted); /*! * \brief Check if the cache file already exists. * \param cache_info The cache prefix of files. + * \param page_type Type of the page. * \return Whether cache file already exists. 
*/ - static bool CacheExist(const std::string& cache_info); + static bool CacheExist(const std::string& cache_info, + const std::string& page_type); /*! \brief page size 32 MB */ static const size_t kPageSize = 32UL << 20UL; /*! \brief magic number used to identify Page */ static const int kMagic = 0xffffab02; private: + static void CreatePageFromDMatrix(DMatrix* src, const std::string& cache_info, + const std::string& page_type); /*! \brief number of rows */ size_t base_rowid_; /*! \brief page currently on hold. */ diff --git a/src/gbm/gblinear.cc b/src/gbm/gblinear.cc index 471134575..eaa8a1d57 100644 --- a/src/gbm/gblinear.cc +++ b/src/gbm/gblinear.cc @@ -83,13 +83,6 @@ class GBLinear : public GradientBooster { ObjFunction* obj) override { monitor_.Start("DoBoost"); - if (!p_fmat->HaveColAccess(false)) { - monitor_.Start("InitColAccess"); - std::vector enabled(p_fmat->Info().num_col_, true); - p_fmat->InitColAccess(param_.max_row_perbatch, false); - monitor_.Stop("InitColAccess"); - } - model_.LazyInitModel(); this->LazySumWeights(p_fmat); @@ -152,10 +145,7 @@ class GBLinear : public GradientBooster { // make sure contributions is zeroed, we could be reusing a previously allocated one std::fill(contribs.begin(), contribs.end(), 0); // start collecting the contributions - auto iter = p_fmat->RowIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetRowBatches()) { // parallel over local batch const auto nsize = static_cast(batch.Size()); #pragma omp parallel for schedule(static) @@ -203,11 +193,9 @@ class GBLinear : public GradientBooster { std::vector &preds = *out_preds; const auto& base_margin = p_fmat->Info().base_margin_.ConstHostVector(); // start collecting the prediction - auto iter = p_fmat->RowIterator(); const int ngroup = model_.param.num_output_group; preds.resize(p_fmat->Info().num_row_ * ngroup); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetRowBatches()) { // output convention: nrow * k, where nrow is number of rows // k is number of group // parallel over local batch diff --git a/src/gbm/gbtree.cc b/src/gbm/gbtree.cc index 319b5188f..e33a40b7f 100644 --- a/src/gbm/gbtree.cc +++ b/src/gbm/gbtree.cc @@ -438,12 +438,8 @@ class Dart : public GBTree { << "size_leaf_vector is enforced to 0 so far"; CHECK_EQ(preds.size(), p_fmat->Info().num_row_ * num_group); // start collecting the prediction - auto iter = p_fmat->RowIterator(); auto* self = static_cast(this); - iter->BeforeFirst(); - while (iter->Next()) { - auto &batch = iter->Value(); - // parallel over local batch + for (const auto &batch : p_fmat->GetRowBatches()) { constexpr int kUnroll = 8; const auto nsize = static_cast(batch.Size()); const bst_omp_uint rest = nsize % kUnroll; diff --git a/src/learner.cc b/src/learner.cc index dfbc1ede2..d73c8b18e 100644 --- a/src/learner.cc +++ b/src/learner.cc @@ -85,8 +85,6 @@ struct LearnerTrainParam : public dmlc::Parameter { int tree_method; // internal test flag std::string test_flag; - // maximum row per batch. 
- size_t max_row_perbatch; // number of threads to use if OpenMP is enabled // if equals 0, use system default int nthread; @@ -121,9 +119,6 @@ struct LearnerTrainParam : public dmlc::Parameter { .describe("Choice of tree construction method."); DMLC_DECLARE_FIELD(test_flag).set_default("").describe( "Internal test flag"); - DMLC_DECLARE_FIELD(max_row_perbatch) - .set_default(std::numeric_limits::max()) - .describe("maximum row per batch."); DMLC_DECLARE_FIELD(nthread).set_default(0).describe( "Number of threads to use."); DMLC_DECLARE_FIELD(debug_verbose) @@ -492,36 +487,6 @@ class LearnerImpl : public Learner { return; } - monitor_.Start("LazyInitDMatrix"); - if (!p_train->HaveColAccess(true)) { - auto ncol = static_cast(p_train->Info().num_col_); - std::vector enabled(ncol, true); - // set max row per batch to limited value - // in distributed mode, use safe choice otherwise - size_t max_row_perbatch = tparam_.max_row_perbatch; - const auto safe_max_row = static_cast(32ul << 10ul); - - if (tparam_.tree_method == 0 && p_train->Info().num_row_ >= (4UL << 20UL)) { - LOG(CONSOLE) - << "Tree method is automatically selected to be \'approx\'" - << " for faster speed." - << " to use old behavior(exact greedy algorithm on single machine)," - << " set tree_method to \'exact\'"; - max_row_perbatch = std::min(max_row_perbatch, safe_max_row); - } - - if (tparam_.tree_method == 1) { - LOG(CONSOLE) << "Tree method is selected to be \'approx\'"; - max_row_perbatch = std::min(max_row_perbatch, safe_max_row); - } - - if (tparam_.test_flag == "block" || tparam_.dsplit == 2) { - max_row_perbatch = std::min(max_row_perbatch, safe_max_row); - } - // initialize column access - p_train->InitColAccess(max_row_perbatch, true); - } - if (!p_train->SingleColBlock() && cfg_.count("updater") == 0) { if (tparam_.tree_method == 2) { LOG(CONSOLE) << "tree method is set to be 'exact'," @@ -533,7 +498,6 @@ class LearnerImpl : public Learner { gbm_->Configure(cfg_.begin(), cfg_.end()); } } - monitor_.Stop("LazyInitDMatrix"); } // return whether model is already initialized. 
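The linear updaters changed below (`coordinate_common.h`, `updater_shotgun.cc`, `updater_gpu_coordinate.cu`) now pull feature columns the same way, via `GetColumnBatches()`, and query sparsity through the reworked `GetColDensity()` instead of `BufferedRowset()`/`GetColSize()`. A rough sketch of that access pattern, again assuming a ready `DMatrix* p_fmat` (the `SumFeatureValues` helper is illustrative only):

#include <xgboost/data.h>

// Sum the values of one feature column using the new column-batch interface.
double SumFeatureValues(xgboost::DMatrix* p_fmat, xgboost::bst_uint fidx) {
  double sum = 0.0;
  // Column pages are materialized lazily (a transpose of the row pages) on first use.
  for (const auto& batch : p_fmat->GetColumnBatches()) {
    auto col = batch[fidx];            // one column: entries of (row index, feature value)
    for (size_t j = 0; j < col.size(); ++j) {
      sum += col[j].fvalue;            // Entry::index holds the row id in a column page
    }
  }
  return sum;
}

Note that `GetColDensity()` is no longer `const`: both DMatrix implementations may build column pages or cache densities on the first call.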
diff --git a/src/linear/coordinate_common.h b/src/linear/coordinate_common.h index 72b0c9802..4c4dc3b54 100644 --- a/src/linear/coordinate_common.h +++ b/src/linear/coordinate_common.h @@ -65,9 +65,7 @@ inline std::pair GetGradient(int group_idx, int num_group, int f const std::vector &gpair, DMatrix *p_fmat) { double sum_grad = 0.0, sum_hess = 0.0; - auto iter = p_fmat->ColIterator(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetColumnBatches()) { auto col = batch[fidx]; const auto ndata = static_cast(col.size()); for (bst_omp_uint j = 0; j < ndata; ++j) { @@ -96,9 +94,7 @@ inline std::pair GetGradientParallel(int group_idx, int num_grou const std::vector &gpair, DMatrix *p_fmat) { double sum_grad = 0.0, sum_hess = 0.0; - auto iter = p_fmat->ColIterator(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetColumnBatches()) { auto col = batch[fidx]; const auto ndata = static_cast(col.size()); #pragma omp parallel for schedule(static) reduction(+ : sum_grad, sum_hess) @@ -126,12 +122,11 @@ inline std::pair GetGradientParallel(int group_idx, int num_grou inline std::pair GetBiasGradientParallel(int group_idx, int num_group, const std::vector &gpair, DMatrix *p_fmat) { - const RowSet &rowset = p_fmat->BufferedRowset(); double sum_grad = 0.0, sum_hess = 0.0; - const auto ndata = static_cast(rowset.Size()); + const auto ndata = static_cast(p_fmat->Info().num_row_); #pragma omp parallel for schedule(static) reduction(+ : sum_grad, sum_hess) for (bst_omp_uint i = 0; i < ndata; ++i) { - auto &p = gpair[rowset[i] * num_group + group_idx]; + auto &p = gpair[i * num_group + group_idx]; if (p.GetHess() >= 0.0f) { sum_grad += p.GetGrad(); sum_hess += p.GetHess(); @@ -154,9 +149,7 @@ inline void UpdateResidualParallel(int fidx, int group_idx, int num_group, float dw, std::vector *in_gpair, DMatrix *p_fmat) { if (dw == 0.0f) return; - auto iter = p_fmat->ColIterator(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetColumnBatches()) { auto col = batch[fidx]; // update grad value const auto num_row = static_cast(col.size()); @@ -182,11 +175,10 @@ inline void UpdateBiasResidualParallel(int group_idx, int num_group, float dbias std::vector *in_gpair, DMatrix *p_fmat) { if (dbias == 0.0f) return; - const RowSet &rowset = p_fmat->BufferedRowset(); const auto ndata = static_cast(p_fmat->Info().num_row_); #pragma omp parallel for schedule(static) for (bst_omp_uint i = 0; i < ndata; ++i) { - GradientPair &g = (*in_gpair)[rowset[i] * num_group + group_idx]; + GradientPair &g = (*in_gpair)[i * num_group + group_idx]; if (g.GetHess() < 0.0f) continue; g += GradientPair(g.GetHess() * dbias, 0); } @@ -325,9 +317,7 @@ class GreedyFeatureSelector : public FeatureSelector { const bst_omp_uint nfeat = model.param.num_feature; // Calculate univariate gradient sums std::fill(gpair_sums_.begin(), gpair_sums_.end(), std::make_pair(0., 0.)); - auto iter = p_fmat->ColIterator(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetColumnBatches()) { #pragma omp parallel for schedule(static) for (bst_omp_uint i = 0; i < nfeat; ++i) { const auto col = batch[i]; @@ -392,11 +382,9 @@ class ThriftyFeatureSelector : public FeatureSelector { } // Calculate univariate gradient sums std::fill(gpair_sums_.begin(), gpair_sums_.end(), std::make_pair(0., 0.)); - auto iter = p_fmat->ColIterator(); - while (iter->Next()) { - auto &batch = iter->Value(); - // 
column-parallel is usually faster than row-parallel - #pragma omp parallel for schedule(static) + for (const auto &batch : p_fmat->GetColumnBatches()) { +// column-parallel is usually faster than row-parallel +#pragma omp parallel for schedule(static) for (bst_omp_uint i = 0; i < nfeat; ++i) { const auto col = batch[i]; const bst_uint ndata = col.size(); diff --git a/src/linear/updater_gpu_coordinate.cu b/src/linear/updater_gpu_coordinate.cu index a05ddcba2..fbed99425 100644 --- a/src/linear/updater_gpu_coordinate.cu +++ b/src/linear/updater_gpu_coordinate.cu @@ -235,10 +235,8 @@ class GPUCoordinateUpdater : public LinearUpdater { row_begin = row_end; } - auto iter = p_fmat->ColIterator(); CHECK(p_fmat->SingleColBlock()); - iter->Next(); - auto &batch = iter->Value(); + const auto &batch = *p_fmat->GetColumnBatches().begin(); shards.resize(n_devices); // Create device shards diff --git a/src/linear/updater_shotgun.cc b/src/linear/updater_shotgun.cc index 2b760f89f..36b2acc5c 100644 --- a/src/linear/updater_shotgun.cc +++ b/src/linear/updater_shotgun.cc @@ -80,9 +80,7 @@ class ShotgunUpdater : public LinearUpdater { // lock-free parallel updates of weights selector_->Setup(*model, in_gpair->ConstHostVector(), p_fmat, param_.reg_alpha_denorm, param_.reg_lambda_denorm, 0); - auto iter = p_fmat->ColIterator(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetColumnBatches()) { const auto nfeat = static_cast(batch.Size()); #pragma omp parallel for schedule(static) for (bst_omp_uint i = 0; i < nfeat; ++i) { diff --git a/src/predictor/cpu_predictor.cc b/src/predictor/cpu_predictor.cc index 22a31425c..db3421381 100644 --- a/src/predictor/cpu_predictor.cc +++ b/src/predictor/cpu_predictor.cc @@ -53,10 +53,7 @@ class CPUPredictor : public Predictor { << "size_leaf_vector is enforced to 0 so far"; CHECK_EQ(preds.size(), p_fmat->Info().num_row_ * num_group); // start collecting the prediction - auto iter = p_fmat->RowIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - const auto& batch = iter->Value(); + for (const auto &batch : p_fmat->GetRowBatches()) { // parallel over local batch constexpr int kUnroll = 8; const auto nsize = static_cast(batch.Size()); @@ -233,10 +230,7 @@ class CPUPredictor : public Predictor { std::vector& preds = *out_preds; preds.resize(info.num_row_ * ntree_limit); // start collecting the prediction - auto iter = p_fmat->RowIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetRowBatches()) { // parallel over local batch const auto nsize = static_cast(batch.Size()); #pragma omp parallel for schedule(static) @@ -280,12 +274,9 @@ class CPUPredictor : public Predictor { for (bst_omp_uint i = 0; i < ntree_limit; ++i) { model.trees[i]->FillNodeMeanValues(); } + const std::vector& base_margin = info.base_margin_.HostVector(); // start collecting the contributions - auto iter = p_fmat->RowIterator(); - const auto& base_margin = info.base_margin_.HostVector(); - iter->BeforeFirst(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetRowBatches()) { // parallel over local batch const auto nsize = static_cast(batch.Size()); #pragma omp parallel for schedule(static) diff --git a/src/predictor/gpu_predictor.cu b/src/predictor/gpu_predictor.cu index 4c11643c6..18c23c126 100644 --- a/src/predictor/gpu_predictor.cu +++ b/src/predictor/gpu_predictor.cu @@ -61,11 +61,8 @@ struct DeviceMatrix { const auto& info = 
dmat->Info(); ba.Allocate(device_idx, silent, &row_ptr, info.num_row_ + 1, &data, info.num_nonzero_); - auto iter = dmat->RowIterator(); - iter->BeforeFirst(); size_t data_offset = 0; - while (iter->Next()) { - const auto& batch = iter->Value(); + for (const auto &batch : dmat->GetRowBatches()) { const auto& offset_vec = batch.offset.HostVector(); const auto& data_vec = batch.data.HostVector(); // Copy row ptr diff --git a/src/tree/updater_basemaker-inl.h b/src/tree/updater_basemaker-inl.h index 55ff2c774..e6c021fc9 100644 --- a/src/tree/updater_basemaker-inl.h +++ b/src/tree/updater_basemaker-inl.h @@ -43,15 +43,14 @@ class BaseMaker: public TreeUpdater { std::fill(fminmax_.begin(), fminmax_.end(), -std::numeric_limits::max()); // start accumulating statistics - auto iter = p_fmat->ColIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetSortedColumnBatches()) { for (bst_uint fid = 0; fid < batch.Size(); ++fid) { - auto c = batch[fid]; + auto c = batch[fid]; if (c.size() != 0) { - fminmax_[fid * 2 + 0] = std::max(-c[0].fvalue, fminmax_[fid * 2 + 0]); - fminmax_[fid * 2 + 1] = std::max(c[c.size() - 1].fvalue, fminmax_[fid * 2 + 1]); + fminmax_[fid * 2 + 0] = + std::max(-c[0].fvalue, fminmax_[fid * 2 + 0]); + fminmax_[fid * 2 + 1] = + std::max(c[c.size() - 1].fvalue, fminmax_[fid * 2 + 1]); } } } @@ -208,16 +207,13 @@ class BaseMaker: public TreeUpdater { */ inline void SetDefaultPostion(DMatrix *p_fmat, const RegTree &tree) { - // set rest of instances to default position - const RowSet &rowset = p_fmat->BufferedRowset(); // set default direct nodes to default // for leaf nodes that are not fresh, mark then to ~nid, // so that they are ignored in future statistics collection - const auto ndata = static_cast(rowset.Size()); + const auto ndata = static_cast(p_fmat->Info().num_row_); #pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < ndata; ++i) { - const bst_uint ridx = rowset[i]; + for (bst_omp_uint ridx = 0; ridx < ndata; ++ridx) { const int nid = this->DecodePosition(ridx); if (tree[nid].IsLeaf()) { // mark finish when it is not a fresh leaf @@ -303,9 +299,7 @@ class BaseMaker: public TreeUpdater { const RegTree &tree) { std::vector fsplits; this->GetSplitSet(nodes, tree, &fsplits); - auto iter = p_fmat->ColIterator(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetSortedColumnBatches()) { for (auto fid : fsplits) { auto col = batch[fid]; const auto ndata = static_cast(col.size()); @@ -345,12 +339,10 @@ class BaseMaker: public TreeUpdater { thread_temp[tid][nid].Clear(); } } - const RowSet &rowset = fmat.BufferedRowset(); // setup position - const auto ndata = static_cast(rowset.Size()); + const auto ndata = static_cast(fmat.Info().num_row_); #pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < ndata; ++i) { - const bst_uint ridx = rowset[i]; + for (bst_omp_uint ridx = 0; ridx < ndata; ++ridx) { const int nid = position_[ridx]; const int tid = omp_get_thread_num(); if (nid >= 0) { diff --git a/src/tree/updater_colmaker.cc b/src/tree/updater_colmaker.cc index 42a681b45..bda3103b6 100644 --- a/src/tree/updater_colmaker.cc +++ b/src/tree/updater_colmaker.cc @@ -141,32 +141,27 @@ class ColMaker: public TreeUpdater { CHECK_EQ(tree.param.num_nodes, tree.param.num_roots) << "ColMaker: can only grow new tree"; const std::vector& root_index = fmat.Info().root_index_; - const RowSet& rowset = fmat.BufferedRowset(); { // setup position 
position_.resize(gpair.size()); + CHECK_EQ(fmat.Info().num_row_, position_.size()); if (root_index.size() == 0) { - for (size_t i = 0; i < rowset.Size(); ++i) { - position_[rowset[i]] = 0; - } + std::fill(position_.begin(), position_.end(), 0); } else { - for (size_t i = 0; i < rowset.Size(); ++i) { - const bst_uint ridx = rowset[i]; + for (size_t ridx = 0; ridx < position_.size(); ++ridx) { position_[ridx] = root_index[ridx]; CHECK_LT(root_index[ridx], (unsigned)tree.param.num_roots); } } // mark delete for the deleted datas - for (size_t i = 0; i < rowset.Size(); ++i) { - const bst_uint ridx = rowset[i]; + for (size_t ridx = 0; ridx < position_.size(); ++ridx) { if (gpair[ridx].GetHess() < 0.0f) position_[ridx] = ~position_[ridx]; } // mark subsample if (param_.subsample < 1.0f) { std::bernoulli_distribution coin_flip(param_.subsample); auto& rnd = common::GlobalRandom(); - for (size_t i = 0; i < rowset.Size(); ++i) { - const bst_uint ridx = rowset[i]; + for (size_t ridx = 0; ridx < position_.size(); ++ridx) { if (gpair[ridx].GetHess() < 0.0f) continue; if (!coin_flip(rnd)) position_[ridx] = ~position_[ridx]; } @@ -209,13 +204,11 @@ class ColMaker: public TreeUpdater { } snode_.resize(tree.param.num_nodes, NodeEntry(param_)); } - const RowSet &rowset = fmat.BufferedRowset(); const MetaInfo& info = fmat.Info(); // setup position - const auto ndata = static_cast(rowset.Size()); + const auto ndata = static_cast(info.num_row_); #pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < ndata; ++i) { - const bst_uint ridx = rowset[i]; + for (bst_omp_uint ridx = 0; ridx < ndata; ++ridx) { const int tid = omp_get_thread_num(); if (position_[ridx] < 0) continue; stemp_[tid][position_[ridx]].stats.Add(gpair, info, ridx); @@ -254,13 +247,13 @@ class ColMaker: public TreeUpdater { // this function does not support nested functions inline void ParallelFindSplit(const SparsePage::Inst &col, bst_uint fid, - const DMatrix &fmat, + DMatrix *p_fmat, const std::vector &gpair) { // TODO(tqchen): double check stats order. 
- const MetaInfo& info = fmat.Info(); + const MetaInfo& info = p_fmat->Info(); const bool ind = col.size() != 0 && col[0].fvalue == col[col.size() - 1].fvalue; - bool need_forward = param_.NeedForwardSearch(fmat.GetColDensity(fid), ind); - bool need_backward = param_.NeedBackwardSearch(fmat.GetColDensity(fid), ind); + bool need_forward = param_.NeedForwardSearch(p_fmat->GetColDensity(fid), ind); + bool need_backward = param_.NeedBackwardSearch(p_fmat->GetColDensity(fid), ind); const std::vector &qexpand = qexpand_; #pragma omp parallel { @@ -592,8 +585,8 @@ class ColMaker: public TreeUpdater { virtual void UpdateSolution(const SparsePage &batch, const std::vector &feat_set, const std::vector &gpair, - const DMatrix &fmat) { - const MetaInfo& info = fmat.Info(); + DMatrix*p_fmat) { + const MetaInfo& info = p_fmat->Info(); // start enumeration const auto num_features = static_cast(feat_set.size()); #if defined(_OPENMP) @@ -610,11 +603,11 @@ class ColMaker: public TreeUpdater { const int tid = omp_get_thread_num(); auto c = batch[fid]; const bool ind = c.size() != 0 && c[0].fvalue == c[c.size() - 1].fvalue; - if (param_.NeedForwardSearch(fmat.GetColDensity(fid), ind)) { + if (param_.NeedForwardSearch(p_fmat->GetColDensity(fid), ind)) { this->EnumerateSplit(c.data(), c.data() + c.size(), +1, fid, gpair, info, stemp_[tid]); } - if (param_.NeedBackwardSearch(fmat.GetColDensity(fid), ind)) { + if (param_.NeedBackwardSearch(p_fmat->GetColDensity(fid), ind)) { this->EnumerateSplit(c.data() + c.size() - 1, c.data() - 1, -1, fid, gpair, info, stemp_[tid]); } @@ -622,7 +615,7 @@ class ColMaker: public TreeUpdater { } else { for (bst_omp_uint fid = 0; fid < num_features; ++fid) { this->ParallelFindSplit(batch[fid], fid, - fmat, gpair); + p_fmat, gpair); } } } @@ -633,9 +626,8 @@ class ColMaker: public TreeUpdater { DMatrix *p_fmat, RegTree *p_tree) { const std::vector &feat_set = column_sampler_.GetFeatureSet(depth).HostVector(); - auto iter = p_fmat->ColIterator(); - while (iter->Next()) { - this->UpdateSolution(iter->Value(), feat_set, gpair, *p_fmat); + for (const auto &batch : p_fmat->GetSortedColumnBatches()) { + this->UpdateSolution(batch, feat_set, gpair, p_fmat); } // after this each thread's stemp will get the best candidates, aggregate results this->SyncBestSolution(qexpand); @@ -661,15 +653,13 @@ class ColMaker: public TreeUpdater { // set the positions in the nondefault this->SetNonDefaultPosition(qexpand, p_fmat, tree); // set rest of instances to default position - const RowSet &rowset = p_fmat->BufferedRowset(); // set default direct nodes to default // for leaf nodes that are not fresh, mark then to ~nid, // so that they are ignored in future statistics collection - const auto ndata = static_cast(rowset.Size()); + const auto ndata = static_cast(p_fmat->Info().num_row_); #pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < ndata; ++i) { - const bst_uint ridx = rowset[i]; + for (bst_omp_uint ridx = 0; ridx < ndata; ++ridx) { CHECK_LT(ridx, position_.size()) << "ridx exceed bound " << "ridx="<< ridx << " pos=" << position_.size(); const int nid = this->DecodePosition(ridx); @@ -710,9 +700,7 @@ class ColMaker: public TreeUpdater { } std::sort(fsplits.begin(), fsplits.end()); fsplits.resize(std::unique(fsplits.begin(), fsplits.end()) - fsplits.begin()); - auto iter = p_fmat->ColIterator(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetSortedColumnBatches()) { for (auto fid : fsplits) { auto col = batch[fid]; const auto ndata 
= static_cast(col.size()); @@ -798,11 +786,9 @@ class DistColMaker : public ColMaker { std::unique_ptr spliteval) : ColMaker::Builder(param, std::move(spliteval)) {} inline void UpdatePosition(DMatrix* p_fmat, const RegTree &tree) { - const RowSet &rowset = p_fmat->BufferedRowset(); - const auto ndata = static_cast(rowset.Size()); + const auto ndata = static_cast(p_fmat->Info().num_row_); #pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < ndata; ++i) { - const bst_uint ridx = rowset[i]; + for (bst_omp_uint ridx = 0; ridx < ndata; ++ridx) { int nid = this->DecodePosition(ridx); while (tree[nid].IsDeleted()) { nid = tree[nid].Parent(); @@ -840,9 +826,7 @@ class DistColMaker : public ColMaker { boolmap_[j] = 0; } } - auto iter = p_fmat->ColIterator(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetSortedColumnBatches()) { for (auto fid : fsplits) { auto col = batch[fid]; const auto ndata = static_cast(col.size()); @@ -865,12 +849,10 @@ class DistColMaker : public ColMaker { bitmap_.InitFromBool(boolmap_); // communicate bitmap rabit::Allreduce(dmlc::BeginPtr(bitmap_.data), bitmap_.data.size()); - const RowSet &rowset = p_fmat->BufferedRowset(); // get the new position - const auto ndata = static_cast(rowset.Size()); + const auto ndata = static_cast(p_fmat->Info().num_row_); #pragma omp parallel for schedule(static) - for (bst_omp_uint i = 0; i < ndata; ++i) { - const bst_uint ridx = rowset[i]; + for (bst_omp_uint ridx = 0; ridx < ndata; ++ridx) { const int nid = this->DecodePosition(ridx); if (bitmap_.Get(ridx)) { CHECK(!tree[nid].IsLeaf()) << "inconsistent reduce information"; diff --git a/src/tree/updater_gpu.cu b/src/tree/updater_gpu.cu index 05368a82b..cba128704 100644 --- a/src/tree/updater_gpu.cu +++ b/src/tree/updater_gpu.cu @@ -661,19 +661,12 @@ class GPUMaker : public TreeUpdater { fId->reserve(nCols * nRows); // in case you end up with a DMatrix having no column access // then make sure to enable that before copying the data! 
- if (!dmat->HaveColAccess(true)) { - dmat->InitColAccess(nRows, true); - } - auto iter = dmat->ColIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto& batch : dmat->GetSortedColumnBatches()) { for (int i = 0; i < batch.Size(); i++) { auto col = batch[i]; - for (const Entry* it = col.data(); it != col.data() + col.size(); - it++) { - int inst_id = static_cast(it->index); - fval->push_back(it->fvalue); + for (const Entry& e : col) { + int inst_id = static_cast(e.index); + fval->push_back(e.fvalue); fId->push_back(inst_id); } offset->push_back(fval->size()); diff --git a/src/tree/updater_gpu_hist.cu b/src/tree/updater_gpu_hist.cu index d1b7f61ea..20b743466 100644 --- a/src/tree/updater_gpu_hist.cu +++ b/src/tree/updater_gpu_hist.cu @@ -19,6 +19,7 @@ #include "../common/hist_util.h" #include "../common/host_device_vector.h" #include "../common/timer.h" +#include "../common/common.h" #include "param.h" #include "updater_gpu_common.cuh" @@ -803,10 +804,8 @@ class GPUHistMaker : public TreeUpdater { reducer_.Init(device_list_); - dmlc::DataIter* iter = dmat->RowIterator(); - iter->BeforeFirst(); - CHECK(iter->Next()) << "Empty batches are not supported"; - const SparsePage& batch = iter->Value(); + auto batch_iter = dmat->GetRowBatches().begin(); + const SparsePage& batch = *batch_iter; // Create device shards shards_.resize(n_devices); dh::ExecuteIndexShards(&shards_, [&](int i, std::unique_ptr& shard) { @@ -828,8 +827,8 @@ class GPUHistMaker : public TreeUpdater { shard->InitCompressedData(hmat_, batch); }); monitor_.Stop("BinningCompression", dist_.Devices()); - - CHECK(!iter->Next()) << "External memory not supported"; + ++batch_iter; + CHECK(batch_iter.AtEnd()) << "External memory not supported"; p_last_fmat_ = dmat; initialised_ = true; diff --git a/src/tree/updater_histmaker.cc b/src/tree/updater_histmaker.cc index b64a7d307..e0167afa4 100644 --- a/src/tree/updater_histmaker.cc +++ b/src/tree/updater_histmaker.cc @@ -344,10 +344,7 @@ class CQHistMaker: public HistMaker { { thread_hist_.resize(omp_get_max_threads()); // start accumulating statistics - auto iter = p_fmat->ColIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetSortedColumnBatches()) { // start enumeration const auto nsize = static_cast(fset.size()); #pragma omp parallel for schedule(dynamic, 1) @@ -426,10 +423,7 @@ class CQHistMaker: public HistMaker { work_set_.resize(std::unique(work_set_.begin(), work_set_.end()) - work_set_.begin()); // start accumulating statistics - auto iter = p_fmat->ColIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetSortedColumnBatches()) { // TWOPASS: use the real set + split set in the column iteration. this->CorrectNonDefaultPositionByBatch(batch, fsplit_set_, tree); @@ -714,10 +708,7 @@ class GlobalProposalHistMaker: public CQHistMaker { std::unique(this->work_set_.begin(), this->work_set_.end()) - this->work_set_.begin()); // start accumulating statistics - auto iter = p_fmat->ColIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetSortedColumnBatches()) { // TWOPASS: use the real set + split set in the column iteration. 
this->CorrectNonDefaultPositionByBatch(batch, this->fsplit_set_, tree); @@ -772,10 +763,7 @@ class QuantileHistMaker: public HistMaker { sketchs_[i].Init(info.num_row_, this->param_.sketch_eps); } // start accumulating statistics - auto iter = p_fmat->RowIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetRowBatches()) { // parallel convert to column major format common::ParallelGroupBuilder builder(&col_ptr_, &col_data_, &thread_col_ptr_); diff --git a/src/tree/updater_refresh.cc b/src/tree/updater_refresh.cc index 3df4baeea..9fb273a50 100644 --- a/src/tree/updater_refresh.cc +++ b/src/tree/updater_refresh.cc @@ -57,10 +57,7 @@ class TreeRefresher: public TreeUpdater { { const MetaInfo &info = p_fmat->Info(); // start accumulating statistics - auto *iter = p_fmat->RowIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetRowBatches()) { CHECK_LT(batch.Size(), std::numeric_limits::max()); const auto nbatch = static_cast(batch.Size()); #pragma omp parallel for schedule(static) diff --git a/src/tree/updater_skmaker.cc b/src/tree/updater_skmaker.cc index bf27e2c94..6848c9b0f 100644 --- a/src/tree/updater_skmaker.cc +++ b/src/tree/updater_skmaker.cc @@ -142,12 +142,9 @@ class SketchMaker: public BaseMaker { } thread_sketch_.resize(omp_get_max_threads()); // number of rows in - const size_t nrows = p_fmat->BufferedRowset().Size(); + const size_t nrows = p_fmat->Info().num_row_; // start accumulating statistics - auto iter = p_fmat->ColIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - auto &batch = iter->Value(); + for (const auto &batch : p_fmat->GetSortedColumnBatches()) { // start enumeration const auto nsize = static_cast(batch.Size()); #pragma omp parallel for schedule(dynamic, 1) diff --git a/tests/cpp/c_api/test_c_api.cc b/tests/cpp/c_api/test_c_api.cc index 05ef93195..5942dc030 100644 --- a/tests/cpp/c_api/test_c_api.cc +++ b/tests/cpp/c_api/test_c_api.cc @@ -20,10 +20,7 @@ TEST(c_api, XGDMatrixCreateFromMatDT) { ASSERT_EQ(info.num_row_, 3); ASSERT_EQ(info.num_nonzero_, 6); - auto iter = (*dmat)->RowIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - auto batch = iter->Value(); + for (const auto &batch : (*dmat)->GetRowBatches()) { ASSERT_EQ(batch[0][0].fvalue, 0.0f); ASSERT_EQ(batch[0][1].fvalue, -4.0f); ASSERT_EQ(batch[2][0].fvalue, 3.0f); @@ -55,10 +52,7 @@ TEST(c_api, XGDMatrixCreateFromMat_omp) { ASSERT_EQ(info.num_row_, row); ASSERT_EQ(info.num_nonzero_, num_cols * row - num_missing); - auto iter = (*dmat)->RowIterator(); - iter->BeforeFirst(); - while (iter->Next()) { - auto batch = iter->Value(); + for (const auto &batch : (*dmat)->GetRowBatches()) { for (int i = 0; i < batch.Size(); i++) { auto inst = batch[i]; for (int j = 0; i < inst.size(); i++) { diff --git a/tests/cpp/common/test_gpu_hist_util.cu b/tests/cpp/common/test_gpu_hist_util.cu index f8f550687..c3a1741a5 100644 --- a/tests/cpp/common/test_gpu_hist_util.cu +++ b/tests/cpp/common/test_gpu_hist_util.cu @@ -37,13 +37,9 @@ TEST(gpu_hist_util, TestDeviceSketch) { hmat_cpu.Init((*dmat).get(), p.max_bin); // find the cuts on the GPU - dmlc::DataIter* iter = (*dmat)->RowIterator(); - iter->BeforeFirst(); - CHECK(iter->Next()); - const SparsePage& batch = iter->Value(); + const SparsePage& batch = *(*dmat)->GetRowBatches().begin(); HistCutMatrix hmat_gpu; DeviceSketch(batch, (*dmat)->Info(), p, &hmat_gpu); - CHECK(!iter->Next()); // compare the cuts 
double eps = 1e-2; diff --git a/tests/cpp/data/test_metainfo.cc b/tests/cpp/data/test_metainfo.cc index 7231e880d..9c13752fc 100644 --- a/tests/cpp/data/test_metainfo.cc +++ b/tests/cpp/data/test_metainfo.cc @@ -123,12 +123,9 @@ TEST(MetaInfo, LoadQid) { xgboost::Entry(2, 0), xgboost::Entry(3, 0), xgboost::Entry(4, 0.4), xgboost::Entry(5, 1), xgboost::Entry(1, 0), xgboost::Entry(2, 1), xgboost::Entry(3, 1), xgboost::Entry(4, 0.5), {5, 0}}; - dmlc::DataIter* iter = dmat->RowIterator(); - iter->BeforeFirst(); - CHECK(iter->Next()); - const xgboost::SparsePage& batch = iter->Value(); - CHECK_EQ(batch.base_rowid, 0); - CHECK(batch.offset.HostVector() == expected_offset); - CHECK(batch.data.HostVector() == expected_data); - CHECK(!iter->Next()); + for (const auto &batch : dmat->GetRowBatches()) { + CHECK_EQ(batch.base_rowid, 0); + CHECK(batch.offset.HostVector() == expected_offset); + CHECK(batch.data.HostVector() == expected_data); + } } diff --git a/tests/cpp/data/test_simple_csr_source.cc b/tests/cpp/data/test_simple_csr_source.cc index aba08b893..71bd7716d 100644 --- a/tests/cpp/data/test_simple_csr_source.cc +++ b/tests/cpp/data/test_simple_csr_source.cc @@ -18,18 +18,17 @@ TEST(SimpleCSRSource, SaveLoadBinary) { EXPECT_EQ(dmat->Info().num_row_, dmat_read->Info().num_row_); EXPECT_EQ(dmat->Info().num_row_, dmat_read->Info().num_row_); - auto row_iter = dmat->RowIterator(); - auto row_iter_read = dmat_read->RowIterator(); + // Test we have non-empty batch + EXPECT_EQ(dmat->GetRowBatches().begin().AtEnd(), false); + + auto row_iter = dmat->GetRowBatches().begin(); + auto row_iter_read = dmat_read->GetRowBatches().begin(); // Test the data read into the first row - row_iter->BeforeFirst(); row_iter->Next(); - row_iter_read->BeforeFirst(); row_iter_read->Next(); - auto first_row = row_iter->Value()[0]; - auto first_row_read = row_iter_read->Value()[0]; + auto first_row = (*row_iter)[0]; + auto first_row_read = (*row_iter_read)[0]; EXPECT_EQ(first_row.size(), first_row_read.size()); EXPECT_EQ(first_row[2].index, first_row_read[2].index); EXPECT_EQ(first_row[2].fvalue, first_row_read[2].fvalue); - row_iter = nullptr; row_iter_read = nullptr; - delete dmat; delete dmat_read; } diff --git a/tests/cpp/data/test_simple_dmatrix.cc b/tests/cpp/data/test_simple_dmatrix.cc index 2d3ae0332..f403d7657 100644 --- a/tests/cpp/data/test_simple_dmatrix.cc +++ b/tests/cpp/data/test_simple_dmatrix.cc @@ -23,20 +23,18 @@ TEST(SimpleDMatrix, RowAccess) { xgboost::DMatrix * dmat = xgboost::DMatrix::Load(tmp_file, false, false); std::remove(tmp_file.c_str()); - auto row_iter = dmat->RowIterator(); // Loop over the batches and count the records long row_count = 0; - row_iter->BeforeFirst(); - while (row_iter->Next()) row_count += row_iter->Value().Size(); + for (auto &batch : dmat->GetRowBatches()) { + row_count += batch.Size(); + } EXPECT_EQ(row_count, dmat->Info().num_row_); // Test the data read into the first row - row_iter->BeforeFirst(); - row_iter->Next(); - auto first_row = row_iter->Value()[0]; + auto &batch = *dmat->GetRowBatches().begin(); + auto first_row = batch[0]; ASSERT_EQ(first_row.size(), 3); EXPECT_EQ(first_row[2].index, 2); EXPECT_EQ(first_row[2].fvalue, 20); - row_iter = nullptr; delete dmat; } @@ -46,40 +44,18 @@ TEST(SimpleDMatrix, ColAccessWithoutBatches) { xgboost::DMatrix * dmat = xgboost::DMatrix::Load(tmp_file, true, false); std::remove(tmp_file.c_str()); - // Unsorted column access - const std::vector enable(dmat->Info().num_col_, true); - EXPECT_EQ(dmat->HaveColAccess(false), false); - 
dmat->InitColAccess(dmat->Info().num_row_, false); - dmat->InitColAccess(0, false); // Calling it again should not change it - ASSERT_EQ(dmat->HaveColAccess(false), true); - // Sorted column access - EXPECT_EQ(dmat->HaveColAccess(true), false); - dmat->InitColAccess(dmat->Info().num_row_, true); - dmat->InitColAccess(0, true); // Calling it again should not change it - ASSERT_EQ(dmat->HaveColAccess(true), true); - - EXPECT_EQ(dmat->GetColSize(0), 2); - EXPECT_EQ(dmat->GetColSize(1), 1); EXPECT_EQ(dmat->GetColDensity(0), 1); EXPECT_EQ(dmat->GetColDensity(1), 0.5); ASSERT_TRUE(dmat->SingleColBlock()); - auto* col_iter = dmat->ColIterator(); // Loop over the batches and assert the data is as expected long num_col_batch = 0; - col_iter->BeforeFirst(); - while (col_iter->Next()) { + for (const auto &batch : dmat->GetSortedColumnBatches()) { num_col_batch += 1; - EXPECT_EQ(col_iter->Value().Size(), dmat->Info().num_col_) - << "Expected batch size = number of cells as #batches is 1."; - for (int i = 0; i < static_cast(col_iter->Value().Size()); ++i) { - EXPECT_EQ(col_iter->Value()[i].size(), dmat->GetColSize(i)) - << "Expected length of each colbatch = colsize as #batches is 1."; - } + EXPECT_EQ(batch.Size(), dmat->Info().num_col_) + << "Expected batch size = number of cells as #batches is 1."; } EXPECT_EQ(num_col_batch, 1) << "Expected number of batches to be 1"; - col_iter = nullptr; - delete dmat; } diff --git a/tests/cpp/data/test_sparse_page_dmatrix.cc b/tests/cpp/data/test_sparse_page_dmatrix.cc index 209c033d6..1c8533036 100644 --- a/tests/cpp/data/test_sparse_page_dmatrix.cc +++ b/tests/cpp/data/test_sparse_page_dmatrix.cc @@ -8,7 +8,6 @@ TEST(SparsePageDMatrix, MetaInfo) { std::string tmp_file = CreateSimpleTestData(); xgboost::DMatrix * dmat = xgboost::DMatrix::Load( tmp_file + "#" + tmp_file + ".cache", false, false); - std::remove(tmp_file.c_str()); std::cout << tmp_file << std::endl; EXPECT_TRUE(FileExists(tmp_file + ".cache")); @@ -19,6 +18,7 @@ TEST(SparsePageDMatrix, MetaInfo) { EXPECT_EQ(dmat->Info().labels_.Size(), dmat->Info().num_row_); // Clean up of external memory files + std::remove(tmp_file.c_str()); std::remove((tmp_file + ".cache").c_str()); std::remove((tmp_file + ".cache.row.page").c_str()); @@ -26,26 +26,26 @@ TEST(SparsePageDMatrix, MetaInfo) { } TEST(SparsePageDMatrix, RowAccess) { - std::string tmp_file = CreateSimpleTestData(); + // Create sufficiently large data to make two row pages + std::string tmp_file = CreateBigTestData(5000000); xgboost::DMatrix * dmat = xgboost::DMatrix::Load( tmp_file + "#" + tmp_file + ".cache", true, false); std::remove(tmp_file.c_str()); EXPECT_TRUE(FileExists(tmp_file + ".cache.row.page")); - auto row_iter = dmat->RowIterator(); // Loop over the batches and count the records long row_count = 0; - row_iter->BeforeFirst(); - while (row_iter->Next()) row_count += row_iter->Value().Size(); + for (auto &batch : dmat->GetRowBatches()) { + row_count += batch.Size(); + } EXPECT_EQ(row_count, dmat->Info().num_row_); + // Test the data read into the first row - row_iter->BeforeFirst(); - row_iter->Next(); - auto first_row = row_iter->Value()[0]; + auto &batch = *dmat->GetRowBatches().begin(); + auto first_row = batch[0]; ASSERT_EQ(first_row.size(), 3); EXPECT_EQ(first_row[2].index, 2); EXPECT_EQ(first_row[2].fvalue, 20); - row_iter = nullptr; // Clean up of external memory files std::remove((tmp_file + ".cache").c_str()); @@ -59,35 +59,33 @@ TEST(SparsePageDMatrix, ColAccess) { xgboost::DMatrix * dmat = xgboost::DMatrix::Load( tmp_file + "#" 
+ tmp_file + ".cache", true, false); std::remove(tmp_file.c_str()); - EXPECT_FALSE(FileExists(tmp_file + ".cache.col.page")); - EXPECT_EQ(dmat->HaveColAccess(true), false); - const std::vector enable(dmat->Info().num_col_, true); - dmat->InitColAccess(1, true); // Max 1 row per patch - ASSERT_EQ(dmat->HaveColAccess(true), true); - EXPECT_TRUE(FileExists(tmp_file + ".cache.col.page")); - - EXPECT_EQ(dmat->GetColSize(0), 2); - EXPECT_EQ(dmat->GetColSize(1), 1); EXPECT_EQ(dmat->GetColDensity(0), 1); EXPECT_EQ(dmat->GetColDensity(1), 0.5); - auto col_iter = dmat->ColIterator(); // Loop over the batches and assert the data is as expected - long num_col_batch = 0; - col_iter->BeforeFirst(); - while (col_iter->Next()) { - num_col_batch += 1; - EXPECT_EQ(col_iter->Value().Size(), dmat->Info().num_col_) - << "Expected batch size to be same as num_cols as max_row_perbatch is 1."; + for (auto col_batch : dmat->GetSortedColumnBatches()) { + EXPECT_EQ(col_batch.Size(), dmat->Info().num_col_); + EXPECT_EQ(col_batch[1][0].fvalue, 10.0f); + EXPECT_EQ(col_batch[1].size(), 1); } - EXPECT_EQ(num_col_batch, dmat->Info().num_row_) - << "Expected num batches to be same as num_rows as max_row_perbatch is 1"; - col_iter = nullptr; + + // Loop over the batches and assert the data is as expected + for (auto col_batch : dmat->GetColumnBatches()) { + EXPECT_EQ(col_batch.Size(), dmat->Info().num_col_); + EXPECT_EQ(col_batch[1][0].fvalue, 10.0f); + EXPECT_EQ(col_batch[1].size(), 1); + } + + EXPECT_TRUE(FileExists(tmp_file + ".cache")); + EXPECT_TRUE(FileExists(tmp_file + ".cache.row.page")); + EXPECT_TRUE(FileExists(tmp_file + ".cache.col.page")); + EXPECT_TRUE(FileExists(tmp_file + ".cache.sorted.col.page")); std::remove((tmp_file + ".cache").c_str()); - std::remove((tmp_file + ".cache.col.page").c_str()); std::remove((tmp_file + ".cache.row.page").c_str()); + std::remove((tmp_file + ".cache.col.page").c_str()); + std::remove((tmp_file + ".cache.sorted.col.page").c_str()); delete dmat; } diff --git a/tests/cpp/linear/test_linear.cc b/tests/cpp/linear/test_linear.cc index de5f0f2fb..35db3180e 100644 --- a/tests/cpp/linear/test_linear.cc +++ b/tests/cpp/linear/test_linear.cc @@ -8,8 +8,6 @@ typedef std::pair arg; TEST(Linear, shotgun) { typedef std::pair arg; auto mat = CreateDMatrix(10, 10, 0); - std::vector enabled((*mat)->Info().num_col_, true); - (*mat)->InitColAccess(1 << 16, false); auto updater = std::unique_ptr( xgboost::LinearUpdater::Create("shotgun")); updater->Init({{"eta", "1."}}); @@ -29,8 +27,6 @@ TEST(Linear, shotgun) { TEST(Linear, coordinate) { typedef std::pair arg; auto mat = CreateDMatrix(10, 10, 0); - std::vector enabled((*mat)->Info().num_col_, true); - (*mat)->InitColAccess(1 << 16, false); auto updater = std::unique_ptr( xgboost::LinearUpdater::Create("coord_descent")); updater->Init({{"eta", "1."}}); diff --git a/tests/cpp/predictor/test_cpu_predictor.cc b/tests/cpp/predictor/test_cpu_predictor.cc index 784c0dfa9..f9e575c29 100644 --- a/tests/cpp/predictor/test_cpu_predictor.cc +++ b/tests/cpp/predictor/test_cpu_predictor.cc @@ -32,7 +32,7 @@ TEST(cpu_predictor, Test) { } // Test predict instance - auto batch = (*dmat)->RowIterator()->Value(); + auto &batch = *(*dmat)->GetRowBatches().begin(); for (int i = 0; i < batch.Size(); i++) { std::vector instance_out_predictions; cpu_predictor->PredictInstance(batch[i], &instance_out_predictions, model); diff --git a/tests/cpp/predictor/test_gpu_predictor.cu b/tests/cpp/predictor/test_gpu_predictor.cu index 2967da226..82f771997 100644 --- 
a/tests/cpp/predictor/test_gpu_predictor.cu +++ b/tests/cpp/predictor/test_gpu_predictor.cu @@ -45,7 +45,7 @@ TEST(gpu_predictor, Test) { abs_tolerance); } // Test predict instance - auto batch = (*dmat)->RowIterator()->Value(); + const auto &batch = *(*dmat)->GetRowBatches().begin(); for (int i = 0; i < batch.Size(); i++) { std::vector gpu_instance_out_predictions; std::vector cpu_instance_out_predictions; diff --git a/tests/cpp/tree/test_gpu_hist.cu b/tests/cpp/tree/test_gpu_hist.cu index 454306939..429f45e50 100644 --- a/tests/cpp/tree/test_gpu_hist.cu +++ b/tests/cpp/tree/test_gpu_hist.cu @@ -24,14 +24,10 @@ TEST(gpu_hist_experimental, TestSparseShard) { TrainParam p; p.max_depth = 6; - dmlc::DataIter* iter = (*dmat)->RowIterator(); - iter->BeforeFirst(); - CHECK(iter->Next()); - const SparsePage& batch = iter->Value(); + const SparsePage& batch = *(*dmat)->GetRowBatches().begin(); DeviceShard shard(0, 0, 0, rows, p); shard.InitRowPtrs(batch); shard.InitCompressedData(gmat.cut, batch); - CHECK(!iter->Next()); ASSERT_LT(shard.row_stride, columns); @@ -65,15 +61,10 @@ TEST(gpu_hist_experimental, TestDenseShard) { TrainParam p; p.max_depth = 6; - dmlc::DataIter* iter = (*dmat)->RowIterator(); - iter->BeforeFirst(); - CHECK(iter->Next()); - const SparsePage& batch = iter->Value(); - + const SparsePage& batch = *(*dmat)->GetRowBatches().begin(); DeviceShard shard(0, 0, 0, rows, p); shard.InitRowPtrs(batch); shard.InitCompressedData(gmat.cut, batch); - CHECK(!iter->Next()); ASSERT_EQ(shard.row_stride, columns);
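
For reference, the consumption pattern that replaces RowIterator()/ColIterator() across the updaters and tests above looks roughly like the sketch below. This is an illustrative fragment written against the new BatchSet/BatchIterator interface, not code contained in this patch; CountEntries and ColumnMax are hypothetical helper names, and the include assumes the in-tree public header layout.

// Sketch of the post-refactor iteration idioms (illustrative only).
#include <algorithm>
#include <cstddef>
#include <limits>
#include <vector>
#include <xgboost/data.h>  // DMatrix, SparsePage, Entry

namespace xgboost {

// Stream over row batches; the same loop serves SimpleDMatrix (one in-memory
// page) and SparsePageDMatrix (pages read back from the *.cache files).
size_t CountEntries(DMatrix* dmat) {
  size_t nnz = 0;
  for (const auto& batch : dmat->GetRowBatches()) {  // each batch is a SparsePage
    for (size_t i = 0; i < batch.Size(); ++i) {
      nnz += batch[i].size();                        // stored entries in row i
    }
  }
  return nnz;
}

// Per-feature maximum read from the value-sorted column pages, the way
// BaseMaker::InitData does above: entries of one feature are sorted by
// fvalue, so the last entry of a non-empty column holds its maximum.
std::vector<bst_float> ColumnMax(DMatrix* dmat) {
  std::vector<bst_float> fmax(dmat->Info().num_col_,
                              -std::numeric_limits<bst_float>::max());
  for (const auto& batch : dmat->GetSortedColumnBatches()) {
    for (bst_uint fid = 0; fid < batch.Size(); ++fid) {
      auto col = batch[fid];                         // SparsePage::Inst
      if (col.size() != 0) {
        fmax[fid] = std::max(fmax[fid], col[col.size() - 1].fvalue);
      }
    }
  }
  return fmax;
}

}  // namespace xgboost

Because BatchIterator delegates to a cloneable BatchIteratorImpl, the same range-for loops work unchanged whether the pages live in memory or are streamed from the external-memory cache files exercised in the tests above.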