diff --git a/Makefile b/Makefile index 61964aacf..80507dfde 100644 --- a/Makefile +++ b/Makefile @@ -102,7 +102,7 @@ lint: python2 dmlc-core/scripts/lint.py xgboost ${LINT_LANG} include src clean: - $(RM) -r build lib bin *~ */*~ */*/*~ */*/*/*~ $(AMALGA_OBJ) + $(RM) -rf build lib bin *~ */*~ */*/*~ */*/*/*~ $(AMALGA_OBJ) xgboost clean_all: clean cd $(DMLC_CORE); make clean; cd - diff --git a/include/xgboost/base.h b/include/xgboost/base.h index 479cfc7ca..64892e67c 100644 --- a/include/xgboost/base.h +++ b/include/xgboost/base.h @@ -22,7 +22,7 @@ * "[21:47:50] 6513x126 matrix with 143286 entries loaded from ../data/agaricus.txt.train" */ #ifndef XGBOOST_LOG_WITH_TIME -#define XGBOOST_LOG_WITH_TIME 0 +#define XGBOOST_LOG_WITH_TIME 1 #endif /*! \brief namespace of xgboo st*/ diff --git a/src/data/data.cc b/src/data/data.cc index 4f6f2a878..593ccf284 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -8,6 +8,8 @@ #include "./sparse_batch_page.h" #include "./simple_dmatrix.h" #include "./simple_csr_source.h" +#include "./sparse_page_source.h" +#include "./sparse_page_dmatrix.h" #include "../common/io.h" namespace xgboost { @@ -151,8 +153,11 @@ DMatrix* DMatrix::Create(dmlc::Parser* parser, source->CopyFrom(parser); return DMatrix::Create(std::move(source), cache_prefix); } else { - LOG(FATAL) << "external memory not yet implemented"; - return nullptr; + if (!data::SparsePageSource::CacheExist(cache_prefix)) { + data::SparsePageSource::Create(parser, cache_prefix); + } + std::unique_ptr source(new data::SparsePageSource(cache_prefix)); + return DMatrix::Create(std::move(source), cache_prefix); } } @@ -165,6 +170,10 @@ void DMatrix::SaveToLocalFile(const std::string& fname) { DMatrix* DMatrix::Create(std::unique_ptr&& source, const std::string& cache_prefix) { - return new data::SimpleDMatrix(std::move(source)); + if (cache_prefix.length() == 0) { + return new data::SimpleDMatrix(std::move(source)); + } else { + return new data::SparsePageDMatrix(std::move(source), cache_prefix); + } } } // namespace xgboost diff --git a/src/data/page_csr_source.h b/src/data/page_csr_source.h deleted file mode 100644 index 3012af564..000000000 --- a/src/data/page_csr_source.h +++ /dev/null @@ -1,260 +0,0 @@ -/*! - * Copyright (c) 2014 by Contributors - * \file page_dmatrix-inl.hpp - * row iterator based on sparse page - * \author Tianqi Chen - */ -#ifndef XGBOOST_IO_PAGE_DMATRIX_INL_HPP_ -#define XGBOOST_IO_PAGE_DMATRIX_INL_HPP_ - -#include -#include -#include -#include "../data.h" -#include "../utils/iterator.h" -#include "../utils/thread_buffer.h" -#include "./simple_fmatrix-inl.hpp" -#include "./sparse_batch_page.h" -#include "./page_fmatrix-inl.hpp" -#include "./libsvm_parser.h" - -namespace xgboost { -namespace io { -/*! \brief thread buffer iterator */ -class ThreadRowPageIterator: public utils::IIterator { - public: - ThreadRowPageIterator(void) { - itr.SetParam("buffer_size", "4"); - page_ = NULL; - base_rowid_ = 0; - } - virtual ~ThreadRowPageIterator(void) {} - virtual void Init(void) { - } - virtual void BeforeFirst(void) { - itr.BeforeFirst(); - base_rowid_ = 0; - } - virtual bool Next(void) { - if (!itr.Next(page_)) return false; - out_ = page_->GetRowBatch(base_rowid_); - base_rowid_ += out_.size; - return true; - } - virtual const RowBatch &Value(void) const { - return out_; - } - /*! \brief load and initialize the iterator with fi */ - inline void Load(const utils::FileStream &fi) { - itr.get_factory().SetFile(fi, 0); - itr.Init(); - this->BeforeFirst(); - } - - private: - // base row id - size_t base_rowid_; - // output data - RowBatch out_; - SparsePage *page_; - utils::ThreadBuffer itr; -}; - -/*! \brief data matrix using page */ -template -class DMatrixPageBase : public DataMatrix { - public: - DMatrixPageBase(void) : DataMatrix(kMagic) { - iter_ = new ThreadRowPageIterator(); - } - // virtual destructor - virtual ~DMatrixPageBase(void) { - // do not delete row iterator, since it is owned by fmat - // to be cleaned up in a more clear way - } - /*! \brief save a DataMatrix as DMatrixPage */ - inline static void Save(const char *fname_, const DataMatrix &mat, bool silent) { - std::string fname = fname_; - utils::FileStream fs(utils::FopenCheck(fname.c_str(), "wb")); - int magic = kMagic; - fs.Write(&magic, sizeof(magic)); - mat.info.SaveBinary(fs); - fs.Close(); - fname += ".row.blob"; - utils::IIterator *iter = mat.fmat()->RowIterator(); - utils::FileStream fbin(utils::FopenCheck(fname.c_str(), "wb")); - SparsePage page; - iter->BeforeFirst(); - while (iter->Next()) { - const RowBatch &batch = iter->Value(); - for (size_t i = 0; i < batch.size; ++i) { - page.Push(batch[i]); - if (page.MemCostBytes() >= kPageSize) { - page.Save(&fbin); page.Clear(); - } - } - } - if (page.data.size() != 0) page.Save(&fbin); - fbin.Close(); - if (!silent) { - utils::Printf("DMatrixPage: %lux%lu is saved to %s\n", - static_cast(mat.info.num_row()), // NOLINT(*) - static_cast(mat.info.num_col()), fname_); // NOLINT(*) - } - } - /*! \brief load and initialize the iterator with fi */ - inline void LoadBinary(utils::FileStream &fi, // NOLINT(*) - bool silent, - const char *fname_) { - this->set_cache_file(fname_); - std::string fname = fname_; - int tmagic; - utils::Check(fi.Read(&tmagic, sizeof(tmagic)) != 0, "invalid input file format"); - this->CheckMagic(tmagic); - this->info.LoadBinary(fi); - // load in the row data file - fname += ".row.blob"; - utils::FileStream fs(utils::FopenCheck(fname.c_str(), "rb")); - iter_->Load(fs); - if (!silent) { - utils::Printf("DMatrixPage: %lux%lu matrix is loaded", - static_cast(info.num_row()), // NOLINT(*) - static_cast(info.num_col())); // NOLINT(*) - if (fname_ != NULL) { - utils::Printf(" from %s\n", fname_); - } else { - utils::Printf("\n"); - } - if (info.group_ptr.size() != 0) { - utils::Printf("data contains %u groups\n", (unsigned)info.group_ptr.size() - 1); - } - } - } - /*! \brief save a LibSVM format file as DMatrixPage */ - inline void LoadText(const char *uri, - const char* cache_file, - bool silent, - bool loadsplit) { - if (!silent) { - utils::Printf("start generate text file from %s\n", uri); - } - int rank = 0, npart = 1; - if (loadsplit) { - rank = rabit::GetRank(); - npart = rabit::GetWorldSize(); - } - this->set_cache_file(cache_file); - std::string fname_row = std::string(cache_file) + ".row.blob"; - utils::FileStream fo(utils::FopenCheck(fname_row.c_str(), "wb")); - SparsePage page; - size_t bytes_write = 0; - double tstart = rabit::utils::GetTime(); - LibSVMParser parser( - dmlc::InputSplit::Create(uri, rank, npart, "text"), 16); - info.Clear(); - while (parser.Next()) { - const LibSVMPage &batch = parser.Value(); - size_t nlabel = info.labels.size(); - info.labels.resize(nlabel + batch.label.size()); - if (batch.label.size() != 0) { - std::memcpy(BeginPtr(info.labels) + nlabel, - BeginPtr(batch.label), - batch.label.size() * sizeof(float)); - } - page.Push(batch); - for (size_t i = 0; i < batch.data.size(); ++i) { - info.info.num_col = std::max(info.info.num_col, - static_cast(batch.data[i].index+1)); - } - if (page.MemCostBytes() >= kPageSize) { - bytes_write += page.MemCostBytes(); - page.Save(&fo); - page.Clear(); - double tdiff = rabit::utils::GetTime() - tstart; - if (!silent) { - utils::Printf("Writting to %s in %g MB/s, %lu MB written\n", - cache_file, (bytes_write >> 20UL) / tdiff, - (bytes_write >> 20UL)); - } - } - info.info.num_row += batch.label.size(); - } - if (page.data.size() != 0) { - page.Save(&fo); - } - fo.Close(); - iter_->Load(utils::FileStream(utils::FopenCheck(fname_row.c_str(), "rb"))); - // save data matrix - utils::FileStream fs(utils::FopenCheck(cache_file, "wb")); - int tmagic = kMagic; - fs.Write(&tmagic, sizeof(tmagic)); - this->info.SaveBinary(fs); - fs.Close(); - if (!silent) { - utils::Printf("DMatrixPage: %lux%lu is parsed from %s\n", - static_cast(info.num_row()), // NOLINT(*) - static_cast(info.num_col()), // NOLINT(*) - uri); - } - } - /*! \brief magic number used to identify DMatrix */ - static const int kMagic = TKMagic; - /*! \brief page size 32 MB */ - static const size_t kPageSize = 32UL << 20UL; - - protected: - virtual void set_cache_file(const std::string &cache_file) = 0; - virtual void CheckMagic(int tmagic) = 0; - /*! \brief row iterator */ - ThreadRowPageIterator *iter_; -}; - -class DMatrixPage : public DMatrixPageBase<0xffffab02> { - public: - DMatrixPage(void) { - fmat_ = new FMatrixPage(iter_, this->info); - } - virtual ~DMatrixPage(void) { - delete fmat_; - } - virtual IFMatrix *fmat(void) const { - return fmat_; - } - virtual void set_cache_file(const std::string &cache_file) { - fmat_->set_cache_file(cache_file); - } - virtual void CheckMagic(int tmagic) { - utils::Check(tmagic == DMatrixPageBase<0xffffab02>::kMagic || - tmagic == DMatrixPageBase<0xffffab03>::kMagic, - "invalid format,magic number mismatch"); - } - /*! \brief the real fmatrix */ - FMatrixPage *fmat_; -}; - -// mix of FMatrix S and DMatrix -// cost half of ram usually as DMatrixSimple -class DMatrixHalfRAM : public DMatrixPageBase<0xffffab03> { - public: - DMatrixHalfRAM(void) { - fmat_ = new FMatrixS(iter_, this->info); - } - virtual ~DMatrixHalfRAM(void) { - delete fmat_; - } - virtual IFMatrix *fmat(void) const { - return fmat_; - } - virtual void set_cache_file(const std::string &cache_file) { - } - virtual void CheckMagic(int tmagic) { - utils::Check(tmagic == DMatrixPageBase<0xffffab02>::kMagic || - tmagic == DMatrixPageBase<0xffffab03>::kMagic, - "invalid format,magic number mismatch"); - } - /*! \brief the real fmatrix */ - IFMatrix *fmat_; -}; -} // namespace io -} // namespace xgboost -#endif // XGBOOST_IO_PAGE_ROW_ITER_INL_HPP_ diff --git a/src/data/page_dmatrix.cc b/src/data/page_dmatrix.cc deleted file mode 100644 index d2b71e50f..000000000 --- a/src/data/page_dmatrix.cc +++ /dev/null @@ -1,360 +0,0 @@ -/*! - * Copyright (c) 2014 by Contributors - * \file page_fmatrix-inl.hpp - * col iterator based on sparse page - * \author Tianqi Chen - */ -#ifndef XGBOOST_IO_PAGE_FMATRIX_INL_HPP_ -#define XGBOOST_IO_PAGE_FMATRIX_INL_HPP_ - -#include -#include -#include - -namespace xgboost { -namespace io { -/*! \brief thread buffer iterator */ -class ThreadColPageIterator: public utils::IIterator { - public: - ThreadColPageIterator(void) { - itr.SetParam("buffer_size", "2"); - page_ = NULL; - } - virtual ~ThreadColPageIterator(void) {} - virtual void Init(void) {} - virtual void BeforeFirst(void) { - itr.BeforeFirst(); - } - virtual bool Next(void) { - if (!itr.Next(page_)) return false; - out_.col_index = BeginPtr(itr.get_factory().index_set()); - col_data_.resize(page_->offset.size() - 1, SparseBatch::Inst(NULL, 0)); - for (size_t i = 0; i < col_data_.size(); ++i) { - col_data_[i] = SparseBatch::Inst - (BeginPtr(page_->data) + page_->offset[i], - static_cast(page_->offset[i + 1] - page_->offset[i])); - } - out_.col_data = BeginPtr(col_data_); - out_.size = col_data_.size(); - return true; - } - virtual const ColBatch &Value(void) const { - return out_; - } - /*! \brief load and initialize the iterator with fi */ - inline void SetFile(const utils::FileStream &fi) { - itr.get_factory().SetFile(fi); - itr.Init(); - } - // set index set - inline void SetIndexSet(const std::vector &fset, bool load_all) { - itr.get_factory().SetIndexSet(fset, load_all); - } - - private: - // output data - ColBatch out_; - SparsePage *page_; - std::vector col_data_; - utils::ThreadBuffer itr; -}; - -struct ColConvertFactory { - inline bool Init(void) { - return true; - } - inline void Setup(float pkeep, - size_t max_row_perbatch, - size_t num_col, - utils::IIterator *iter, - std::vector *buffered_rowset, - const std::vector *enabled) { - pkeep_ = pkeep; - max_row_perbatch_ = max_row_perbatch; - num_col_ = num_col; - iter_ = iter; - buffered_rowset_ = buffered_rowset; - enabled_ = enabled; - } - inline SparsePage *Create(void) { - return new SparsePage(); - } - inline void FreeSpace(SparsePage *a) { - delete a; - } - inline void SetParam(const char *name, const char *val) {} - inline bool LoadNext(SparsePage *val) { - tmp_.Clear(); - size_t btop = buffered_rowset_->size(); - while (iter_->Next()) { - const RowBatch &batch = iter_->Value(); - for (size_t i = 0; i < batch.size; ++i) { - bst_uint ridx = static_cast(batch.base_rowid + i); - if (pkeep_ == 1.0f || random::SampleBinary(pkeep_)) { - buffered_rowset_->push_back(ridx); - tmp_.Push(batch[i]); - } - } - if (tmp_.MemCostBytes() >= kPageSize || - tmp_.Size() >= max_row_perbatch_) { - this->MakeColPage(tmp_, BeginPtr(*buffered_rowset_) + btop, - *enabled_, val); - return true; - } - } - if (tmp_.Size() != 0) { - this->MakeColPage(tmp_, BeginPtr(*buffered_rowset_) + btop, - *enabled_, val); - return true; - } else { - return false; - } - } - inline void Destroy(void) {} - inline void BeforeFirst(void) {} - inline void MakeColPage(const SparsePage &prow, - const bst_uint *ridx, - const std::vector &enabled, - SparsePage *pcol) { - pcol->Clear(); - int nthread; - #pragma omp parallel - { - nthread = omp_get_num_threads(); - int max_nthread = std::max(omp_get_num_procs() / 2 - 4, 1); - if (nthread > max_nthread) { - nthread = max_nthread; - } - } - pcol->Clear(); - utils::ParallelGroupBuilder - builder(&pcol->offset, &pcol->data); - builder.InitBudget(num_col_, nthread); - bst_omp_uint ndata = static_cast(prow.Size()); - #pragma omp parallel for schedule(static) num_threads(nthread) - for (bst_omp_uint i = 0; i < ndata; ++i) { - int tid = omp_get_thread_num(); - for (size_t j = prow.offset[i]; j < prow.offset[i+1]; ++j) { - const SparseBatch::Entry &e = prow.data[j]; - if (enabled[e.index]) { - builder.AddBudget(e.index, tid); - } - } - } - builder.InitStorage(); - #pragma omp parallel for schedule(static) num_threads(nthread) - for (bst_omp_uint i = 0; i < ndata; ++i) { - int tid = omp_get_thread_num(); - for (size_t j = prow.offset[i]; j < prow.offset[i+1]; ++j) { - const SparseBatch::Entry &e = prow.data[j]; - builder.Push(e.index, - SparseBatch::Entry(ridx[i], e.fvalue), - tid); - } - } - utils::Assert(pcol->Size() == num_col_, "inconsistent col data"); - // sort columns - bst_omp_uint ncol = static_cast(pcol->Size()); - #pragma omp parallel for schedule(dynamic, 1) num_threads(nthread) - for (bst_omp_uint i = 0; i < ncol; ++i) { - if (pcol->offset[i] < pcol->offset[i + 1]) { - std::sort(BeginPtr(pcol->data) + pcol->offset[i], - BeginPtr(pcol->data) + pcol->offset[i + 1], - SparseBatch::Entry::CmpValue); - } - } - } - // probability of keep - float pkeep_; - // maximum number of rows per batch - size_t max_row_perbatch_; - // number of columns - size_t num_col_; - // row batch iterator - utils::IIterator *iter_; - // buffered rowset - std::vector *buffered_rowset_; - // enabled marks - const std::vector *enabled_; - // internal temp cache - SparsePage tmp_; - /*! \brief page size 256 M */ - static const size_t kPageSize = 256 << 20UL; -}; -/*! - * \brief sparse matrix that support column access, CSC - */ -class FMatrixPage : public IFMatrix { - public: - typedef SparseBatch::Entry Entry; - /*! \brief constructor */ - FMatrixPage(utils::IIterator *iter, - const learner::MetaInfo &info) : info(info) { - this->iter_ = iter; - } - // destructor - virtual ~FMatrixPage(void) { - if (iter_ != NULL) delete iter_; - } - /*! \return whether column access is enabled */ - virtual bool HaveColAccess(void) const { - return col_size_.size() != 0; - } - /*! \brief get number of columns */ - virtual size_t NumCol(void) const { - utils::Check(this->HaveColAccess(), "NumCol:need column access"); - return col_size_.size(); - } - /*! \brief get number of buffered rows */ - virtual const std::vector &buffered_rowset(void) const { - return buffered_rowset_; - } - /*! \brief get column size */ - virtual size_t GetColSize(size_t cidx) const { - return col_size_[cidx]; - } - /*! \brief get column density */ - virtual float GetColDensity(size_t cidx) const { - size_t nmiss = num_buffered_row_ - (col_size_[cidx]); - return 1.0f - (static_cast(nmiss)) / num_buffered_row_; - } - virtual void InitColAccess(const std::vector &enabled, - float pkeep, size_t max_row_perbatch) { - if (this->HaveColAccess()) return; - if (TryLoadColData()) return; - this->InitColData(enabled, pkeep, max_row_perbatch); - utils::Check(TryLoadColData(), "failed on creating col.blob"); - } - /*! - * \brief get the row iterator associated with FMatrix - */ - virtual utils::IIterator* RowIterator(void) { - iter_->BeforeFirst(); - return iter_; - } - /*! - * \brief get the column based iterator - */ - virtual utils::IIterator* ColIterator(void) { - size_t ncol = this->NumCol(); - col_index_.resize(0); - for (size_t i = 0; i < ncol; ++i) { - col_index_.push_back(static_cast(i)); - } - col_iter_.SetIndexSet(col_index_, false); - col_iter_.BeforeFirst(); - return &col_iter_; - } - /*! - * \brief column based iterator - */ - virtual utils::IIterator *ColIterator(const std::vector &fset) { - size_t ncol = this->NumCol(); - col_index_.resize(0); - for (size_t i = 0; i < fset.size(); ++i) { - if (fset[i] < ncol) col_index_.push_back(fset[i]); - } - col_iter_.SetIndexSet(col_index_, false); - col_iter_.BeforeFirst(); - return &col_iter_; - } - // set the cache file name - inline void set_cache_file(const std::string &cache_file) { - col_data_name_ = std::string(cache_file) + ".col.blob"; - col_meta_name_ = std::string(cache_file) + ".col.meta"; - } - - protected: - inline bool TryLoadColData(void) { - std::FILE *fi = fopen64(col_meta_name_.c_str(), "rb"); - if (fi == NULL) return false; - utils::FileStream fs(fi); - LoadMeta(&fs); - fs.Close(); - fi = utils::FopenCheck(col_data_name_.c_str(), "rb"); - if (fi == NULL) return false; - col_iter_.SetFile(utils::FileStream(fi)); - return true; - } - inline void LoadMeta(utils::IStream *fi) { - utils::Check(fi->Read(&num_buffered_row_, sizeof(num_buffered_row_)) != 0, - "invalid col.blob file"); - utils::Check(fi->Read(&buffered_rowset_), - "invalid col.blob file"); - utils::Check(fi->Read(&col_size_), - "invalid col.blob file"); - } - inline void SaveMeta(utils::IStream *fo) { - fo->Write(&num_buffered_row_, sizeof(num_buffered_row_)); - fo->Write(buffered_rowset_); - fo->Write(col_size_); - } - /*! - * \brief initialize column data - * \param enabled the list of enabled columns - * \param pkeep probability to keep a row - * \param max_row_perbatch maximum row per batch - */ - inline void InitColData(const std::vector &enabled, - float pkeep, size_t max_row_perbatch) { - // clear rowset - buffered_rowset_.clear(); - col_size_.resize(info.num_col()); - std::fill(col_size_.begin(), col_size_.end(), 0); - utils::FileStream fo; - fo = utils::FileStream(utils::FopenCheck(col_data_name_.c_str(), "wb")); - iter_->BeforeFirst(); - double tstart = rabit::utils::GetTime(); - size_t bytes_write = 0; - utils::ThreadBuffer citer; - citer.SetParam("buffer_size", "2"); - citer.get_factory().Setup(pkeep, max_row_perbatch, info.num_col(), - iter_, &buffered_rowset_, &enabled); - citer.Init(); - SparsePage *pcol; - while (citer.Next(pcol)) { - for (size_t i = 0; i < pcol->Size(); ++i) { - col_size_[i] += pcol->offset[i + 1] - pcol->offset[i]; - } - pcol->Save(&fo); - size_t spage = pcol->MemCostBytes(); - bytes_write += spage; - double tnow = rabit::utils::GetTime(); - double tdiff = tnow - tstart; - utils::Printf("Writing to %s in %g MB/s, %lu MB written\n", - col_data_name_.c_str(), - (bytes_write >> 20UL) / tdiff, - (bytes_write >> 20UL)); - } - fo.Close(); - num_buffered_row_ = buffered_rowset_.size(); - fo = utils::FileStream(utils::FopenCheck(col_meta_name_.c_str(), "wb")); - this->SaveMeta(&fo); - fo.Close(); - } - - private: - /*! \brief page size 256 M */ - static const size_t kPageSize = 256 << 20UL; - // shared meta info with DMatrix - const learner::MetaInfo &info; - // row iterator - utils::IIterator *iter_; - /*! \brief column based data file name */ - std::string col_data_name_; - /*! \brief column based data file name */ - std::string col_meta_name_; - /*! \brief list of row index that are buffered */ - std::vector buffered_rowset_; - // number of buffered rows - size_t num_buffered_row_; - // count for column data - std::vector col_size_; - // internal column index for output - std::vector col_index_; - // internal thread backed col iterator - ThreadColPageIterator col_iter_; -}; -} // namespace io -} // namespace xgboost -#endif // XGBOOST_IO_PAGE_FMATRIX_INL_HPP_ diff --git a/src/data/simple_csr_source.h b/src/data/simple_csr_source.h index a9faa98e3..1e7adb0b2 100644 --- a/src/data/simple_csr_source.h +++ b/src/data/simple_csr_source.h @@ -15,7 +15,6 @@ namespace xgboost { -/*! \brief namespace of internal data structures*/ namespace data { /*! * \brief The simplest form of data holder, can be used to create DMatrix. diff --git a/src/data/sparse_batch_page.h b/src/data/sparse_batch_page.h index b19504bda..145495e29 100644 --- a/src/data/sparse_batch_page.h +++ b/src/data/sparse_batch_page.h @@ -13,6 +13,7 @@ #include #include #include +#include namespace xgboost { namespace data { @@ -163,6 +164,24 @@ class SparsePage { offset[i + begin] = top + batch.ind_ptr[i + 1] - batch.ind_ptr[0]; } } + /*! + * \brief Push row block into the page. + * \param batch the row batch. + */ + inline void Push(const dmlc::RowBlock& batch) { + data.reserve(data.size() + batch.offset[batch.size] - batch.offset[0]); + offset.reserve(offset.size() + batch.size); + CHECK(batch.index != nullptr); + for (size_t i = 0; i < batch.size; ++i) { + offset.push_back(offset.back() + batch.offset[i + 1] - batch.offset[i]); + } + for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) { + uint32_t index = batch.index[i]; + bst_float fvalue = batch.value == nullptr ? 1.0f : batch.value[i]; + data.push_back(SparseBatch::Entry(index, fvalue)); + } + CHECK_EQ(offset.back(), data.size()); + } /*! * \brief Push a sparse page * \param batch the row page diff --git a/src/data/sparse_page_dmatrix.cc b/src/data/sparse_page_dmatrix.cc new file mode 100644 index 000000000..ba4f566f5 --- /dev/null +++ b/src/data/sparse_page_dmatrix.cc @@ -0,0 +1,253 @@ +/*! + * Copyright 2014 by Contributors + * \file sparse_page_dmatrix.cc + * \brief The external memory version of Page Iterator. + * \author Tianqi Chen + */ +#include +#include +#include +#include +#include "./sparse_page_dmatrix.h" +#include "../common/random.h" +#include "../common/group_data.h" + +namespace xgboost { +namespace data { + +SparsePageDMatrix::ColPageIter::ColPageIter(std::unique_ptr&& fi) + : fi_(std::move(fi)), page_(nullptr) { + + + load_all_ = false; + prefetcher_.Init([this](SparsePage** dptr) { + if (*dptr == nullptr) { + *dptr = new SparsePage(); + } + if (load_all_) { + return (*dptr)->Load(fi_.get()); + } else { + return (*dptr)->Load(fi_.get(), index_set_); + } + }, [this] () { + fi_->Seek(0); + index_set_ = set_index_set_; + load_all_ = set_load_all_; + }); +} + +SparsePageDMatrix::ColPageIter::~ColPageIter() { + delete page_; +} + +bool SparsePageDMatrix::ColPageIter::Next() { + if (page_ != nullptr) { + prefetcher_.Recycle(&page_); + } + if (prefetcher_.Next(&page_)) { + out_.col_index = dmlc::BeginPtr(index_set_); + col_data_.resize(page_->offset.size() - 1, SparseBatch::Inst(nullptr, 0)); + for (size_t i = 0; i < col_data_.size(); ++i) { + col_data_[i] = SparseBatch::Inst + (dmlc::BeginPtr(page_->data) + page_->offset[i], + static_cast(page_->offset[i + 1] - page_->offset[i])); + } + out_.col_data = dmlc::BeginPtr(col_data_); + out_.size = col_data_.size(); + return true; + } else { + return false; + } +} + +void SparsePageDMatrix::ColPageIter::Init(const std::vector& index_set, + bool load_all) { + set_index_set_ = index_set; + set_load_all_ = load_all; + std::sort(set_index_set_.begin(), set_index_set_.end()); + + this->BeforeFirst(); +} + +dmlc::DataIter* SparsePageDMatrix::ColIterator() { + CHECK(col_iter_.get() != nullptr); + std::vector col_index; + size_t ncol = this->info().num_col; + for (size_t i = 0; i < ncol; ++i) { + col_index.push_back(static_cast(i)); + } + col_iter_->Init(col_index, true); + return col_iter_.get(); +} + +dmlc::DataIter* SparsePageDMatrix:: +ColIterator(const std::vector& fset) { + CHECK(col_iter_.get() != nullptr); + std::vector col_index; + size_t ncol = this->info().num_col; + for (size_t i = 0; i < fset.size(); ++i) { + if (fset[i] < ncol) { + col_index.push_back(fset[i]); + } + } + col_iter_->Init(col_index, false); + return col_iter_.get(); +} + + +bool SparsePageDMatrix::TryInitColData() { + // load meta data. + { + std::string col_meta_name = cache_prefix_ + ".col.meta"; + std::unique_ptr fmeta( + dmlc::Stream::Create(col_meta_name.c_str(), "r", true)); + if (fmeta.get() == nullptr) return false; + CHECK(fmeta->Read(&buffered_rowset_)) << "invalid col.meta file"; + CHECK(fmeta->Read(&col_size_)) << "invalid col.meta file"; + } + // load real data + { + std::string col_data_name = cache_prefix_ + ".col.page"; + std::unique_ptr fdata( + dmlc::SeekStream::CreateForRead(col_data_name.c_str(), true)); + if (fdata.get() == nullptr) return false; + col_iter_.reset(new ColPageIter(std::move(fdata))); + } + return true; +} + +void SparsePageDMatrix::InitColAccess(const std::vector& enabled, + float pkeep, + size_t max_row_perbatch) { + if (HaveColAccess()) return; + if (TryInitColData()) return; + + const MetaInfo& info = this->info(); + if (max_row_perbatch == std::numeric_limits::max()) { + max_row_perbatch = kMaxRowPerBatch; + } + buffered_rowset_.clear(); + col_size_.resize(info.num_col); + std::fill(col_size_.begin(), col_size_.end(), 0); + // make the sparse page. + dmlc::ThreadedIter cmaker; + SparsePage tmp; + dmlc::DataIter* iter = this->RowIterator(); + std::bernoulli_distribution coin_flip(pkeep); + + auto& rnd = common::GlobalRandom(); + + // function to create the page. + auto make_col_batch = [&] ( + const SparsePage& prow, + const bst_uint* ridx, + SparsePage **dptr) { + if (*dptr == nullptr) { + *dptr = new SparsePage(); + } + SparsePage* pcol = *dptr; + pcol->Clear(); + int nthread; + #pragma omp parallel + { + nthread = omp_get_num_threads(); + nthread = std::max(nthread, std::max(omp_get_num_procs() / 2 - 1, 1)); + } + pcol->Clear(); + common::ParallelGroupBuilder + builder(&pcol->offset, &pcol->data); + builder.InitBudget(info.num_col, nthread); + bst_omp_uint ndata = static_cast(prow.Size()); + #pragma omp parallel for schedule(static) num_threads(nthread) + for (bst_omp_uint i = 0; i < ndata; ++i) { + int tid = omp_get_thread_num(); + for (size_t j = prow.offset[i]; j < prow.offset[i+1]; ++j) { + const SparseBatch::Entry &e = prow.data[j]; + if (enabled[e.index]) { + builder.AddBudget(e.index, tid); + } + } + } + builder.InitStorage(); + #pragma omp parallel for schedule(static) num_threads(nthread) + for (bst_omp_uint i = 0; i < ndata; ++i) { + int tid = omp_get_thread_num(); + for (size_t j = prow.offset[i]; j < prow.offset[i+1]; ++j) { + const SparseBatch::Entry &e = prow.data[j]; + builder.Push(e.index, + SparseBatch::Entry(ridx[i], e.fvalue), + tid); + } + } + CHECK_EQ(pcol->Size(), info.num_col); + // sort columns + bst_omp_uint ncol = static_cast(pcol->Size()); + #pragma omp parallel for schedule(dynamic, 1) num_threads(nthread) + for (bst_omp_uint i = 0; i < ncol; ++i) { + if (pcol->offset[i] < pcol->offset[i + 1]) { + std::sort(dmlc::BeginPtr(pcol->data) + pcol->offset[i], + dmlc::BeginPtr(pcol->data) + pcol->offset[i + 1], + SparseBatch::Entry::CmpValue); + } + } + }; + + auto make_next_col = [&] (SparsePage** dptr) { + tmp.Clear(); + size_t btop = buffered_rowset_.size(); + while (iter->Next()) { + const RowBatch& batch = iter->Value(); + for (size_t i = 0; i < batch.size; ++i) { + bst_uint ridx = static_cast(batch.base_rowid + i); + if (pkeep == 1.0f || coin_flip(rnd)) { + buffered_rowset_.push_back(ridx); + tmp.Push(batch[i]); + } + } + if (tmp.MemCostBytes() >= kPageSize || + tmp.Size() >= max_row_perbatch) { + make_col_batch(tmp, dmlc::BeginPtr(buffered_rowset_) + btop, dptr); + return true; + } + } + if (tmp.Size() != 0) { + make_col_batch(tmp, dmlc::BeginPtr(buffered_rowset_) + btop, dptr); + return true; + } else { + return false; + } + }; + + cmaker.Init(make_next_col, []() {}); + + std::string col_data_name = cache_prefix_ + ".col.page"; + std::unique_ptr fo(dmlc::Stream::Create(col_data_name.c_str(), "w")); + double tstart = dmlc::GetTime(); + size_t bytes_write = 0; + SparsePage* pcol = nullptr; + + while (cmaker.Next(&pcol)) { + for (size_t i = 0; i < pcol->Size(); ++i) { + col_size_[i] += pcol->offset[i + 1] - pcol->offset[i]; + } + pcol->Save(fo.get()); + size_t spage = pcol->MemCostBytes(); + bytes_write += spage; + double tdiff = dmlc::GetTime() - tstart; + LOG(CONSOLE) << "Writing to " << col_data_name + << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " + << (bytes_write >> 20UL) << " MB writen"; + cmaker.Recycle(&pcol); + } + // save meta data + std::string col_meta_name = cache_prefix_ + ".col.meta"; + fo.reset(dmlc::Stream::Create(col_meta_name.c_str(), "w")); + fo->Write(buffered_rowset_); + fo->Write(col_size_); + fo.reset(nullptr); + // initialize column data + CHECK(TryInitColData()); +} + +} // namespace data +} // namespace xgboost diff --git a/src/data/sparse_page_dmatrix.h b/src/data/sparse_page_dmatrix.h new file mode 100644 index 000000000..91e1dcef1 --- /dev/null +++ b/src/data/sparse_page_dmatrix.h @@ -0,0 +1,128 @@ +/*! + * Copyright 2015 by Contributors + * \file simple_dmatrix.h + * \brief In-memory version of DMatrix. + * \author Tianqi Chen + */ +#ifndef XGBOOST_SPARSE_PAGE_DMATRIX_H_ +#define XGBOOST_SPARSE_PAGE_DMATRIX_H_ + +#include +#include +#include +#include +#include +#include +#include "./sparse_batch_page.h" + +namespace xgboost { +namespace data { + +class SparsePageDMatrix : public DMatrix { + public: + explicit SparsePageDMatrix(std::unique_ptr&& source, + const std::string& cache_prefix) + : source_(std::move(source)), + cache_prefix_(cache_prefix) {} + + MetaInfo& info() override { + return source_->info; + } + + const MetaInfo& info() const override { + return source_->info; + } + + dmlc::DataIter* RowIterator() override { + dmlc::DataIter* iter = source_.get(); + iter->BeforeFirst(); + return iter; + } + + bool HaveColAccess() const override { + return col_iter_.get() != nullptr; + } + + const std::vector& buffered_rowset() const override { + return buffered_rowset_; + } + + size_t GetColSize(size_t cidx) const { + return col_size_[cidx]; + } + + float GetColDensity(size_t cidx) const override { + size_t nmiss = buffered_rowset_.size() - col_size_[cidx]; + return 1.0f - (static_cast(nmiss)) / buffered_rowset_.size(); + } + + bool SingleColBlock() const override { + return false; + } + + dmlc::DataIter* ColIterator() override; + + dmlc::DataIter* ColIterator(const std::vector& fset) override; + + void InitColAccess(const std::vector& enabled, + float subsample, + size_t max_row_perbatch) override; + + /*! \brief page size 256 MB */ + static const size_t kPageSize = 256UL << 20UL; + /*! \brief Maximum number of rows per batch. */ + static const size_t kMaxRowPerBatch = 32UL << 10UL; + + private: + // declare the column batch iter. + class ColPageIter : public dmlc::DataIter { + public: + explicit ColPageIter(std::unique_ptr&& fi); + virtual ~ColPageIter(); + void BeforeFirst() override { + prefetcher_.BeforeFirst(); + } + const ColBatch &Value() const override { + return out_; + } + bool Next() override; + // initialize the column iterator with the specified index set. + void Init(const std::vector& index_set, bool load_all); + + private: + // data file pointer. + std::unique_ptr fi_; + // The index set to be loaded. + std::vector index_set_; + // The index set by the outsiders + std::vector set_index_set_; + // whether to load data dataset. + bool set_load_all_, load_all_; + // data prefetcher. + dmlc::ThreadedIter prefetcher_; + // the temp page. + SparsePage* page_; + // temporal space for batch + ColBatch out_; + // the pointer data. + std::vector col_data_; + }; + /*! + * \brief Try to intitialize column data. + * \return true if data already exists, false if they do not. + */ + bool TryInitColData(); + // source data pointer. + std::unique_ptr source_; + // the cache prefix + std::string cache_prefix_; + /*! \brief list of row index that are buffered */ + std::vector buffered_rowset_; + // count for column data + std::vector col_size_; + // internal column iter. + std::unique_ptr col_iter_; +}; +} // namespace data +} // namespace xgboost +#endif // XGBOOST_SPARSE_PAGE_DMATRIX_H_ diff --git a/src/data/sparse_page_source.cc b/src/data/sparse_page_source.cc new file mode 100644 index 000000000..4ee6f17c4 --- /dev/null +++ b/src/data/sparse_page_source.cc @@ -0,0 +1,157 @@ +/*! + * Copyright 2015 by Contributors + * \file sparse_page_source.cc + */ +#include +#include +#include +#include +#include "./sparse_page_source.h" + +namespace xgboost { +namespace data { + +SparsePageSource::SparsePageSource(const std::string& cache_prefix) + : base_rowid_(0), page_(nullptr) { + // read in the info files. + { + std::string name_info = cache_prefix; + std::unique_ptr finfo(dmlc::Stream::Create(name_info.c_str(), "r")); + int tmagic; + CHECK_EQ(finfo->Read(&tmagic, sizeof(tmagic)), sizeof(tmagic)); + this->info.LoadBinary(finfo.get()); + } + // read in the cache files. + std::string name_row = cache_prefix + ".row.page"; + fi_.reset(dmlc::SeekStream::CreateForRead(name_row.c_str())); + prefetcher_.Init([this] (SparsePage** dptr) { + if (*dptr == nullptr) { + *dptr = new SparsePage(); + } + return (*dptr)->Load(fi_.get()); + }, [this] () { fi_->Seek(0); }); +} + +SparsePageSource::~SparsePageSource() { + delete page_; +} + +bool SparsePageSource::Next() { + if (page_ != nullptr) { + prefetcher_.Recycle(&page_); + } + if (prefetcher_.Next(&page_)) { + batch_ = page_->GetRowBatch(base_rowid_); + base_rowid_ += batch_.size; + return true; + } else { + return false; + } +} + +void SparsePageSource::BeforeFirst() { + base_rowid_ = 0; + prefetcher_.BeforeFirst(); +} + +const RowBatch& SparsePageSource::Value() const { + return batch_; +} + +bool SparsePageSource::CacheExist(const std::string& cache_prefix) { + std::string name_info = cache_prefix; + std::string name_row = cache_prefix + ".row.page"; + std::unique_ptr finfo(dmlc::Stream::Create(name_info.c_str(), "r", true)); + std::unique_ptr frow(dmlc::Stream::Create(name_row.c_str(), "r", true)); + return finfo.get() != nullptr && frow.get() != nullptr; +} + +void SparsePageSource::Create(dmlc::Parser* src, + const std::string& cache_prefix) { + // read in the info files. + std::string name_info = cache_prefix; + std::string name_row = cache_prefix + ".row.page"; + std::unique_ptr fo(dmlc::Stream::Create(name_row.c_str(), "w")); + MetaInfo info; + SparsePage page; + size_t bytes_write = 0; + double tstart = dmlc::GetTime(); + + while (src->Next()) { + const dmlc::RowBlock& batch = src->Value(); + if (batch.label != nullptr) { + info.labels.insert(info.labels.end(), batch.label, batch.label + batch.size); + } + if (batch.weight != nullptr) { + info.weights.insert(info.weights.end(), batch.weight, batch.weight + batch.size); + } + info.num_row += batch.size; + info.num_nonzero += batch.offset[batch.size] - batch.offset[0]; + for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) { + uint32_t index = batch.index[i]; + info.num_col = std::max(info.num_col, + static_cast(index + 1)); + } + page.Push(batch); + if (page.MemCostBytes() >= kPageSize) { + bytes_write += page.MemCostBytes(); + page.Save(fo.get()); + page.Clear(); + double tdiff = dmlc::GetTime() - tstart; + LOG(CONSOLE) << "Writing to " << name_row << " in " + << ((bytes_write >> 20UL) / tdiff) << " MB/s, " + << (bytes_write >> 20UL) << " written"; + } + } + + if (page.data.size() != 0) { + page.Save(fo.get()); + } + + fo.reset(dmlc::Stream::Create(name_info.c_str(), "w")); + int tmagic = kMagic; + fo->Write(&tmagic, sizeof(tmagic)); + info.SaveBinary(fo.get()); + + LOG(CONSOLE) << "SparsePageSource: Finished writing to " << cache_prefix; +} + +void SparsePageSource::Create(DMatrix* src, + const std::string& cache_prefix) { + // read in the info files. + std::string name_info = cache_prefix; + std::string name_row = cache_prefix + ".row.page"; + std::unique_ptr fo(dmlc::Stream::Create(name_row.c_str(), "w")); + + SparsePage page; + size_t bytes_write = 0; + double tstart = dmlc::GetTime(); + dmlc::DataIter* iter = src->RowIterator(); + + while (iter->Next()) { + page.Push(iter->Value()); + if (page.MemCostBytes() >= kPageSize) { + bytes_write += page.MemCostBytes(); + page.Save(fo.get()); + page.Clear(); + double tdiff = dmlc::GetTime() - tstart; + LOG(CONSOLE) << "Writing to " << name_row << " in " + << ((bytes_write >> 20UL) / tdiff) << " MB/s, " + << (bytes_write >> 20UL) << " written"; + } + } + + if (page.data.size() != 0) { + page.Save(fo.get()); + } + + fo.reset(dmlc::Stream::Create(name_info.c_str(), "w")); + int tmagic = kMagic; + fo->Write(&tmagic, sizeof(tmagic)); + src->info().SaveBinary(fo.get()); + + LOG(CONSOLE) << "SparsePageSource: Finished writing to " << cache_prefix; +} + +} // namespace data +} // namespace xgboost diff --git a/src/data/sparse_page_source.h b/src/data/sparse_page_source.h new file mode 100644 index 000000000..cdbfd9020 --- /dev/null +++ b/src/data/sparse_page_source.h @@ -0,0 +1,83 @@ +/*! + * Copyright (c) 2014 by Contributors + * \file page_csr_source.h + * External memory data source, saved with sparse_batch_page binary format. + * \author Tianqi Chen + */ +#ifndef XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_ +#define XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_ + +#include +#include +#include +#include +#include +#include +#include "./sparse_batch_page.h" + +namespace xgboost { +namespace data { +/*! + * \brief External memory data source. + * \code + * std::unique_ptr source(new SimpleCSRSource(cache_prefix)); + * // add data to source + * DMatrix* dmat = DMatrix::Create(std::move(source)); + * \encode + */ +class SparsePageSource : public DataSource { + public: + /*! + * \brief Create source from cache files the cache_prefix. + * \param cache_prefix The prefix of cache we want to solve. + */ + explicit SparsePageSource(const std::string& cache_prefix) noexcept(false); + /*! \brief destructor */ + virtual ~SparsePageSource(); + // implement Next + bool Next() override; + // implement BeforeFirst + void BeforeFirst() override; + // implement Value + const RowBatch& Value() const override; + /*! + * \brief Create source by taking data from parser. + * \param src source parser. + * \param cache_prefix The cache_prefix of cache file location. + */ + static void Create(dmlc::Parser* src, + const std::string& cache_prefix); + /*! + * \brief Create source cache by copy content from DMatrix. + * \param cache_prefix The cache_prefix of cache file location. + */ + static void Create(DMatrix* src, + const std::string& cache_prefix); + /*! + * \brief Check if the cache file already exists. + * \param cache_prefix The cache prefix of files. + * \return Whether cache file already exists. + */ + static bool CacheExist(const std::string& cache_prefix); + /*! \brief page size 32 MB */ + static const size_t kPageSize = 32UL << 20UL; + /*! \brief magic number used to identify Page */ + static const int kMagic = 0xffffab02; + + private: + /*! \brief number of rows */ + size_t base_rowid_; + /*! \brief temp data. */ + RowBatch batch_; + /*! \brief page currently on hold. */ + SparsePage *page_; + /*! \brief The cache predix of the dataset. */ + std::string cache_prefix_; + /*! \brief file pointer to the row blob file. */ + std::unique_ptr fi_; + /*! \brief internal prefetcher. */ + dmlc::ThreadedIter prefetcher_; +}; +} // namespace data +} // namespace xgboost +#endif // XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_