diff --git a/src/data.h b/src/data.h index 2ea5f222a..162a31bfe 100644 --- a/src/data.h +++ b/src/data.h @@ -138,9 +138,10 @@ class IFMatrix { virtual utils::IIterator *ColIterator(const std::vector &fset) = 0; /*! * \brief check if column access is supported, if not, initialize column access + * \param enabled per-feature flags: whether each feature should be included in column access * \param subsample subsample ratio when generating column access */ - virtual void InitColAccess(float subsample) = 0; + virtual void InitColAccess(const std::vector &enabled, float subsample) = 0; // the following are column meta data, should be able to answer them fast /*! \return whether column access is enabled */ virtual bool HaveColAccess(void) const = 0; diff --git a/src/io/page_fmatrix-inl.hpp b/src/io/page_fmatrix-inl.hpp index af8be333f..971abbb0e 100644 --- a/src/io/page_fmatrix-inl.hpp +++ b/src/io/page_fmatrix-inl.hpp @@ -247,7 +247,7 @@ class FMatrixPage : public IFMatrix { size_t nmiss = buffered_rowset_.size() - (col_ptr[cidx+1] - col_ptr[cidx]); return 1.0f - (static_cast(nmiss)) / buffered_rowset_.size(); } - virtual void InitColAccess(float pkeep = 1.0f) { + virtual void InitColAccess(const std::vector &enabled, float pkeep = 1.0f) { if (this->HaveColAccess()) return; utils::Printf("start to initialize page col access\n"); if (this->LoadColData()) { diff --git a/src/io/simple_fmatrix-inl.hpp b/src/io/simple_fmatrix-inl.hpp index 997268ff3..88bc69019 100644 --- a/src/io/simple_fmatrix-inl.hpp +++ b/src/io/simple_fmatrix-inl.hpp @@ -48,9 +48,10 @@ class FMatrixS : public IFMatrix{ size_t nmiss = buffered_rowset_.size() - (col_ptr_[cidx+1] - col_ptr_[cidx]); return 1.0f - (static_cast(nmiss)) / buffered_rowset_.size(); } - virtual void InitColAccess(float pkeep = 1.0f) { + virtual void InitColAccess(const std::vector &enabled, + float pkeep = 1.0f) { if (this->HaveColAccess()) return; - this->InitColData(pkeep); + this->InitColData(pkeep, enabled); } /*! 
* \brief get the row iterator associated with FMatrix @@ -141,7 +142,7 @@ class FMatrixS : public IFMatrix{ * \brief intialize column data * \param pkeep probability to keep a row */ - inline void InitColData(float pkeep) { + inline void InitColData(float pkeep, const std::vector &enabled) { buffered_rowset_.clear(); // note: this part of code is serial, todo, parallelize this transformer utils::SparseCSRMBuilder builder(col_ptr_, col_data_); @@ -155,7 +156,9 @@ class FMatrixS : public IFMatrix{ buffered_rowset_.push_back(static_cast(batch.base_rowid+i)); RowBatch::Inst inst = batch[i]; for (bst_uint j = 0; j < inst.length; ++j) { - builder.AddBudget(inst[j].index); + if (enabled[inst[j].index]){ + builder.AddBudget(inst[j].index); + } } } } @@ -172,9 +175,11 @@ class FMatrixS : public IFMatrix{ ++ktop; RowBatch::Inst inst = batch[i]; for (bst_uint j = 0; j < inst.length; ++j) { - builder.PushElem(inst[j].index, - Entry((bst_uint)(batch.base_rowid+i), - inst[j].fvalue)); + if (enabled[inst[j].index]) { + builder.PushElem(inst[j].index, + Entry((bst_uint)(batch.base_rowid+i), + inst[j].fvalue)); + } } } } diff --git a/src/learner/learner-inl.hpp b/src/learner/learner-inl.hpp index c43ec7700..1a001eb95 100644 --- a/src/learner/learner-inl.hpp +++ b/src/learner/learner-inl.hpp @@ -31,6 +31,7 @@ class BoostLearner { name_gbm_ = "gbtree"; silent= 0; prob_buffer_row = 1.0f; + part_load_col = 0; } ~BoostLearner(void) { if (obj_ != NULL) delete obj_; @@ -88,6 +89,7 @@ class BoostLearner { this->SetParam(n.c_str(), val); } if (!strcmp(name, "silent")) silent = atoi(val); + if (!strcmp(name, "part_load_col")) part_load_col = atoi(val); if (!strcmp(name, "prob_buffer_row")) { prob_buffer_row = static_cast(atof(val)); this->SetParam("updater", "grow_colmaker,refresh,prune"); @@ -164,8 +166,41 @@ class BoostLearner { * if not intialize it * \param p_train pointer to the matrix used by training */ - inline void CheckInit(DMatrix *p_train) { - 
p_train->fmat()->InitColAccess(prob_buffer_row); + inline void CheckInit(DMatrix *p_train) { + int ncol = p_train->info.info.num_col; + std::vector enabled(ncol, true); + + if (part_load_col != 0) { + std::vector col_index; + for (int i = 0; i < ncol; ++i) { + col_index.push_back(i); + } + random::Shuffle(col_index); + std::string s_model; + utils::MemoryBufferStream ms(&s_model); + utils::IStream &fs = ms; + if (sync::GetRank() == 0) { + fs.Write(col_index); + sync::Bcast(&s_model, 0); + } else { + sync::Bcast(&s_model, 0); + fs.Read(&col_index); + } + int nsize = sync::GetWorldSize(); + int step = (ncol + nsize -1) / nsize; + int pid = sync::GetRank(); + std::fill(enabled.begin(), enabled.end(), false); + int start = step * pid; + int end = std::min(step * (pid + 1), ncol); + utils::Printf("rank %d idset:", pid); + for (int i = start; i < end; ++i) { + enabled[col_index[i]] = true; + utils::Printf(" %u", col_index[i]); + } + utils::Printf("\n"); + } + // initialize column access + p_train->fmat()->InitColAccess(enabled, prob_buffer_row); } /*! * \brief update the model for one iteration @@ -316,6 +351,8 @@ class BoostLearner { // data fields // silent during training int silent; + // if nonzero, each rank loads only its own random subset of the columns + int part_load_col; // maximum buffred row value float prob_buffer_row; // evaluation set diff --git a/src/sync/sync.h b/src/sync/sync.h index 239840cb3..293f53515 100644 --- a/src/sync/sync.h +++ b/src/sync/sync.h @@ -21,6 +21,9 @@ enum ReduceOp { /*! \brief get rank of current process */ int GetRank(void); +/*! \brief get total number of processes */ +int GetWorldSize(void); + /*! 
* \brief this is used to check if sync module is a true distributed implementation, or simply a dummpy */ diff --git a/src/sync/sync_empty.cpp b/src/sync/sync_empty.cpp index 84e2f770b..108f170ef 100644 --- a/src/sync/sync_empty.cpp +++ b/src/sync/sync_empty.cpp @@ -17,6 +17,10 @@ bool IsDistributed(void) { return false; } +int GetWorldSize(void) { + return 1; +} + template<> void AllReduce(uint32_t *sendrecvbuf, int count, ReduceOp op) { } diff --git a/src/sync/sync_mpi.cpp b/src/sync/sync_mpi.cpp index ecd83c601..faf66ab6f 100644 --- a/src/sync/sync_mpi.cpp +++ b/src/sync/sync_mpi.cpp @@ -8,6 +8,10 @@ int GetRank(void) { return MPI::COMM_WORLD.Get_rank(); } +int GetWorldSize(void) { + return MPI::COMM_WORLD.Get_size(); +} + void Init(int argc, char *argv[]) { MPI::Init(argc, argv); } diff --git a/src/xgboost_main.cpp b/src/xgboost_main.cpp index 690417855..dd549634c 100644 --- a/src/xgboost_main.cpp +++ b/src/xgboost_main.cpp @@ -160,7 +160,7 @@ class BoostLearnTask { if (!silent) printf("boosting round %d, %lu sec elapsed\n", i, elapsed); learner.UpdateOneIter(i, *data); std::string res = learner.EvalOneIter(i, devalall, eval_data_names); - if (silent < 1) { + if (silent < 2) { fprintf(stderr, "%s\n", res.c_str()); } if (save_period != 0 && (i + 1) % save_period == 0) {