diff --git a/.gitignore b/.gitignore index 35061b857..48e3a9400 100644 --- a/.gitignore +++ b/.gitignore @@ -57,4 +57,3 @@ R-package.Rproj *.cache* R-package/inst R-package/src - diff --git a/src/io/page_dmatrix-inl.hpp b/src/io/page_dmatrix-inl.hpp index 73a3087fb..eb56bb80d 100644 --- a/src/io/page_dmatrix-inl.hpp +++ b/src/io/page_dmatrix-inl.hpp @@ -1,7 +1,7 @@ -#ifndef XGBOOST_IO_PAGE_ROW_ITER_INL_HPP_ -#define XGBOOST_IO_PAGE_ROW_ITER_INL_HPP_ +#ifndef XGBOOST_IO_PAGE_DMATRIX_INL_HPP_ +#define XGBOOST_IO_PAGE_DMATRIX_INL_HPP_ /*! - * \file page_row_iter-inl.hpp + * \file page_dmatrix-inl.hpp * row iterator based on sparse page * \author Tianqi Chen */ @@ -10,97 +10,11 @@ #include "../utils/iterator.h" #include "../utils/thread_buffer.h" #include "./simple_fmatrix-inl.hpp" +#include "./sparse_batch_page.h" +#include "./page_fmatrix-inl.hpp" namespace xgboost { namespace io { -/*! \brief page structure that can be used to store a rowbatch */ -struct RowBatchPage { - public: - explicit RowBatchPage(size_t page_size) : kPageSize(page_size) { - data_ = new int[kPageSize]; - utils::Assert(data_ != NULL, "fail to allocate row batch page"); - this->Clear(); - } - ~RowBatchPage(void) { - if (data_ != NULL) delete [] data_; - } - /*! - * \brief Push one row into page - * \param row an instance row - * \return false or true to push into - */ - inline bool PushRow(const RowBatch::Inst &row) { - const size_t dsize = row.length * sizeof(RowBatch::Entry); - if (FreeBytes() < dsize+ sizeof(int)) return false; - row_ptr(Size() + 1) = row_ptr(Size()) + row.length; - memcpy(data_ptr(row_ptr(Size())) , row.data, dsize); - ++data_[0]; - return true; - } - /*! - * \brief get a row batch representation from the page - * \param p_rptr a temporal space that can be used to provide - * ind_ptr storage for RowBatch - * \return a new RowBatch object - */ - inline RowBatch GetRowBatch(std::vector *p_rptr, size_t base_rowid) { - RowBatch batch; - batch.base_rowid = base_rowid; - batch.data_ptr = this->data_ptr(0); - batch.size = static_cast(this->Size()); - std::vector &rptr = *p_rptr; - rptr.resize(this->Size() + 1); - for (size_t i = 0; i < rptr.size(); ++i) { - rptr[i] = static_cast(this->row_ptr(static_cast(i))); - } - batch.ind_ptr = &rptr[0]; - return batch; - } - /*! \brief get i-th row from the batch */ - inline RowBatch::Inst operator[](int i) { - return RowBatch::Inst(data_ptr(0) + row_ptr(i), - static_cast(row_ptr(i+1) - row_ptr(i))); - } - /*! - * \brief clear the page, cleanup the content - */ - inline void Clear(void) { - memset(&data_[0], 0, sizeof(int) * kPageSize); - } - /*! - * \brief load one page form instream - * \return true if loading is successful - */ - inline bool Load(utils::IStream &fi) { - return fi.Read(&data_[0], sizeof(int) * kPageSize) != 0; - } - /*! \brief save one page into outstream */ - inline void Save(utils::IStream &fo) { - fo.Write(&data_[0], sizeof(int) * kPageSize); - } - /*! \return number of elements */ - inline int Size(void) const { - return data_[0]; - } - - protected: - /*! \return number of elements */ - inline size_t FreeBytes(void) { - return (kPageSize - (Size() + 2)) * sizeof(int) - - row_ptr(Size()) * sizeof(RowBatch::Entry); - } - /*! \brief equivalent row pointer at i */ - inline int& row_ptr(int i) { - return data_[kPageSize - i - 1]; - } - inline RowBatch::Entry* data_ptr(int i) { - return (RowBatch::Entry*)(&data_[1]) + i; - } - // content of data - int *data_; - // page size - const size_t kPageSize; -}; /*! \brief thread buffer iterator */ class ThreadRowPageIterator: public utils::IIterator { public: @@ -118,7 +32,10 @@ class ThreadRowPageIterator: public utils::IIterator { } virtual bool Next(void) { if (!itr.Next(page_)) return false; - out_ = page_->GetRowBatch(&tmp_ptr_, base_rowid_); + out_.base_rowid = base_rowid_; + out_.ind_ptr = BeginPtr(page_->offset); + out_.data_ptr = BeginPtr(page_->data); + out_.size = page_->offset.size() - 1; base_rowid_ += out_.size; return true; } @@ -127,76 +44,18 @@ class ThreadRowPageIterator: public utils::IIterator { } /*! \brief load and initialize the iterator with fi */ inline void Load(const utils::FileStream &fi) { - itr.get_factory().SetFile(fi); + itr.get_factory().SetFile(fi, 0); itr.Init(); this->BeforeFirst(); } - /*! - * \brief save a row iterator to output stream, in row iterator format - */ - inline static void Save(utils::IIterator *iter, - utils::IStream &fo) { - RowBatchPage page(kPageSize); - iter->BeforeFirst(); - while (iter->Next()) { - const RowBatch &batch = iter->Value(); - for (size_t i = 0; i < batch.size; ++i) { - if (!page.PushRow(batch[i])) { - page.Save(fo); - page.Clear(); - utils::Check(page.PushRow(batch[i]), "row is too big"); - } - } - } - if (page.Size() != 0) page.Save(fo); - } - /*! \brief page size 64 MB */ - static const size_t kPageSize = 64 << 18; private: // base row id size_t base_rowid_; - // temporal ptr - std::vector tmp_ptr_; // output data RowBatch out_; - // page pointer type - typedef RowBatchPage* PagePtr; - // loader factory for page - struct Factory { - public: - size_t file_begin_; - utils::FileStream fi; - Factory(void) {} - inline void SetFile(const utils::FileStream &fi) { - this->fi = fi; - file_begin_ = this->fi.Tell(); - } - inline bool Init(void) { - return true; - } - inline void SetParam(const char *name, const char *val) {} - inline bool LoadNext(PagePtr &val) { - return val->Load(fi); - } - inline PagePtr Create(void) { - PagePtr a = new RowBatchPage(kPageSize); - return a; - } - inline void FreeSpace(PagePtr &a) { - delete a; - } - inline void Destroy(void) { - fi.Close(); - } - inline void BeforeFirst(void) { - fi.Seek(file_begin_); - } - }; - - protected: - PagePtr page_; - utils::ThreadBuffer itr; + SparsePage *page_; + utils::ThreadBuffer itr; }; /*! \brief data matrix using page */ @@ -247,8 +106,20 @@ class DMatrixPageBase : public DataMatrix { mat.info.SaveBinary(fs); fs.Close(); fname += ".row.blob"; + utils::IIterator *iter = mat.fmat()->RowIterator(); utils::FileStream fbin(utils::FopenCheck(fname.c_str(), "wb")); - ThreadRowPageIterator::Save(mat.fmat()->RowIterator(), fbin); + SparsePage page; + iter->BeforeFirst(); + while (iter->Next()) { + const RowBatch &batch = iter->Value(); + for (size_t i = 0; i < batch.size; ++i) { + page.Push(batch[i]); + if (page.MemCostBytes() >= kPageSize) { + page.Save(&fbin); page.Clear(); + } + } + } + if (page.data.size() != 0) page.Save(&fbin); fbin.Close(); if (!silent) { utils::Printf("DMatrixPage: %lux%lu is saved to %s\n", @@ -268,7 +139,7 @@ class DMatrixPageBase : public DataMatrix { } std::string fname_row = std::string(cache_file) + ".row.blob"; utils::FileStream fo(utils::FopenCheck(fname_row.c_str(), "wb")); - RowBatchPage page(ThreadRowPageIterator::kPageSize); + SparsePage page; dmlc::InputSplit *in = dmlc::InputSplit::Create(uri, rank, npart); std::string line; @@ -286,10 +157,9 @@ class DMatrixPageBase : public DataMatrix { feats.push_back(e); } RowBatch::Inst row(BeginPtr(feats), feats.size()); - if (!page.PushRow(row)) { - page.Save(fo); - page.Clear(); - utils::Check(page.PushRow(row), "row is too big"); + page.Push(row); + if (page.MemCostBytes() >= kPageSize) { + page.Save(&fo); page.Clear(); } for (size_t i = 0; i < feats.size(); ++i) { info.info.num_col = std::max(info.info.num_col, @@ -298,8 +168,8 @@ class DMatrixPageBase : public DataMatrix { this->info.labels.push_back(label); info.info.num_row += 1; } - if (page.Size() != 0) { - page.Save(fo); + if (page.data.size() != 0) { + page.Save(&fo); } delete in; fo.Close(); @@ -319,7 +189,8 @@ class DMatrixPageBase : public DataMatrix { } /*! \brief magic number used to identify DMatrix */ static const int kMagic = TKMagic; - + /*! \brief page size 64 MB */ + static const size_t kPageSize = 64 << 18; protected: /*! \brief row iterator */ ThreadRowPageIterator *iter_; diff --git a/src/io/page_fmatrix-inl.hpp b/src/io/page_fmatrix-inl.hpp new file mode 100644 index 000000000..44643c329 --- /dev/null +++ b/src/io/page_fmatrix-inl.hpp @@ -0,0 +1,130 @@ +#ifndef XGBOOST_IO_PAGE_FMATRIX_INL_HPP_ +#define XGBOOST_IO_PAGE_FMATRIX_INL_HPP_ +/*! + * \file page_fmatrix-inl.hpp + * col iterator based on sparse page + * \author Tianqi Chen + */ +namespace xgboost { +namespace io { +/*! \brief thread buffer iterator */ +class ThreadColPageIterator: public utils::IIterator { + public: + ThreadColPageIterator(void) { + itr.SetParam("buffer_size", "2"); + page_ = NULL; + } + virtual ~ThreadColPageIterator(void) {} + virtual void Init(void) {} + virtual void BeforeFirst(void) { + itr.BeforeFirst(); + } + virtual bool Next(void) { + if (!itr.Next(page_)) return false; + out_.col_index = BeginPtr(itr.get_factory().index_set()); + col_data_.resize(page_->offset.size() - 1, SparseBatch::Inst(NULL, 0)); + for (size_t i = 0; i < col_data_.size(); ++i) { + col_data_[i] = SparseBatch::Inst + (BeginPtr(page_->data) + page_->offset[i], + page_->offset[i + 1] - page_->offset[i]); + } + out_.col_data = BeginPtr(col_data_); + out_.size = col_data_.size(); + return true; + } + virtual const ColBatch &Value(void) const { + return out_; + } + /*! \brief load and initialize the iterator with fi */ + inline void SetFile(const utils::FileStream &fi) { + itr.get_factory().SetFile(fi, 0); + itr.Init(); + } + // set index set + inline void SetIndexSet(const std::vector &fset) { + itr.get_factory().SetIndexSet(fset); + } + + private: + // output data + ColBatch out_; + SparsePage *page_; + std::vector col_data_; + utils::ThreadBuffer itr; +}; +/*! + * \brief sparse matrix that support column access, CSC + */ +class FMatrixS : public IFMatrix { + public: + typedef SparseBatch::Entry Entry; + /*! \brief constructor */ + FMatrixS(utils::IIterator *iter) { + this->iter_ = iter; + } + // destructor + virtual ~FMatrixS(void) { + if (iter_ != NULL) delete iter_; + } + /*! \return whether column access is enabled */ + virtual bool HaveColAccess(void) const { + return col_ptr_.size() != 0; + } + /*! \brief get number of colmuns */ + virtual size_t NumCol(void) const { + utils::Check(this->HaveColAccess(), "NumCol:need column access"); + return col_ptr_.size() - 1; + } + /*! \brief get number of buffered rows */ + virtual const std::vector &buffered_rowset(void) const { + return buffered_rowset_; + } + /*! \brief get column size */ + virtual size_t GetColSize(size_t cidx) const { + return col_ptr_[cidx+1] - col_ptr_[cidx]; + } + /*! \brief get column density */ + virtual float GetColDensity(size_t cidx) const { + size_t nmiss = buffered_rowset_.size() - (col_ptr_[cidx+1] - col_ptr_[cidx]); + return 1.0f - (static_cast(nmiss)) / buffered_rowset_.size(); + } + virtual void InitColAccess(const std::vector &enabled, + float pkeep = 1.0f) { + if (this->HaveColAccess()) return; + this->InitColData(pkeep, enabled); + } + /*! + * \brief get the row iterator associated with FMatrix + */ + virtual utils::IIterator* RowIterator(void) { + iter_->BeforeFirst(); + return iter_; + } + /*! + * \brief get the column based iterator + */ + virtual utils::IIterator* ColIterator(void) { + size_t ncol = this->NumCol(); + col_iter_.col_index_.resize(ncol); + for (size_t i = 0; i < ncol; ++i) { + col_iter_.col_index_[i] = static_cast(i); + } + col_iter_.SetBatch(col_ptr_, col_data_); + return &col_iter_; + } + /*! + * \brief colmun based iterator + */ + virtual utils::IIterator *ColIterator(const std::vector &fset) { + size_t ncol = this->NumCol(); + col_iter_.col_index_.resize(0); + for (size_t i = 0; i < fset.size(); ++i) { + if (fset[i] < ncol) col_iter_.col_index_.push_back(fset[i]); + } + col_iter_.SetBatch(col_ptr_, col_data_); + return &col_iter_; + } +}; +} // namespace io +} // namespace xgboost +#endif // XGBOOST_IO_PAGE_FMATRIX_INL_HPP_ diff --git a/src/io/simple_fmatrix-inl.hpp b/src/io/simple_fmatrix-inl.hpp index 9f204536f..3ba6f2801 100644 --- a/src/io/simple_fmatrix-inl.hpp +++ b/src/io/simple_fmatrix-inl.hpp @@ -16,7 +16,7 @@ namespace io { /*! * \brief sparse matrix that support column access, CSC */ -class FMatrixS : public IFMatrix{ +class FMatrixS : public IFMatrix { public: typedef SparseBatch::Entry Entry; /*! \brief constructor */ @@ -238,7 +238,7 @@ class FMatrixS : public IFMatrix{ inline void SetBatch(const std::vector &ptr, const std::vector &data) { batch_.size = col_index_.size(); - col_data_.resize(col_index_.size(), SparseBatch::Inst(NULL,0)); + col_data_.resize(col_index_.size(), SparseBatch::Inst(NULL, 0)); for (size_t i = 0; i < col_data_.size(); ++i) { const bst_uint ridx = col_index_[i]; col_data_[i] = SparseBatch::Inst(&data[0] + ptr[ridx], diff --git a/src/io/sparse_batch_page.h b/src/io/sparse_batch_page.h new file mode 100644 index 000000000..f7bbb08de --- /dev/null +++ b/src/io/sparse_batch_page.h @@ -0,0 +1,216 @@ +#ifndef XGBOOST_IO_SPARSE_BATCH_PAGE_H_ +#define XGBOOST_IO_SPARSE_BATCH_PAGE_H_ +/*! + * \file sparse_batch_page.h + * content holder of sparse batch that can be saved to disk + * the representation can be effectively + * use in external memory computation + * \author Tianqi Chen + */ +#include "../data.h" + +namespace xgboost { +namespace io { +/*! + * \brief storage unit of sparse batch + */ +class SparsePage { + public: + /*! \brief offset of the segments */ + std::vector offset; + /*! \brief the data of the segments */ + std::vector data; + /*! \brief constructor */ + SparsePage() { + this->Clear(); + } + /*! + * \brief load the by providing a list of interested segments + * only the interested segments are loaded + * \param fi the input stream of the file + * \param sorted_index_set sorted index of segments we are interested in + * \return true of the loading as successful, false if end of file was reached + */ + inline bool Load(utils::ISeekStream *fi, + const std::vector &sorted_index_set) { + if (!fi->Read(&disk_offset_)) return false; + // setup the offset + offset.clear(); offset.push_back(0); + for (size_t i = 0; i < sorted_index_set.size(); ++i) { + bst_uint fid = sorted_index_set[i]; + size_t size = disk_offset_[fid + 1] - disk_offset_[fid]; + offset.push_back(offset.back() + size); + } + data.resize(offset.back()); + // read in the data + size_t begin = fi->Tell(); + size_t curr_offset = 0; + for (size_t i = 0; i < sorted_index_set.size();) { + bst_uint fid = sorted_index_set[i]; + if (disk_offset_[fid] != curr_offset) { + utils::Assert(disk_offset_[fid] > curr_offset, "fset index was not sorted"); + fi->Seek(begin + disk_offset_[fid]); + curr_offset = disk_offset_[fid]; + } + size_t j, size_to_read = 0; + for (j = i; j < sorted_index_set.size(); ++j) { + if (disk_offset_[sorted_index_set[j]] == disk_offset_[fid] + size_to_read) { + size_to_read += offset[j + 1] - offset[j]; + } else { + break; + } + } + if (size_to_read != 0) { + utils::Check(fi->Read(BeginPtr(data) + offset[i], + size_to_read * sizeof(SparseBatch::Entry)) != 0, + "Invalid SparsePage file"); + curr_offset += size_to_read; + } + i = j; + } + return true; + } + /*! + * \brief load all the segments + * \param fi the input stream of the file + * \return true of the loading as successful, false if end of file was reached + */ + inline bool Load(utils::IStream *fi) { + if (!fi->Read(&offset)) return false; + utils::Check(offset.size() != 0, "Invalid SparsePage file"); + data.resize(offset.back()); + if (data.size() != 0) { + utils::Check(fi->Read(BeginPtr(data), data.size() * sizeof(SparseBatch::Entry)) != 0, + "Invalid SparsePage file"); + } + return true; + } + /*! + * \brief save the data to fo, when a page was written + * to disk it must contain all the elements in the + * \param fo output stream + */ + inline void Save(utils::IStream *fo) const { + utils::Assert(offset.size() != 0 && offset[0] == 0, "bad offset"); + utils::Assert(offset.back() == data.size(), "in consistent SparsePage"); + fo->Write(offset); + if (data.size() != 0) { + fo->Write(BeginPtr(data), data.size() * sizeof(SparseBatch::Entry)); + } + } + /*! \return estimation of memory cost of this page */ + inline size_t MemCostBytes(void) const { + return offset.size() * sizeof(size_t) + data.size() * sizeof(SparseBatch::Entry); + } + /*! \brief clear the page */ + inline void Clear(void) { + offset.clear(); + offset.push_back(0); + data.clear(); + } + /*! + * \brief load all the segments and add it to existing batch + * \param fi the input stream of the file + * \return true of the loading as successful, false if end of file was reached + */ + inline bool PushLoad(utils::IStream *fi) { + if (!fi->Read(&disk_offset_)) return false; + data.resize(offset.back() + disk_offset_.back()); + if (disk_offset_.back() != 0) { + utils::Check(fi->Read(BeginPtr(data) + offset.back(), + disk_offset_.back() * sizeof(SparseBatch::Entry)) != 0, + "Invalid SparsePage file"); + } + size_t top = offset.back(); + size_t begin = offset.size(); + offset.resize(offset.size() + disk_offset_.size()); + for (size_t i = 0; i < disk_offset_.size(); ++i) { + offset[i + begin] = top + disk_offset_[i]; + } + } + /*! + * \brief Push row batch into the page + * \param batch the row batch + */ + inline void Push(const RowBatch &batch) { + data.resize(offset.back() + batch.size); + std::memcpy(BeginPtr(data) + offset.back(), + batch.data_ptr + batch.ind_ptr[0], + sizeof(SparseBatch::Entry) * batch.ind_ptr[batch.size]); + size_t top = offset.back(); + size_t begin = offset.size(); + offset.resize(offset.size() + batch.size); + for (size_t i = 0; i < batch.size; ++i) { + offset[i + begin] = top + batch.ind_ptr[i] - batch.ind_ptr[0]; + } + } + /*! + * \brief Push one instance into page + * \param row an instance row + */ + inline void Push(const SparseBatch::Inst &inst) { + offset.push_back(offset.back() + inst.length); + size_t begin = data.size(); + data.resize(begin + inst.length); + std::memcpy(BeginPtr(data) + begin, inst.data, + sizeof(SparseBatch::Entry) * inst.length); + } + + private: + /*! \brief external memory column offset */ + std::vector disk_offset_; +}; +/*! + * \brief factory class for SparsePage, + * used in threadbuffer template + */ +class SparsePageFactory { + public: + SparsePageFactory(void) {} + inline void SetFile(const utils::FileStream &fi, + size_t file_begin = 0) { + fi_ = fi; + file_begin_ = file_begin; + } + inline const std::vector &index_set(void) const { + return action_index_set_; + } + // set index set, will be used after next before first + inline void SetIndexSet(const std::vector &index_set) { + set_index_set_ = index_set; + std::sort(set_index_set_.begin(), set_index_set_.end()); + } + inline bool Init(void) { + return true; + } + inline void SetParam(const char *name, const char *val) {} + inline bool LoadNext(SparsePage *val) { + if (action_index_set_.size() != 0) { + return val->Load(&fi_, action_index_set_); + } else { + return val->Load(&fi_); + } + } + inline SparsePage *Create(void) { + return new SparsePage(); + } + inline void FreeSpace(SparsePage *a) { + delete a; + } + inline void Destroy(void) { + fi_.Close(); + } + inline void BeforeFirst(void) { + fi_.Seek(file_begin_); + action_index_set_ = set_index_set_; + } + + private: + size_t file_begin_; + utils::FileStream fi_; + std::vector action_index_set_; + std::vector set_index_set_; +}; +} // namespace io +} // namespace xgboost +#endif // XGBOOST_IO_SPARSE_BATCH_PAGE_H_