From 7d1e9f06d43b2601963e91ebb6fa871c0dbf7426 Mon Sep 17 00:00:00 2001 From: tqchen Date: Mon, 1 Sep 2014 10:45:05 -0700 Subject: [PATCH] add fmatrix in, todo add buffer file --- src/io/page_dmatrix-inl.hpp | 20 +-- src/io/page_fmatrix-inl.hpp | 248 +++++++++++++++++++++++++++++++++--- src/utils/thread_buffer.h | 3 + 3 files changed, 237 insertions(+), 34 deletions(-) diff --git a/src/io/page_dmatrix-inl.hpp b/src/io/page_dmatrix-inl.hpp index 23013b98b..01b7f8fc7 100644 --- a/src/io/page_dmatrix-inl.hpp +++ b/src/io/page_dmatrix-inl.hpp @@ -108,7 +108,6 @@ class ThreadRowPageIterator: public utils::IIterator { itr.SetParam("buffer_size", "2"); page_ = NULL; base_rowid_ = 0; - isend_ = false; } virtual ~ThreadRowPageIterator(void) { } @@ -116,11 +115,10 @@ class ThreadRowPageIterator: public utils::IIterator { } virtual void BeforeFirst(void) { itr.BeforeFirst(); - isend_ = false; base_rowid_ = 0; } virtual bool Next(void) { - if(!this->LoadNextPage()) return false; + if(!itr.Next(page_)) return false; out_ = page_->GetRowBatch(&tmp_ptr_, base_rowid_); base_rowid_ += out_.size; return true; @@ -154,21 +152,12 @@ class ThreadRowPageIterator: public utils::IIterator { if (page.Size() != 0) page.Save(fo); } private: - // load in next page - inline bool LoadNextPage(void) { - ptop_ = 0; - bool ret = itr.Next(page_); - isend_ = !ret; - return ret; - } // base row id size_t base_rowid_; // temporal ptr std::vector tmp_ptr_; // output data RowBatch out_; - // whether we reach end of file - bool isend_; // page pointer type typedef RowBatchPage* PagePtr; // loader factory for page @@ -205,7 +194,6 @@ class ThreadRowPageIterator: public utils::IIterator { protected: PagePtr page_; - int ptop_; utils::ThreadBuffer itr; }; @@ -234,7 +222,8 @@ class DMatrixPage : public DataMatrix { iter_->Load(fi); if (!silent) { printf("DMatrixPage: %lux%lu matrix is loaded", - info.num_row(), info.num_col()); + static_cast(info.num_row()), + static_cast(info.num_col())); if (fname != NULL) { printf(" from %s\n", fname); } else { @@ -255,7 +244,8 @@ class DMatrixPage : public DataMatrix { fs.Close(); if (!silent) { printf("DMatrixPage: %lux%lu is saved to %s\n", - mat.info.num_row(), mat.info.num_col(), fname); + static_cast(mat.info.num_row()), + static_cast(mat.info.num_col()), fname); } } /*! \brief the real fmatrix */ diff --git a/src/io/page_fmatrix-inl.hpp b/src/io/page_fmatrix-inl.hpp index f077f0dde..cf4923b7b 100644 --- a/src/io/page_fmatrix-inl.hpp +++ b/src/io/page_fmatrix-inl.hpp @@ -20,7 +20,7 @@ class CSCMatrixManager { explicit Page(size_t size) { buffer.resize(size); col_index.reserve(10); - col_data.reserved(10); + col_data.reserve(10); } /*! \brief clear the page */ inline void Clear(void) { @@ -57,49 +57,259 @@ class CSCMatrixManager { /*! \brief define type of page pointer */ typedef Page *PagePtr; /*! \brief get column pointer */ - const std::vector &col_ptr(void) const { + inline const std::vector &col_ptr(void) const { return col_ptr_; } - inline bool Init(void) { - return true; - } inline void SetParam(const char *name, const char *val) { - } - inline bool LoadNext(PagePtr &val) { - } inline PagePtr Create(void) { - PagePtr a = new Page(); - return a; + return new Page(page_size_); } inline void FreeSpace(PagePtr &a) { delete a; } inline void Destroy(void) { - fi.Close(); } inline void BeforeFirst(void) { - fi.Seek(file_begin_); + col_index_ = col_todo_; + read_top_ = 0; + } + inline bool LoadNext(PagePtr &val) { + val->Clear(); + if (read_top_ >= col_index_.size()) return false; + while (read_top_ < col_index_.size()) { + if (!this->TryFill(col_index_[read_top_], val)) return true; + ++read_top_; + } + return true; + } + inline bool Init(void) { + this->BeforeFirst(); + return true; + } + inline void Setup(utils::ISeekStream *fi, double page_ratio) { + fi_ = fi; + fi_->Read(&begin_meta_ , sizeof(size_t)); + fi_->Seek(begin_meta_); + fi_->Read(&col_ptr_); + size_t psmax = 0; + for (size_t i = 0; i < col_ptr_.size() - 1; ++i) { + psmax = std::max(psmax, col_ptr_[i+1] - col_ptr_[i]); + } + utils::Check(page_ratio >= 1.0f, "col_page_ratio must be at least 1"); + page_size_ = std::max(static_cast(psmax * page_ratio), psmax); + } + inline void SetColSet(const std::vector &cset, bool setall) { + if (!setall) { + col_todo_.resize(cset.size()); + for (size_t i = 0; i < cset.size(); ++i) { + col_todo_[i] = cset[i]; + utils::Assert(col_todo_[i] < static_cast(col_ptr_.size() - 1), + "CSCMatrixManager: column index exceed bound"); + } + std::sort(col_todo_.begin(), col_todo_.end()); + } else { + col_todo_.resize(col_ptr_.size()-1); + for (size_t i = 0; i < col_todo_.size(); ++i) { + col_todo_[i] = static_cast(i); + } + } } - private: /*! \brief fill a page with */ - inline bool Fill(size_t cidx, Page *p_page) { + inline bool TryFill(size_t cidx, Page *p_page) { size_t len = col_ptr_[cidx+1] - col_ptr_[cidx]; if (p_page->NumFreeEntry() < len) return false; ColBatch::Entry *p_data = p_page->AllocEntry(len); - fi->Seek(col_ptr_[cidx]); - utils::Check(fi->Read(p_data, sizeof(ColBatch::Entry) * len) != 0, + fi_->Seek(col_ptr_[cidx]); + utils::Check(fi_->Read(p_data, sizeof(ColBatch::Entry) * len) != 0, "invalid column buffer format"); p_page->col_data.push_back(ColBatch::Inst(p_data, len)); p_page->col_index.push_back(cidx); } + // the following are in memory auxiliary data structure + /*! \brief top of reader position */ + size_t read_top_; + /*! \brief size of page */ + size_t page_size_; + /*! \brief column index to be loaded */ + std::vector col_index_; + /*! \brief column index to be after calling before first */ + std::vector col_todo_; + // the following are input content /*! \brief size of data content */ - size_t data_size_; + size_t begin_meta_; /*! \brief input stream */ - utils::ISeekStream *fi; + utils::ISeekStream *fi_; /*! \brief column pointer of CSC format */ - std::vector col_ptr_; + std::vector col_ptr_; +}; + +class ThreadColPageIterator : public utils::IIterator { + public: + ThreadColPageIterator(void) { + itr_.SetParam("buffer_size", "2"); + page_ = NULL; + fi_ = NULL; + silent = 0; + } + virtual ~ThreadColPageIterator(void) { + if (fi_ != NULL) { + fi_->Close(); delete fi_; + } + } + virtual void Init(void) { + fi_ = new utils::FileStream(utils::FopenCheck(col_pagefile_.c_str(), "rb")); + itr_.get_factory().Setup(fi_, col_pageratio_); + if (silent == 0) { + printf("ThreadColPageIterator: finish initialzing from %s, %u columns\n", + col_pagefile_.c_str(), static_cast(col_ptr().size() - 1)); + } + } + virtual void SetParam(const char *name, const char *val) { + if (!strcmp("col_pageratio", val)) col_pageratio_ = atof(val); + if (!strcmp("col_pagefile", val)) col_pagefile_ = val; + if (!strcmp("silent", val)) silent = atoi(val); + } + virtual void BeforeFirst(void) { + itr_.BeforeFirst(); + } + virtual bool Next(void) { + if(!itr_.Next(page_)) return false; + out_ = page_->GetBatch(); + return true; + } + virtual const ColBatch &Value(void) const{ + return out_; + } + inline const std::vector &col_ptr(void) const { + return itr_.get_factory().col_ptr(); + } + inline void SetColSet(const std::vector &cset, bool setall = false) { + itr_.get_factory().SetColSet(cset, setall); + } + + private: + // shutup + int silent; + // input file + utils::FileStream *fi_; + // size of page + float col_pageratio_; + // name of file + std::string col_pagefile_; + // output data + ColBatch out_; + // page to be loaded + CSCMatrixManager::PagePtr page_; + // internal iterator + utils::ThreadBuffer itr_; +}; + +/*! + * \brief sparse matrix that support column access + */ +class FMatrixPage : public IFMatrix { + public: + /*! \brief constructor */ + FMatrixPage(utils::IIterator *iter) { + this->row_iter_ = iter; + this->col_iter_ = NULL; + } + // destructor + virtual ~FMatrixPage(void) { + if (row_iter_ != NULL) delete row_iter_; + if (col_iter_ != NULL) delete col_iter_; + } + /*! \return whether column access is enabled */ + virtual bool HaveColAccess(void) const { + return col_iter_ != NULL; + } + /*! \brief get number of colmuns */ + virtual size_t NumCol(void) const { + utils::Check(this->HaveColAccess(), "NumCol:need column access"); + return col_iter_->col_ptr().size() - 1; + } + /*! \brief get number of buffered rows */ + virtual const std::vector &buffered_rowset(void) const { + return buffered_rowset_; + } + /*! \brief get column size */ + virtual size_t GetColSize(size_t cidx) const { + const std::vector &col_ptr = col_iter_->col_ptr(); + return col_ptr[cidx+1] - col_ptr[cidx]; + } + /*! \brief get column density */ + virtual float GetColDensity(size_t cidx) const { + const std::vector &col_ptr = col_iter_->col_ptr(); + size_t nmiss = buffered_rowset_.size() - (col_ptr[cidx+1] - col_ptr[cidx]); + return 1.0f - (static_cast(nmiss)) / buffered_rowset_.size(); + } + virtual void InitColAccess(float pkeep = 1.0f) { + if (this->HaveColAccess()) return; + this->InitColData(pkeep); + } + /*! + * \brief get the row iterator associated with FMatrix + */ + virtual utils::IIterator* RowIterator(void) { + row_iter_->BeforeFirst(); + return row_iter_; + } + /*! + * \brief get the column based iterator + */ + virtual utils::IIterator* ColIterator(void) { + std::vector cset; + col_iter_->SetColSet(cset, true); + col_iter_->BeforeFirst(); + return col_iter_; + } + /*! + * \brief colmun based iterator + */ + virtual utils::IIterator *ColIterator(const std::vector &fset) { + col_iter_->SetColSet(fset, false); + col_iter_->BeforeFirst(); + return col_iter_; + } + + protected: + /*! + * \brief intialize column data + * \param pkeep probability to keep a row + */ + inline void InitColData(float pkeep) { + buffered_rowset_.clear(); + // start working + row_iter_->BeforeFirst(); + while (row_iter_->Next()) { + const RowBatch &batch = row_iter_->Value(); + for (size_t i = 0; i < batch.size; ++i) { + } + } + row_iter_->BeforeFirst(); + size_t ktop = 0; + while (row_iter_->Next()) { + const RowBatch &batch = row_iter_->Value(); + for (size_t i = 0; i < batch.size; ++i) { + if (ktop < buffered_rowset_.size() && + buffered_rowset_[ktop] == batch.base_rowid+i) { + ++ktop; + // TODO1 + } + } + } + // sort columns + } + + private: + // row iterator + utils::IIterator *row_iter_; + // column iterator + ThreadColPageIterator *col_iter_; + /*! \brief list of row index that are buffered */ + std::vector buffered_rowset_; }; } // namespace io diff --git a/src/utils/thread_buffer.h b/src/utils/thread_buffer.h index fa488a220..ace50c4b8 100644 --- a/src/utils/thread_buffer.h +++ b/src/utils/thread_buffer.h @@ -113,6 +113,9 @@ class ThreadBuffer { inline ElemFactory &get_factory(void) { return factory; } + inline const ElemFactory &get_factory(void) const{ + return factory; + } // size of buffer int buf_size; private: