From e8f6f3b541c2947698db78849f5be39ebd2e2052 Mon Sep 17 00:00:00 2001 From: tqchen Date: Wed, 15 Apr 2015 15:15:23 -0700 Subject: [PATCH] some initial try of cachefiles --- .gitignore | 2 +- R-package/src/Makevars | 4 +- R-package/src/Makevars.win | 4 +- R-package/src/xgboost_R.cpp | 11 ++ R-package/src/xgboost_R.h | 9 + src/io/io.cpp | 62 +++--- src/io/io.h | 6 +- src/io/page_dmatrix-inl.hpp | 98 +++++++-- src/io/page_fmatrix-inl.hpp | 382 ------------------------------------ src/utils/thread_buffer.h | 2 +- wrapper/xgboost.py | 18 +- wrapper/xgboost_wrapper.cpp | 5 + wrapper/xgboost_wrapper.h | 13 ++ 13 files changed, 185 insertions(+), 431 deletions(-) delete mode 100644 src/io/page_fmatrix-inl.hpp diff --git a/.gitignore b/.gitignore index 8b2c65f62..35061b857 100644 --- a/.gitignore +++ b/.gitignore @@ -54,7 +54,7 @@ train* rabit .Rbuildignore R-package.Rproj - +*.cache* R-package/inst R-package/src diff --git a/R-package/src/Makevars b/R-package/src/Makevars index 406f59517..d0eb23b25 100644 --- a/R-package/src/Makevars +++ b/R-package/src/Makevars @@ -2,7 +2,7 @@ PKGROOT=../../ # _*_ mode: Makefile; _*_ PKG_CPPFLAGS= -DXGBOOST_CUSTOMIZE_MSG_ -DXGBOOST_CUSTOMIZE_PRNG_ -DXGBOOST_STRICT_CXX98_ -DRABIT_CUSTOMIZE_MSG_ -DRABIT_STRICT_CXX98_ -I$(PKGROOT) -PKG_CXXFLAGS= $(SHLIB_OPENMP_CFLAGS) -PKG_LIBS = $(SHLIB_OPENMP_CFLAGS) +PKG_CXXFLAGS= $(SHLIB_OPENMP_CFLAGS) $(SHLIB_PTHREAD_FLAGS) +PKG_LIBS = $(SHLIB_OPENMP_CFLAGS) $(SHLIB_PTHREAD_FLAGS) OBJECTS= xgboost_R.o xgboost_assert.o $(PKGROOT)/wrapper/xgboost_wrapper.o $(PKGROOT)/src/io/io.o $(PKGROOT)/src/gbm/gbm.o $(PKGROOT)/src/tree/updater.o $(PKGROOT)/subtree/rabit/src/engine_empty.o $(PKGROOT)/src/io/dmlc_simple.o diff --git a/R-package/src/Makevars.win b/R-package/src/Makevars.win index e3c98d2b7..56b550e7f 100644 --- a/R-package/src/Makevars.win +++ b/R-package/src/Makevars.win @@ -13,7 +13,7 @@ xgblib: cp -r ../../subtree . PKG_CPPFLAGS= -DXGBOOST_CUSTOMIZE_MSG_ -DXGBOOST_CUSTOMIZE_PRNG_ -DXGBOOST_STRICT_CXX98_ -DRABIT_CUSTOMIZE_MSG_ -DRABIT_STRICT_CXX98_ -I$(PKGROOT) -I../.. -PKG_CXXFLAGS= $(SHLIB_OPENMP_CFLAGS) -PKG_LIBS = $(SHLIB_OPENMP_CFLAGS) +PKG_CXXFLAGS= $(SHLIB_OPENMP_CFLAGS) $(SHLIB_PTHREAD_FLAGS) +PKG_LIBS = $(SHLIB_OPENMP_CFLAGS) $(SHLIB_PTHREAD_FLAGS) OBJECTS= xgboost_R.o xgboost_assert.o $(PKGROOT)/wrapper/xgboost_wrapper.o $(PKGROOT)/src/io/io.o $(PKGROOT)/src/gbm/gbm.o $(PKGROOT)/src/tree/updater.o $(PKGROOT)/subtree/rabit/src/engine_empty.o $(PKGROOT)/src/io/dmlc_simple.o $(OBJECTS) : xgblib diff --git a/R-package/src/xgboost_R.cpp b/R-package/src/xgboost_R.cpp index a2ca9536f..f67462564 100644 --- a/R-package/src/xgboost_R.cpp +++ b/R-package/src/xgboost_R.cpp @@ -76,6 +76,17 @@ extern "C" { _WrapperEnd(); return ret; } + SEXP XGDMatrixCreateCache_R(SEXP fname, SEXP cache_file, SEXP silent) { + _WrapperBegin(); + void *handle = XGDMatrixCreateCache(CHAR(asChar(fname)), + CHAR(asChar(cache_file)), + asInteger(silent)); + SEXP ret = PROTECT(R_MakeExternalPtr(handle, R_NilValue, R_NilValue)); + R_RegisterCFinalizerEx(ret, _DMatrixFinalizer, TRUE); + UNPROTECT(1); + _WrapperEnd(); + return ret; + } SEXP XGDMatrixCreateFromMat_R(SEXP mat, SEXP missing) { _WrapperBegin(); diff --git a/R-package/src/xgboost_R.h b/R-package/src/xgboost_R.h index 61b84a80e..1314cef15 100644 --- a/R-package/src/xgboost_R.h +++ b/R-package/src/xgboost_R.h @@ -24,6 +24,15 @@ extern "C" { * \return a loaded data matrix */ SEXP XGDMatrixCreateFromFile_R(SEXP fname, SEXP silent); + /*! + * \brief load a cached DMatrix, this is backed by several cache_files + * and usually cost less memory + * \param fname the name of the file, can be a cached buffer or text + * \param cache_file the name of cached file + * \param silent whether print messages during loading + * \return a loaded data matrix + */ + SEXP XGDMatrixCreateCache_R(SEXP fname, SEXP cache_file, SEXP silent); /*! * \brief create matrix content from dense matrix * This assumes the matrix is stored in column major format diff --git a/src/io/io.cpp b/src/io/io.cpp index 1535a9e42..8d6856b92 100644 --- a/src/io/io.cpp +++ b/src/io/io.cpp @@ -6,37 +6,53 @@ #include "../utils/io.h" #include "../utils/utils.h" #include "simple_dmatrix-inl.hpp" -// implements data loads using dmatrix simple for now +#include "page_dmatrix-inl.hpp" namespace xgboost { namespace io { -DataMatrix* LoadDataMatrix(const char *fname, bool silent, - bool savebuffer, bool loadsplit) { - if (!std::strcmp(fname, "stdin") || - !std::strncmp(fname, "s3://", 5) || - !std::strncmp(fname, "hdfs://", 7) || - loadsplit) { - DMatrixSimple *dmat = new DMatrixSimple(); - dmat->LoadText(fname, silent, loadsplit); - return dmat; - } - int magic; - utils::FileStream fs(utils::FopenCheck(fname, "rb")); - utils::Check(fs.Read(&magic, sizeof(magic)) != 0, "invalid input file format"); - fs.Seek(0); - if (magic == DMatrixSimple::kMagic) { - DMatrixSimple *dmat = new DMatrixSimple(); - dmat->LoadBinary(fs, silent, fname); +DataMatrix* LoadDataMatrix(const char *fname, + bool silent, + bool savebuffer, + bool loadsplit, + const char *cache_file) { + if (cache_file == NULL) { + if (!std::strcmp(fname, "stdin") || + !std::strncmp(fname, "s3://", 5) || + !std::strncmp(fname, "hdfs://", 7) || + loadsplit) { + DMatrixSimple *dmat = new DMatrixSimple(); + dmat->LoadText(fname, silent, loadsplit); + return dmat; + } + int magic; + utils::FileStream fs(utils::FopenCheck(fname, "rb")); + utils::Check(fs.Read(&magic, sizeof(magic)) != 0, "invalid input file format"); + fs.Seek(0); + if (magic == DMatrixSimple::kMagic) { + DMatrixSimple *dmat = new DMatrixSimple(); + dmat->LoadBinary(fs, silent, fname); + fs.Close(); + return dmat; + } fs.Close(); + DMatrixSimple *dmat = new DMatrixSimple(); + dmat->CacheLoad(fname, silent, savebuffer); + return dmat; + } else { + if (!strcmp(fname, cache_file)) { + DMatrixPage *dmat = new DMatrixPage(); + utils::FileStream fs(utils::FopenCheck(fname, "rb")); + dmat->LoadBinary(fs, silent, fname); + fs.Close(); + return dmat; + } + DMatrixPage *dmat = new DMatrixPage(); + dmat->LoadText(fname, cache_file, false, loadsplit); return dmat; } - fs.Close(); - DMatrixSimple *dmat = new DMatrixSimple(); - dmat->CacheLoad(fname, silent, savebuffer); - return dmat; } -void SaveDataMatrix(const DataMatrix &dmat, const char *fname, bool silent) { +void SaveDataMatrix(const DataMatrix &dmat, const char *fname, bool silent) { if (dmat.magic == DMatrixSimple::kMagic) { const DMatrixSimple *p_dmat = static_cast(&dmat); p_dmat->SaveBinary(fname, silent); diff --git a/src/io/io.h b/src/io/io.h index 2de76dd38..ed075977c 100644 --- a/src/io/io.h +++ b/src/io/io.h @@ -21,12 +21,16 @@ typedef learner::DMatrix DataMatrix; * \param savebuffer whether temporal buffer the file if the file is in text format * \param loadsplit whether we only load a split of input files * such that each worker node get a split of the data + * \param cache_file name of cache_file, used by external memory version + * can be NULL, if cache_file is specified, this will be the temporal + * space that can be re-used to store intermediate data * \return a loaded DMatrix */ DataMatrix* LoadDataMatrix(const char *fname, bool silent, bool savebuffer, - bool loadsplit); + bool loadsplit, + const char *cache_file = NULL); /*! * \brief save DataMatrix into stream, * note: the saved dmatrix format may not be in exactly same as input diff --git a/src/io/page_dmatrix-inl.hpp b/src/io/page_dmatrix-inl.hpp index 4f70ff2e9..73a3087fb 100644 --- a/src/io/page_dmatrix-inl.hpp +++ b/src/io/page_dmatrix-inl.hpp @@ -2,7 +2,7 @@ #define XGBOOST_IO_PAGE_ROW_ITER_INL_HPP_ /*! * \file page_row_iter-inl.hpp - * row iterator based on sparse page + * row iterator based on sparse page * \author Tianqi Chen */ #include @@ -212,23 +212,24 @@ class DMatrixPageBase : public DataMatrix { // to be cleaned up in a more clear way } /*! \brief load and initialize the iterator with fi */ - inline void Load(utils::FileStream &fi, - bool silent = false, - const char *fname = NULL, - bool skip_magic_check = false) { + inline void LoadBinary(utils::FileStream &fi, + bool silent, + const char *fname_) { + std::string fname = fname_; int tmagic; utils::Check(fi.Read(&tmagic, sizeof(tmagic)) != 0, "invalid input file format"); - if (!skip_magic_check) { - utils::Check(tmagic == magic, "invalid format,magic number mismatch"); - } + utils::Check(tmagic == magic, "invalid format,magic number mismatch"); this->info.LoadBinary(fi); - iter_->Load(fi); + // load in the row data file + fname += ".row.blob"; + utils::FileStream fs(utils::FopenCheck(fname.c_str(), "rb")); + iter_->Load(fs); if (!silent) { utils::Printf("DMatrixPage: %lux%lu matrix is loaded", static_cast(info.num_row()), static_cast(info.num_col())); - if (fname != NULL) { - utils::Printf(" from %s\n", fname); + if (fname_ != NULL) { + utils::Printf(" from %s\n", fname_); } else { utils::Printf("\n"); } @@ -237,18 +238,83 @@ class DMatrixPageBase : public DataMatrix { } } } - /*! \brief save a DataMatrix as DMatrixPage*/ - inline static void Save(const char* fname, const DataMatrix &mat, bool silent) { - utils::FileStream fs(utils::FopenCheck(fname, "wb")); + /*! \brief save a DataMatrix as DMatrixPage */ + inline static void Save(const char *fname_, const DataMatrix &mat, bool silent) { + std::string fname = fname_; + utils::FileStream fs(utils::FopenCheck(fname.c_str(), "wb")); int magic = kMagic; fs.Write(&magic, sizeof(magic)); mat.info.SaveBinary(fs); - ThreadRowPageIterator::Save(mat.fmat()->RowIterator(), fs); fs.Close(); + fname += ".row.blob"; + utils::FileStream fbin(utils::FopenCheck(fname.c_str(), "wb")); + ThreadRowPageIterator::Save(mat.fmat()->RowIterator(), fbin); + fbin.Close(); if (!silent) { utils::Printf("DMatrixPage: %lux%lu is saved to %s\n", static_cast(mat.info.num_row()), - static_cast(mat.info.num_col()), fname); + static_cast(mat.info.num_col()), fname_); + } + } + /*! \brief save a LibSVM format file as DMatrixPage */ + inline void LoadText(const char *uri, + const char* cache_file, + bool silent, + bool loadsplit) { + int rank = 0, npart = 1; + if (loadsplit) { + rank = rabit::GetRank(); + npart = rabit::GetWorldSize(); + } + std::string fname_row = std::string(cache_file) + ".row.blob"; + utils::FileStream fo(utils::FopenCheck(fname_row.c_str(), "wb")); + RowBatchPage page(ThreadRowPageIterator::kPageSize); + dmlc::InputSplit *in = + dmlc::InputSplit::Create(uri, rank, npart); + std::string line; + info.Clear(); + while (in->ReadRecord(&line)) { + float label; + std::istringstream ss(line); + std::vector feats; + ss >> label; + while (!ss.eof()) { + RowBatch::Entry e; + if (!(ss >> e.index)) break; + ss.ignore(32, ':'); + if (!(ss >> e.fvalue)) break; + feats.push_back(e); + } + RowBatch::Inst row(BeginPtr(feats), feats.size()); + if (!page.PushRow(row)) { + page.Save(fo); + page.Clear(); + utils::Check(page.PushRow(row), "row is too big"); + } + for (size_t i = 0; i < feats.size(); ++i) { + info.info.num_col = std::max(info.info.num_col, + static_cast(feats[i].index+1)); + } + this->info.labels.push_back(label); + info.info.num_row += 1; + } + if (page.Size() != 0) { + page.Save(fo); + } + delete in; + fo.Close(); + iter_->Load(utils::FileStream(utils::FopenCheck(fname_row.c_str(), "rb"))); + // save data matrix + utils::FileStream fs(utils::FopenCheck(cache_file, "wb")); + int magic = kMagic; + fs.Write(&magic, sizeof(magic)); + this->info.SaveBinary(fs); + fs.Close(); + if (!silent) { + utils::Printf("DMatrixPage: %lux%lu is parsed from %s\n", + static_cast(info.num_row()), + static_cast(info.num_col()), + uri); } } /*! \brief magic number used to identify DMatrix */ diff --git a/src/io/page_fmatrix-inl.hpp b/src/io/page_fmatrix-inl.hpp deleted file mode 100644 index 44cb9abdc..000000000 --- a/src/io/page_fmatrix-inl.hpp +++ /dev/null @@ -1,382 +0,0 @@ -#ifndef XGBOOST_IO_PAGE_FMATRIX_INL_HPP_ -#define XGBOOST_IO_PAGE_FMATRIX_INL_HPP_ -/*! - * \file page_fmatrix-inl.hpp - * sparse page manager for fmatrix - * \author Tianqi Chen - */ -#include -#include -#include -#include "../data.h" -#include "../utils/iterator.h" -#include "../utils/io.h" -#include "../utils/matrix_csr.h" -#include "../utils/thread_buffer.h" -namespace xgboost { -namespace io { -class CSCMatrixManager { - public: - /*! \brief in memory page */ - struct Page { - public: - /*! \brief initialize the page */ - explicit Page(size_t size) { - buffer.resize(size); - col_index.reserve(10); - col_data.reserve(10); - } - /*! \brief clear the page */ - inline void Clear(void) { - num_entry = 0; - col_index.clear(); - col_data.clear(); - } - /*! \brief number of used entries */ - size_t num_entry; - /*! \brief column index */ - std::vector col_index; - /*! \brief column data */ - std::vector col_data; - /*! \brief number of free entries */ - inline size_t NumFreeEntry(void) const { - return buffer.size() - num_entry; - } - inline ColBatch::Entry* AllocEntry(size_t len) { - ColBatch::Entry *p_data = &buffer[0] + num_entry; - num_entry += len; - return p_data; - } - /*! \brief get underlying batch */ - inline ColBatch GetBatch(void) const { - ColBatch batch; - batch.size = col_index.size(); - batch.col_index = BeginPtr(col_index); - batch.col_data = BeginPtr(col_data); - return batch; - } - - private: - /*! \brief buffer space, not to be changed since ready */ - std::vector buffer; - }; - /*! \brief define type of page pointer */ - typedef Page *PagePtr; - // constructor - CSCMatrixManager(void) { - fi_ = NULL; - } - /*! \brief get column pointer */ - inline const std::vector &col_ptr(void) const { - return col_ptr_; - } - inline void SetParam(const char *name, const char *val) { - } - inline PagePtr Create(void) { - return new Page(page_size_); - } - inline void FreeSpace(PagePtr &a) { - delete a; - } - inline void Destroy(void) { - } - inline void BeforeFirst(void) { - col_index_ = col_todo_; - read_top_ = 0; - } - inline bool LoadNext(PagePtr &val) { - val->Clear(); - if (read_top_ >= col_index_.size()) return false; - while (read_top_ < col_index_.size()) { - if (!this->TryFill(col_index_[read_top_], val)) { - return true; - } - ++read_top_; - } - return true; - } - inline bool Init(void) { - this->BeforeFirst(); - return true; - } - inline void Setup(utils::ISeekStream *fi, double page_ratio) { - fi_ = fi; - fi_->Read(&begin_meta_ , sizeof(begin_meta_)); - begin_data_ = static_cast(fi->Tell()); - fi_->Seek(begin_meta_); - fi_->Read(&col_ptr_); - size_t psmax = 0; - for (size_t i = 0; i < col_ptr_.size() - 1; ++i) { - psmax = std::max(psmax, col_ptr_[i+1] - col_ptr_[i]); - } - utils::Check(page_ratio >= 1.0f, "col_page_ratio must be at least 1"); - page_size_ = std::max(static_cast(psmax * page_ratio), psmax); - } - inline void SetColSet(const std::vector &cset, bool setall) { - if (!setall) { - col_todo_.resize(0); - for (size_t i = 0; i < cset.size(); ++i) { - if (col_todo_[i] < static_cast(col_ptr_.size() - 1)) { - col_todo_.push_back(cset[i]); - } - } - std::sort(col_todo_.begin(), col_todo_.end()); - } else { - col_todo_.resize(col_ptr_.size()-1); - for (size_t i = 0; i < col_todo_.size(); ++i) { - col_todo_[i] = static_cast(i); - } - } - } - - private: - /*! \brief fill a page with */ - inline bool TryFill(size_t cidx, Page *p_page) { - size_t len = col_ptr_[cidx+1] - col_ptr_[cidx]; - if (p_page->NumFreeEntry() < len) return false; - ColBatch::Entry *p_data = p_page->AllocEntry(len); - fi_->Seek(col_ptr_[cidx] * sizeof(ColBatch::Entry) + begin_data_); - utils::Check(fi_->Read(p_data, sizeof(ColBatch::Entry) * len) != 0, - "invalid column buffer format"); - p_page->col_data.push_back(ColBatch::Inst(p_data, static_cast(len))); - p_page->col_index.push_back(static_cast(cidx)); - return true; - } - // the following are in memory auxiliary data structure - /*! \brief top of reader position */ - size_t read_top_; - /*! \brief size of page */ - size_t page_size_; - /*! \brief column index to be loaded */ - std::vector col_index_; - /*! \brief column index to be after calling before first */ - std::vector col_todo_; - // the following are input content - /*! \brief beginning position of data content */ - size_t begin_data_; - /*! \brief size of data content */ - size_t begin_meta_; - /*! \brief input stream */ - utils::ISeekStream *fi_; - /*! \brief column pointer of CSC format */ - std::vector col_ptr_; -}; - -class ThreadColPageIterator : public utils::IIterator { - public: - explicit ThreadColPageIterator(utils::ISeekStream *fi, - float page_ratio, bool silent) { - itr_.SetParam("buffer_size", "2"); - itr_.get_factory().Setup(fi, page_ratio); - itr_.Init(); - if (!silent) { - utils::Printf("ThreadColPageIterator: finish initialzing, %u columns\n", - static_cast(col_ptr().size() - 1)); - } - } - virtual ~ThreadColPageIterator(void) { - } - virtual void BeforeFirst(void) { - itr_.BeforeFirst(); - } - virtual bool Next(void) { - // page to be loaded - CSCMatrixManager::PagePtr page; - if (!itr_.Next(page)) return false; - out_ = page->GetBatch(); - return true; - } - virtual const ColBatch &Value(void) const { - return out_; - } - inline const std::vector &col_ptr(void) const { - return itr_.get_factory().col_ptr(); - } - inline void SetColSet(const std::vector &cset, - bool setall = false) { - itr_.get_factory().SetColSet(cset, setall); - } - - private: - // output data - ColBatch out_; - // internal iterator - utils::ThreadBuffer itr_; -}; -/*! - * \brief sparse matrix that support column access - */ -class FMatrixPage : public IFMatrix { - public: - /*! \brief constructor */ - FMatrixPage(utils::IIterator *iter, std::string fname_buffer) - : fname_cbuffer_(fname_buffer) { - this->row_iter_ = iter; - this->col_iter_ = NULL; - this->fi_ = NULL; - } - // destructor - virtual ~FMatrixPage(void) { - if (row_iter_ != NULL) delete row_iter_; - if (col_iter_ != NULL) delete col_iter_; - if (fi_ != NULL) { - fi_->Close(); delete fi_; - } - } - /*! \return whether column access is enabled */ - virtual bool HaveColAccess(void) const { - return col_iter_ != NULL; - } - /*! \brief get number of colmuns */ - virtual size_t NumCol(void) const { - utils::Check(this->HaveColAccess(), "NumCol:need column access"); - return col_iter_->col_ptr().size() - 1; - } - /*! \brief get number of buffered rows */ - virtual const std::vector &buffered_rowset(void) const { - return buffered_rowset_; - } - /*! \brief get column size */ - virtual size_t GetColSize(size_t cidx) const { - const std::vector &col_ptr = col_iter_->col_ptr(); - return col_ptr[cidx+1] - col_ptr[cidx]; - } - /*! \brief get column density */ - virtual float GetColDensity(size_t cidx) const { - const std::vector &col_ptr = col_iter_->col_ptr(); - size_t nmiss = buffered_rowset_.size() - (col_ptr[cidx+1] - col_ptr[cidx]); - return 1.0f - (static_cast(nmiss)) / buffered_rowset_.size(); - } - virtual void InitColAccess(const std::vector &enabled, float pkeep = 1.0f) { - if (this->HaveColAccess()) return; - utils::Printf("start to initialize page col access\n"); - if (this->LoadColData()) { - utils::Printf("loading previously saved col data\n"); - return; - } - this->InitColData(pkeep, fname_cbuffer_.c_str(), - 1 << 30, 5); - utils::Check(this->LoadColData(), "fail to read in column data"); - utils::Printf("finish initialize page col access\n"); - } - /*! - * \brief get the row iterator associated with FMatrix - */ - virtual utils::IIterator* RowIterator(void) { - row_iter_->BeforeFirst(); - return row_iter_; - } - /*! - * \brief get the column based iterator - */ - virtual utils::IIterator* ColIterator(void) { - std::vector cset; - col_iter_->SetColSet(cset, true); - col_iter_->BeforeFirst(); - return col_iter_; - } - /*! - * \brief colmun based iterator - */ - virtual utils::IIterator *ColIterator(const std::vector &fset) { - col_iter_->SetColSet(fset, false); - col_iter_->BeforeFirst(); - return col_iter_; - } - - protected: - /*! - * \brief try load column data from file - */ - inline bool LoadColData(void) { - FILE *fp = fopen64(fname_cbuffer_.c_str(), "rb"); - if (fp == NULL) return false; - fi_ = new utils::FileStream(fp); - static_cast(fi_)->Read(&buffered_rowset_); - col_iter_ = new ThreadColPageIterator(fi_, 2.0f, false); - return true; - } - /*! - * \brief intialize column data - * \param pkeep probability to keep a row - */ - inline void InitColData(float pkeep, const char *fname, - size_t buffer_size, size_t col_step) { - buffered_rowset_.clear(); - utils::FileStream fo(utils::FopenCheck(fname, "wb+")); - // use 64M buffer - utils::SparseCSRFileBuilder builder(&fo, buffer_size); - // start working - row_iter_->BeforeFirst(); - while (row_iter_->Next()) { - const RowBatch &batch = row_iter_->Value(); - for (size_t i = 0; i < batch.size; ++i) { - if (pkeep == 1.0f || random::SampleBinary(pkeep)) { - buffered_rowset_.push_back(static_cast(batch.base_rowid+i)); - RowBatch::Inst inst = batch[i]; - for (bst_uint j = 0; j < inst.length; ++j) { - builder.AddBudget(inst[j].index); - } - } - } - } - // write buffered rowset - static_cast(&fo)->Write(buffered_rowset_); - builder.InitStorage(); - row_iter_->BeforeFirst(); - size_t ktop = 0; - while (row_iter_->Next()) { - const RowBatch &batch = row_iter_->Value(); - for (size_t i = 0; i < batch.size; ++i) { - if (ktop < buffered_rowset_.size() && - buffered_rowset_[ktop] == batch.base_rowid + i) { - ++ktop; - RowBatch::Inst inst = batch[i]; - for (bst_uint j = 0; j < inst.length; ++j) { - builder.PushElem(inst[j].index, - ColBatch::Entry((bst_uint)(batch.base_rowid+i), - inst[j].fvalue)); - } - if (ktop % 100000 == 0) { - utils::Printf("\r \r"); - utils::Printf("InitCol: %lu rows ", static_cast(ktop)); - } - } - } - } - builder.Finalize(); - builder.SortRows(ColBatch::Entry::CmpValue, col_step); - fo.Close(); - } - - private: - // row iterator - utils::IIterator *row_iter_; - // column iterator - ThreadColPageIterator *col_iter_; - // file pointer to data - utils::FileStream *fi_; - // file name of column buffer - std::string fname_cbuffer_; - /*! \brief list of row index that are buffered */ - std::vector buffered_rowset_; -}; - -class DMatrixColPage : public DMatrixPageBase<0xffffab03> { - public: - explicit DMatrixColPage(const char *fname) { - fmat_ = new FMatrixPage(iter_, fname); - } - virtual ~DMatrixColPage(void) { - delete fmat_; - } - virtual IFMatrix *fmat(void) const { - return fmat_; - } - /*! \brief the real fmatrix */ - IFMatrix *fmat_; -}; - -} // namespace io -} // namespace xgboost -#endif // XGBOOST_IO_PAGE_FMATRIX_INL_HPP_ diff --git a/src/utils/thread_buffer.h b/src/utils/thread_buffer.h index ed36e1b43..d4ca1d111 100644 --- a/src/utils/thread_buffer.h +++ b/src/utils/thread_buffer.h @@ -31,7 +31,7 @@ class ThreadBuffer { } /*!\brief set parameter, will also pass the parameter to factory */ inline void SetParam(const char *name, const char *val) { - if (!strcmp( name, "buffer_size")) buf_size = atoi(val); + if (!std::strcmp( name, "buffer_size")) buf_size = atoi(val); factory.SetParam(name, val); } /*! diff --git a/wrapper/xgboost.py b/wrapper/xgboost.py index 1906ce09e..bfab05deb 100644 --- a/wrapper/xgboost.py +++ b/wrapper/xgboost.py @@ -87,27 +87,39 @@ def c_array(ctype, values): class DMatrix(object): - def __init__(self, data, label=None, missing=0.0, weight=None): + def __init__(self, data, label=None, missing=0.0, weight=None, cache_file=None): """ Data matrix used in XGBoost. Parameters ---------- data : string/numpy array/scipy.sparse - Data source, string type is the path of svmlight format txt file or xgb buffer. + Data source, string type is the path of svmlight format txt file, + xgb buffer or path to cache_file label : list or numpy 1-D array (optional) Label of the training data. missing : float Value in the data which needs to be present as a missing value. weight : list or numpy 1-D array (optional) Weight for each instance. + cache_file: string + Path to the binary cache of input data, when this is enabled, + several binary cache files with the prefix cache_file will be created, + xgboost will try to use external memory as much as possible, + thus save memory during computation in general """ # force into void_p, mac need to pass things in as void_p if data is None: self.handle = None return - if isinstance(data, string_types): + if cache_file is not None: + if not isinstance(data, string_types): + raise Exception('cache_file must be used together with input file name') + if not isinstance(cache_file, string_types): + raise Exception('cache_file must be string') + self.handle = ctypes.c_void_p(xglib.XGDMatrixCreateCache(c_str(data), c_str(cache_file), 0)) + elif isinstance(data, string_types): self.handle = ctypes.c_void_p(xglib.XGDMatrixCreateFromFile(c_str(data), 0)) elif isinstance(data, scipy.sparse.csr_matrix): self._init_from_csr(data) diff --git a/wrapper/xgboost_wrapper.cpp b/wrapper/xgboost_wrapper.cpp index dec266ff6..45fc05082 100644 --- a/wrapper/xgboost_wrapper.cpp +++ b/wrapper/xgboost_wrapper.cpp @@ -114,6 +114,11 @@ extern "C"{ void* XGDMatrixCreateFromFile(const char *fname, int silent) { return LoadDataMatrix(fname, silent != 0, false, false); } + void* XGDMatrixCreateCache(const char *fname, + const char *cache_file, + int silent) { + return LoadDataMatrix(fname, silent != 0, false, false, cache_file); + } void* XGDMatrixCreateFromCSR(const bst_ulong *indptr, const unsigned *indices, const float *data, diff --git a/wrapper/xgboost_wrapper.h b/wrapper/xgboost_wrapper.h index 78df68c71..66d1dfbc0 100644 --- a/wrapper/xgboost_wrapper.h +++ b/wrapper/xgboost_wrapper.h @@ -19,9 +19,22 @@ extern "C" { #endif /*! * \brief load a data matrix + * \param fname the name of the file + * \param silent whether print messages during loading * \return a loaded data matrix */ XGB_DLL void* XGDMatrixCreateFromFile(const char *fname, int silent); + /*! + * \brief load a cached DMatrix, this is backed by several cache_files + * and usually cost less memory + * \param fname the name of the file, can be a cached buffer or text + * \param cache_file the name of cached file + * \param silent whether print messages during loading + * \return a loaded data matrix + */ + XGB_DLL void* XGDMatrixCreateCache(const char *fname, + const char *cache_file, + int silent); /*! * \brief create a matrix content from csr format * \param indptr pointer to row headers