diff --git a/.gitignore b/.gitignore
index 1a2a4b48e..cb017114b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,3 +46,6 @@ Debug
 *csv
 *.cpage.col
 *.cpage
+xgboost
+xgboost-mpi
+train*
diff --git a/Makefile b/Makefile
index a6e7f3daa..c99a9a7fe 100644
--- a/Makefile
+++ b/Makefile
@@ -30,9 +30,9 @@
 io.o: src/io/io.cpp src/io/*.hpp src/utils/*.h src/learner/dmatrix.h src/*.h
 sync_mpi.o: src/sync/sync_mpi.cpp
 sync_empty.o: src/sync/sync_empty.cpp
 main.o: src/xgboost_main.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h
-xgboost: updater.o gbm.o io.o main.o sync_empty.o
 xgboost-mpi: updater.o gbm.o io.o main.o sync_mpi.o
-wrapper/libxgboostwrapper.so: wrapper/xgboost_wrapper.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h $(OBJ)
+xgboost: updater.o gbm.o io.o main.o sync_empty.o
+wrapper/libxgboostwrapper.so: wrapper/xgboost_wrapper.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h updater.o gbm.o io.o sync_empty.o
 $(BIN) :
 	$(CXX) $(CFLAGS) $(LDFLAGS) -o $@ $(filter %.cpp %.o %.c, $^)
diff --git a/demo/mpi/README.md b/demo/mpi/README.md
new file mode 100644
index 000000000..60fd0eb6e
--- /dev/null
+++ b/demo/mpi/README.md
@@ -0,0 +1,11 @@
+This folder contains a toy example script to run xgboost-mpi.
+
+This is an experimental distributed version of xgboost.
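+
+For example, to run the demo with 3 worker processes (assuming that
+xgboost and xgboost-mpi have already been built in the repository root,
+and that MPI is available through mpirun):
+
+```bash
+./runexp-mpi.sh 3
+```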
diff --git a/demo/mpi/mpi.conf b/demo/mpi/mpi.conf
new file mode 100644
index 000000000..5b1f978d1
--- /dev/null
+++ b/demo/mpi/mpi.conf
@@ -0,0 +1,37 @@
+# General Parameters, see comment for each definition
+# choose the booster, can be gbtree or gblinear
+booster = gbtree
+# choose logistic regression loss function for binary classification
+objective = binary:logistic
+
+# Tree Booster Parameters
+# step size shrinkage
+eta = 1.0
+# minimum loss reduction required to make a further partition
+gamma = 1.0
+# minimum sum of instance weight (hessian) needed in a child
+min_child_weight = 1
+# maximum depth of a tree
+max_depth = 3
+
+# Task Parameters
+# the number of rounds of boosting
+num_round = 2
+# 0 means do not save any model except the final round model
+save_period = 0
+use_buffer = 0
+
+# The path of the training data; %d is a wildcard for the rank of the process
+# The idea is that each process takes a feature matrix with a subset of the columns
+data = "train.col%d"
+
+# The path of validation data, used to monitor the training process; here [test] sets the name of the validation set
+eval[test] = "../data/agaricus.txt.test"
+# evaluate on training data as well each round
+eval_train = 1
+
+# The path of test data; the full test data is needed here, so try not to use it, or keep a subsampled version
+test:data = "agaricus.txt.test"
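+
+# For example, with 3 processes the training parts are train.col0,
+# train.col1, train.col2 (as produced by splitsvm.py in this folder)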
diff --git a/demo/mpi/runexp-mpi.sh b/demo/mpi/runexp-mpi.sh
new file mode 100755
index 000000000..cc0c6d459
--- /dev/null
+++ b/demo/mpi/runexp-mpi.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+if [[ $# -ne 1 ]]
+then
+    echo "Usage: $0 <nprocess>"
+    exit 1
+fi
+
+rm -rf train.col*
+k=$1
+
+# split the libsvm file into k column subsets
+python splitsvm.py ../data/agaricus.txt.train train $k
+
+# run xgboost-mpi
+mpirun -n $k ../../xgboost-mpi mpi.conf
+
+# the model can be loaded directly by the single-machine xgboost solver, as usual
+../../xgboost mpi.conf task=dump model_in=0002.model fmap=../data/featmap.txt name_dump=dump.nice.$k.txt
+cat dump.nice.$k.txt
diff --git a/demo/mpi/splitsvm.py b/demo/mpi/splitsvm.py
new file mode 100644
index 000000000..365aef610
--- /dev/null
+++ b/demo/mpi/splitsvm.py
@@ -0,0 +1,34 @@
+#!/usr/bin/python
+import sys
+import random
+
+# split a libsvm file into k files, each holding a subset of the columns
+if len(sys.argv) < 4:
+    print('Usage: <fin> <fout-prefix> <k>')
+    exit(0)
+
+random.seed(10)
+fmap = {}
+
+k = int(sys.argv[3])
+fi = open(sys.argv[1], 'r')
+fos = []
+
+for i in range(k):
+    fos.append(open(sys.argv[2] + '.col%d' % i, 'w'))
+
+for l in fi:
+    arr = l.split()
+    # every part keeps the label in its first column
+    for f in fos:
+        f.write(arr[0])
+    # each feature id is assigned to a random part, consistently across rows
+    for it in arr[1:]:
+        fid = int(it.split(':')[0])
+        if fid not in fmap:
+            fmap[fid] = random.randint(0, k-1)
+        fos[fmap[fid]].write(' ' + it)
+    for f in fos:
+        f.write('\n')
+for f in fos:
+    f.close()
diff --git a/src/learner/learner-inl.hpp b/src/learner/learner-inl.hpp
index c8c146b45..c43ec7700 100644
--- a/src/learner/learner-inl.hpp
+++ b/src/learner/learner-inl.hpp
@@ -10,6 +10,7 @@
 #include <algorithm>
 #include <string>
 #include <vector>
+#include "../sync/sync.h"
 #include "./objective.h"
 #include "./evaluation.h"
 #include "../gbm/gbm.h"
@@ -61,6 +62,7 @@ class BoostLearner {
       buffer_size += mats[i]->info.num_row();
       num_feature = std::max(num_feature, static_cast<unsigned>(mats[i]->info.num_col()));
     }
+    sync::AllReduce(&num_feature, 1, sync::kMax);
     char str_temp[25];
     if (num_feature > mparam.num_feature) {
       utils::SPrintf(str_temp, sizeof(str_temp), "%u", num_feature);
diff --git a/src/sync/sync.h b/src/sync/sync.h
index cf82597e0..239840cb3 100644
--- a/src/sync/sync.h
+++ b/src/sync/sync.h
@@ -15,11 +15,16 @@ namespace sync {
 /*! \brief reduce operator supported */
 enum ReduceOp {
   kSum,
+  kMax,
   kBitwiseOR
 };
 
 /*! \brief get rank of current process */
 int GetRank(void);
+/*!
+ * \brief check whether the sync module is a true distributed implementation, or simply a dummy
+ */
+bool IsDistributed(void);
 /*! \brief intiialize the synchronization module */
 void Init(int argc, char *argv[]);
 /*! \brief finalize syncrhonization module */
diff --git a/src/sync/sync_empty.cpp b/src/sync/sync_empty.cpp
index e46a6906a..84e2f770b 100644
--- a/src/sync/sync_empty.cpp
+++ b/src/sync/sync_empty.cpp
@@ -6,18 +6,28 @@ namespace sync {
 int GetRank(void) {
   return 0;
 }
+
 void Init(int argc, char *argv[]) {
 }
+
 void Finalize(void) {
 }
+
+bool IsDistributed(void) {
+  return false;
+}
+
 template<>
 void AllReduce<uint32_t>(uint32_t *sendrecvbuf, int count, ReduceOp op) {
 }
+
 template<>
 void AllReduce<float>(float *sendrecvbuf, int count, ReduceOp op) {
 }
+
 void Bcast(std::string *sendrecv_data, int root) {
 }
+
 ReduceHandle::ReduceHandle(void) : handle(NULL) {}
 ReduceHandle::~ReduceHandle(void) {}
 void ReduceHandle::Init(ReduceFunction redfunc, bool commute) {}
diff --git a/src/sync/sync_mpi.cpp b/src/sync/sync_mpi.cpp
index 2890ab609..ecd83c601 100644
--- a/src/sync/sync_mpi.cpp
+++ b/src/sync/sync_mpi.cpp
@@ -12,6 +12,10 @@
 void Init(int argc, char *argv[]) {
   MPI::Init(argc, argv);
 }
+bool IsDistributed(void) {
+  return true;
+}
+
 void Finalize(void) {
   MPI::Finalize();
 }
@@ -20,6 +24,7 @@
 void AllReduce_(void *sendrecvbuf, int count, const MPI::Datatype &dtype, ReduceOp op) {
   switch(op) {
     case kBitwiseOR: MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, count, dtype, MPI::BOR); return;
     case kSum: MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, count, dtype, MPI::SUM); return;
+    case kMax: MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, count, dtype, MPI::MAX); return;
   }
 }
diff --git a/src/tree/updater_distcol-inl.hpp b/src/tree/updater_distcol-inl.hpp
index 38249b7d4..d94cdf409 100644
--- a/src/tree/updater_distcol-inl.hpp
+++ b/src/tree/updater_distcol-inl.hpp
@@ -93,9 +93,15 @@ class DistColMaker : public ColMaker<TStats> {
       while (fsplits.size() != 0 && fsplits.back() >= p_fmat->NumCol()) {
        fsplits.pop_back();
      }
-      // setup BitMap
-      bitmap.Resize(this->position.size());
-      bitmap.Clear();
+      // the bitmap is only word-concurrent, so set the flags in a bool vector first
+      {
+        bst_omp_uint ndata = static_cast<bst_omp_uint>(this->position.size());
+        boolmap.resize(ndata);
+        #pragma omp parallel for schedule(static)
+        for (bst_omp_uint j = 0; j < ndata; ++j) {
+          boolmap[j] = 0;
+        }
+      }
      utils::IIterator<ColBatch> *iter = p_fmat->ColIterator(fsplits);
      while (iter->Next()) {
        const ColBatch &batch = iter->Value();
@@ -110,15 +116,16 @@
           const int nid = this->DecodePosition(ridx);
           if (!tree[nid].is_leaf() && tree[nid].split_index() == fid) {
             if (fvalue < tree[nid].split_cond()) {
-              if (!tree[nid].default_left()) bitmap.SetTrue(ridx);
+              if (!tree[nid].default_left()) boolmap[ridx] = 1;
             } else {
-              if (tree[nid].default_left()) bitmap.SetTrue(ridx);
+              if (tree[nid].default_left()) boolmap[ridx] = 1;
             }
           }
         }
       }
-      
+
+      bitmap.InitFromBool(boolmap);
       // communicate bitmap
       sync::AllReduce(BeginPtr(bitmap.data), bitmap.data.size(), sync::kBitwiseOR);
       const std::vector<bst_uint> &rowset = p_fmat->buffered_rowset();
@@ -159,6 +166,7 @@
 
  private:
   utils::BitMap bitmap;
+  std::vector<int> boolmap;
   sync::Reducer<SplitEntry> reducer;
 };
 // we directly introduce pruner here
diff --git a/src/utils/bitmap.h b/src/utils/bitmap.h
index 9c7cf2fc2..92420656a 100644
--- a/src/utils/bitmap.h
+++ b/src/utils/bitmap.h
@@ -7,6 +7,7 @@
  */
 #include <vector>
 #include "./utils.h"
+#include "./omp.h"
 
 namespace xgboost {
 namespace utils {
@@ -35,6 +36,27 @@ struct BitMap {
   inline void SetTrue(size_t i) {
     data[i >> 5] |= (1 << (i & 31U));
   }
+  /*! \brief initialize the value of the bit map from a vector of bool flags */
+  inline void InitFromBool(const std::vector<int> &vec) {
+    this->Resize(vec.size());
+    // parallel over the full 32-bit words
+    bst_omp_uint nsize = static_cast<bst_omp_uint>(vec.size() / 32);
+    #pragma omp parallel for schedule(static)
+    for (bst_omp_uint i = 0; i < nsize; ++i) {
+      uint32_t res = 0;
+      for (int k = 0; k < 32; ++k) {
+        int bit = vec[(i << 5) | k];
+        res |= (bit << k);
+      }
+      data[i] = res;
+    }
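+    // handle the tail: indices in [nsize * 32, vec.size()) do not fill a
+    // complete 32-bit word, so clear the last word and set its bits one by one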
+    if ((static_cast<size_t>(nsize) << 5) != vec.size()) data.back() = 0;
+    for (size_t i = static_cast<size_t>(nsize) << 5; i < vec.size(); ++i) {
+      if (vec[i]) this->SetTrue(i);
+    }
+  }
   /*! \brief clear the bitmap, set all places to false */
   inline void Clear(void) {
     std::fill(data.begin(), data.end(), 0U);
diff --git a/src/xgboost_main.cpp b/src/xgboost_main.cpp
index e96342f69..690417855 100644
--- a/src/xgboost_main.cpp
+++ b/src/xgboost_main.cpp
@@ -14,7 +14,7 @@ namespace xgboost {
 /*!
  * \brief wrapping the training process
  */
-class BoostLearnTask{
+class BoostLearnTask {
  public:
   inline int Run(int argc, char *argv[]) {
     if (argc < 2) {
@@ -31,6 +31,9 @@
         this->SetParam(name, val);
       }
     }
+    if (sync::IsDistributed()) {
+      this->SetParam("updater", "distcol");
+    }
     if (sync::GetRank() != 0) {
       this->SetParam("silent", "2");
     }
@@ -93,6 +96,7 @@
     name_pred = "pred.txt";
     name_dump = "dump.txt";
     model_dir_path = "./";
+    load_part = 0;
     data = NULL;
   }
   ~BoostLearnTask(void){
@@ -103,13 +107,20 @@
   }
  private:
   inline void InitData(void) {
+    if (strchr(train_path.c_str(), '%') != NULL) {
+      char s_tmp[256];
+      utils::SPrintf(s_tmp, sizeof(s_tmp), train_path.c_str(), sync::GetRank());
+      train_path = s_tmp;
+      load_part = 1;
+    }
+
     if (name_fmap != "NULL") fmap.LoadText(name_fmap.c_str());
     if (task == "dump") return;
     if (task == "pred") {
       data = io::LoadDataMatrix(test_path.c_str(), silent != 0, use_buffer != 0);
     } else {
       // training
-      data = io::LoadDataMatrix(train_path.c_str(), silent != 0, use_buffer != 0);
+      data = io::LoadDataMatrix(train_path.c_str(), silent != 0 && load_part == 0, use_buffer != 0);
       utils::Assert(eval_data_names.size() == eval_data_paths.size(), "BUG");
       for (size_t i = 0; i < eval_data_names.size(); ++i) {
         deval.push_back(io::LoadDataMatrix(eval_data_paths[i].c_str(), silent != 0, use_buffer != 0));
@@ -182,6 +193,7 @@
     fclose(fo);
   }
   inline void SaveModel(const char *fname) const {
+    if (sync::GetRank() != 0) return;
     utils::FileStream fo(utils::FopenCheck(fname, "wb"));
     learner.SaveModel(fo);
     fo.Close();
@@ -205,6 +217,8 @@
  private:
   /*! \brief whether silent */
   int silent;
+  /*! \brief whether to load only the rank-specific part of the training data */
+  int load_part;
   /*! \brief whether use auto binary buffer */
   int use_buffer;
   /*! \brief whether evaluate training statistics */