diff --git a/.gitignore b/.gitignore
index cb017114b..8bb1ead7f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -49,3 +49,4 @@ Debug
 xgboost
 xgboost-mpi
 train*
+rabit
diff --git a/Makefile b/Makefile
index a78c28e56..3f0b3c7cf 100644
--- a/Makefile
+++ b/Makefile
@@ -46,7 +46,7 @@ xgboost: updater.o gbm.o io.o main.o librabit
 wrapper/libxgboostwrapper.so: wrapper/xgboost_wrapper.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h updater.o gbm.o io.o librabit
 
 $(BIN) :
-	$(CXX) $(CFLAGS) -o $@ $(filter %.cpp %.o %.c, $^) $(LDFLAGS) -lrabit
+	$(CXX) $(CFLAGS) -o $@ $(filter %.cpp %.o %.c, $^) $(LDFLAGS) -lrabit_mock
 
 $(SLIB) :
 	$(CXX) $(CFLAGS) -fPIC -shared -o $@ $(filter %.cpp %.o %.c, $^) $(LDFLAGS) -lrabit
diff --git a/multi-node/col-split/README.md b/multi-node/col-split/README.md
index 04227d1eb..4f0d07b27 100644
--- a/multi-node/col-split/README.md
+++ b/multi-node/col-split/README.md
@@ -1,7 +1,9 @@
 Distributed XGBoost: Column Split Version
 ====
 * run ```bash mushroom-col-rabit.sh <nprocess>```
-  - mushroom-col-tcp.sh starts xgboost job using rabit's allreduce
+  - mushroom-col-rabit.sh starts an xgboost job using rabit's allreduce
+* run ```bash mushroom-col-rabit-mock.sh <nprocess>```
+  - mushroom-col-rabit-mock.sh starts an xgboost job using rabit's allreduce, inserts suicide signals at certain points, and tests recovery
 * run ```bash mushroom-col-mpi.sh <nprocess>```
   - mushroom-col.sh starts the xgboost-mpi job
 
diff --git a/multi-node/col-split/mushroom-col-rabit-mock.sh b/multi-node/col-split/mushroom-col-rabit-mock.sh
new file mode 100755
index 000000000..5148a0b61
--- /dev/null
+++ b/multi-node/col-split/mushroom-col-rabit-mock.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+if [[ $# -ne 1 ]]
+then
+    echo "Usage: $0 <nprocess>"
+    exit 1
+fi
+
+#
+# This script is the same as mushroom-col.sh except that it uses xgboost instead of xgboost-mpi;
+# xgboost uses the built-in TCP-based allreduce module, so it can run in more environments, as long as we know how to start the job by modifying ../submit_job_tcp.py
+#
+rm -rf train.col* *.model
+k=$1
+
+# split the lib svm file into k subfiles
+python splitsvm.py ../../demo/data/agaricus.txt.train train $k
+
+# run xgboost through the rabit MPI tracker, with mock failure points for recovery testing
+../../rabit/tracker/rabit_mpi.py $k local ../../rabit/test/keepalive.sh ../../xgboost mushroom-col.conf dsplit=col mock=0,0,1,0 mock=1,1,0,0
+
+# the model can be directly loaded by the single-machine xgboost solver, as usual
+#../../xgboost mushroom-col.conf task=dump model_in=0002.model fmap=../../demo/data/featmap.txt name_dump=dump.nice.$k.txt
+
+
+#cat dump.nice.$k.txt
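The two `mock=...` arguments above are what turn this into a failure-injection test: each one asks rabit's mock engine (linked via `-lrabit_mock` in the Makefile change) to kill a worker at a chosen point so that checkpoint recovery gets exercised, while `keepalive.sh` restarts the dead process. Below is a minimal sketch of how such a rule could be parsed and matched; the field order (rank, version, seq, ndeath) and the parser itself are assumptions for illustration, not rabit's actual source.

```cpp
// Hypothetical sketch of interpreting a rule like "mock=0,0,1,0";
// field order (rank, version, seq, ndeath) is an assumption.
#include <cstdio>
#include <cstdlib>

struct MockRule {
  int rank;     // which worker should fail
  int version;  // at which checkpoint version
  int seq;      // before which allreduce/broadcast call in that version
  int ndeath;   // after how many previous deaths the rule fires
};

// parse "mock=r,v,s,nd" into a rule; returns true on success
bool ParseMockRule(const char *arg, MockRule *out) {
  return std::sscanf(arg, "mock=%d,%d,%d,%d",
                     &out->rank, &out->version, &out->seq, &out->ndeath) == 4;
}

// decide whether the current worker should commit suicide now
bool ShouldDie(const MockRule &r, int my_rank, int version, int seq, int ndeath) {
  return r.rank == my_rank && r.version == version &&
         r.seq == seq && r.ndeath == ndeath;
}

int main(int argc, char *argv[]) {
  MockRule r;
  for (int i = 1; i < argc; ++i) {
    // pretend we are rank 0 at version 0, call 1, first life
    if (ParseMockRule(argv[i], &r) && ShouldDie(r, 0, 0, 1, 0)) {
      std::fprintf(stderr, "[mock] rank 0 suiciding for recovery test\n");
      std::exit(137);  // simulate an abrupt worker failure
    }
  }
  return 0;
}
```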
invalid model file"); + utils::Check(fi.Read(&pred_counter[0], pred_counter.size() * sizeof(unsigned)) != 0, + "GBTree: invalid model file"); } } virtual void SaveModel(utils::IStream &fo, bool with_pbuffer) const { @@ -77,7 +71,7 @@ class GBTree : public IGradBooster { if (tree_info.size() != 0) { fo.Write(&tree_info[0], sizeof(int) * tree_info.size()); } - if (mparam.num_pbuffer != 0 && with_pbuffer) { + if (mparam.num_pbuffer != 0 && with_pbuffer) { fo.Write(&pred_buffer[0], pred_buffer.size() * sizeof(float)); fo.Write(&pred_counter[0], pred_counter.size() * sizeof(unsigned)); } diff --git a/src/learner/learner-inl.hpp b/src/learner/learner-inl.hpp index 1640071b6..2f6c3f0b3 100644 --- a/src/learner/learner-inl.hpp +++ b/src/learner/learner-inl.hpp @@ -48,11 +48,9 @@ class BoostLearner : public rabit::ISerializable { * \param mats array of pointers to matrix whose prediction result need to be cached */ inline void SetCacheData(const std::vector& mats) { - // estimate feature bound - unsigned num_feature = 0; + utils::Assert(cache_.size() == 0, "can only call cache data once"); // assign buffer index size_t buffer_size = 0; - utils::Assert(cache_.size() == 0, "can only call cache data once"); for (size_t i = 0; i < mats.size(); ++i) { bool dupilicate = false; for (size_t j = 0; j < i; ++j) { @@ -63,16 +61,10 @@ class BoostLearner : public rabit::ISerializable { mats[i]->cache_learner_ptr_ = this; cache_.push_back(CacheEntry(mats[i], buffer_size, mats[i]->info.num_row())); buffer_size += mats[i]->info.num_row(); - num_feature = std::max(num_feature, static_cast(mats[i]->info.num_col())); } - rabit::Allreduce(&num_feature, 1); char str_temp[25]; - if (num_feature > mparam.num_feature) { - utils::SPrintf(str_temp, sizeof(str_temp), "%u", num_feature); - this->SetParam("bst:num_feature", str_temp); - } - utils::SPrintf(str_temp, sizeof(str_temp), "%lu", - static_cast(buffer_size)); + utils::SPrintf(str_temp, sizeof(str_temp), "%lu", + static_cast(buffer_size)); this->SetParam("num_pbuffer", str_temp); if (!silent) { utils::Printf("buffer_size=%ld\n", static_cast(buffer_size)); @@ -126,10 +118,29 @@ class BoostLearner : public rabit::ISerializable { cfg_.push_back(std::make_pair(std::string(name), std::string(val))); } } + // this is an internal function + // initialize the trainer, called at InitModel and LoadModel + inline void InitTrainer(bool calc_num_feature = true) { + if (calc_num_feature) { + // estimate feature bound + unsigned num_feature = 0; + for (size_t i = 0; i < cache_.size(); ++i) { + num_feature = std::max(num_feature, + static_cast(cache_[i].mat_->info.num_col())); + } + // run allreduce on num_feature to find the maximum value + rabit::Allreduce(&num_feature, 1); + if (num_feature > mparam.num_feature) mparam.num_feature = num_feature; + } + char str_temp[25]; + utils::SPrintf(str_temp, sizeof(str_temp), "%d", mparam.num_feature); + this->SetParam("bst:num_feature", str_temp); + } /*! 
   /*!
    * \brief initialize the model
    */
   inline void InitModel(void) {
+    this->InitTrainer();
     // initialize model
     this->InitObjGBM();
     // reset the base score
@@ -141,8 +152,9 @@ class BoostLearner : public rabit::ISerializable {
    * \brief load model from stream
    * \param fi input stream
    * \param with_pbuffer whether to load with predict buffer
+   * \param calc_num_feature whether to recalculate the number of features when calling InitTrainer
    */
-  inline void LoadModel(utils::IStream &fi, bool with_pbuffer = true) {
+  inline void LoadModel(utils::IStream &fi, bool with_pbuffer = true, bool calc_num_feature = true) {
     utils::Check(fi.Read(&mparam, sizeof(ModelParam)) != 0,
                  "BoostLearner: wrong model format");
     utils::Check(fi.Read(&name_obj_), "BoostLearner: wrong model format");
@@ -150,9 +162,10 @@ class BoostLearner : public rabit::ISerializable {
     // delete existing gbm if any
     if (obj_ != NULL) delete obj_;
     if (gbm_ != NULL) delete gbm_;
+    this->InitTrainer(calc_num_feature);
     this->InitObjGBM();
     gbm_->LoadModel(fi, with_pbuffer);
-    if (with_pbuffer && distributed_mode == 2 && rabit::GetRank() != 0) {
+    if (!with_pbuffer || distributed_mode == 2) {
       gbm_->ResetPredBuffer(pred_buffer_size);
     }
   }
@@ -160,7 +173,7 @@ class BoostLearner : public rabit::ISerializable {
   virtual void Load(rabit::IStream &fi) {
     RabitStreamAdapter fs(fi);
     // for row split, we should not keep pbuffer
-    this->LoadModel(fs, distributed_mode != 2);
+    this->LoadModel(fs, distributed_mode != 2, false);
   }
   // rabit save model to rabit checkpoint
   virtual void Save(rabit::IStream &fo) const {
@@ -209,9 +222,12 @@ class BoostLearner : public rabit::ISerializable {
    * \param p_train pointer to the data matrix
    */
   inline void UpdateOneIter(int iter, const DMatrix &train) {
+    printf("!!UpdateOneIter\n");
     this->PredictRaw(train, &preds_);
     obj_->GetGradient(preds_, train.info, iter, &gpair_);
+    printf("!!UpdateOneDoboost\n");
     gbm_->DoBoost(train.fmat(), this->FindBufferOffset(train), train.info.info, &gpair_);
+    printf("!!UpdateOneIter finish\n");
   }
   /*!
    * \brief evaluate the model for specific iteration
@@ -335,7 +351,7 @@ class BoostLearner : public rabit::ISerializable {
     /* \brief number of class, if it is multi-class classification */
     int num_class;
     /*! \brief reserved field */
-    int reserved[32];
+    int reserved[31];
     /*! \brief constructor */
     ModelParam(void) {
       base_score = 0.5f;
diff --git a/src/xgboost_main.cpp b/src/xgboost_main.cpp
index d25140461..89cc1b77d 100644
--- a/src/xgboost_main.cpp
+++ b/src/xgboost_main.cpp
@@ -48,8 +48,7 @@ class BoostLearnTask {
     }
     if (rabit::GetRank() != 0) {
       this->SetParam("silent", "2");
-    }
-
+    }
     if (task == "train") {
       // if task is training, will try recover from checkpoint
       this->TaskTrain();
@@ -151,7 +150,7 @@ class BoostLearnTask {
     learner.SetCacheData(dcache);
 
     // add training set to evaluation set if needed
-    if( eval_train != 0 ) {
+    if (eval_train != 0) {
       devalall.push_back(data);
       eval_data_names.push_back(std::string("train"));
     }
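The `reserved[32]` to `reserved[31]` change follows the usual binary-compatibility convention for this struct: `ModelParam` is read and written as raw bytes (`fi.Read(&mparam, sizeof(ModelParam))` above), so when a field is added, the reserved array shrinks by one slot to keep the struct size, and therefore the on-disk model format, unchanged. A hypothetical illustration follows; the simplified structs below are not the real `ModelParam`, and the new field that motivated the change is not visible in this diff.

```cpp
// Hypothetical before/after layouts illustrating the reserved-slot trick;
// the real ModelParam has more fields than shown here.
struct ModelParamOld {
  float base_score;   // global bias, serialized as raw bytes
  int num_class;
  int reserved[32];   // padding held back for future fields
};

struct ModelParamNew {
  float base_score;
  int num_class;
  int new_field;      // hypothetical field taking over one reserved slot
  int reserved[31];   // one slot fewer, so the total size is unchanged
};

// old model files still load via fi.Read(&mparam, sizeof(ModelParam))
// because the byte-layout size is identical
static_assert(sizeof(ModelParamOld) == sizeof(ModelParamNew),
              "model binary format size must stay stable");

int main() { return 0; }
```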