From c42ba8d2811ee5e3d55670a395215998db28cdde Mon Sep 17 00:00:00 2001 From: tqchen Date: Wed, 19 Nov 2014 19:19:53 -0800 Subject: [PATCH] get multinode in --- multi-node/README.md | 3 ++- multi-node/col-split/README.md | 2 +- multi-node/row-split/README.md | 17 +++++++++++++ multi-node/row-split/machine-row.conf | 31 +++++++++++++++++++++++ multi-node/row-split/machine-row.sh | 21 ++++++++++++++++ multi-node/row-split/mushroom-row.conf | 35 ++++++++++++++++++++++++++ multi-node/row-split/mushroom-row.sh | 19 ++++++++++++++ src/sync/sync.h | 14 +++++------ src/sync/sync_empty.cpp | 3 +++ src/sync/sync_mpi.cpp | 9 ++++--- src/tree/updater.cpp | 2 +- src/tree/updater_histmaker-inl.hpp | 6 ++--- src/utils/io.h | 2 +- src/utils/quantile.h | 16 +++++++----- 14 files changed, 157 insertions(+), 23 deletions(-) create mode 100644 multi-node/row-split/README.md create mode 100644 multi-node/row-split/machine-row.conf create mode 100755 multi-node/row-split/machine-row.sh create mode 100644 multi-node/row-split/mushroom-row.conf create mode 100755 multi-node/row-split/mushroom-row.sh diff --git a/multi-node/README.md b/multi-node/README.md index fab7472e7..ba445da12 100644 --- a/multi-node/README.md +++ b/multi-node/README.md @@ -28,6 +28,7 @@ Design Choice this will reduce the communication overhead and improve the performance. - One way to do that is limit mpi slot in each machine to be 1, or reserve nthread processors for each process. 
-Examples +Usage ==== * [Column-based version](col-split) +* [Row-based version](row-split) diff --git a/multi-node/col-split/README.md b/multi-node/col-split/README.md index c0b9fef7c..bdafb2e32 100644 --- a/multi-node/col-split/README.md +++ b/multi-node/col-split/README.md @@ -1,6 +1,6 @@ Distributed XGBoost: Column Split Version ==== -* run ```bash run-mushroom.sh``` +* run ```bash mushroom-row.sh ``` How to Use ==== diff --git a/multi-node/row-split/README.md b/multi-node/row-split/README.md new file mode 100644 index 000000000..6c0078883 --- /dev/null +++ b/multi-node/row-split/README.md @@ -0,0 +1,17 @@ +Distributed XGBoost: Row Split Version +==== +* Mushroom: run ```bash mushroom-row.sh ``` +* Machine: run ```bash machine-row.sh ``` + +How to Use +==== +* First split the data by rows +* In the config, specify the data file as containing a wildcard %d, where %d is the rank of the node; each node will load its part of the data +* Enable row split mode by ```dsplit=row``` + +Notes +==== +* The code is multi-threaded, so you want to run one xgboost-mpi per node +* The row-based solver splits data by row; each node works on a subset of rows. It uses an approximate histogram count algorithm, + and will only examine a subset of potential split points as opposed to all split points. 
+* ```colsample_bytree``` is not enabled in row split mode so far diff --git a/multi-node/row-split/machine-row.conf b/multi-node/row-split/machine-row.conf new file mode 100644 index 000000000..ac816ab45 --- /dev/null +++ b/multi-node/row-split/machine-row.conf @@ -0,0 +1,31 @@ +# General Parameters, see comment for each definition +# choose the tree booster, can also change to gblinear +booster = gbtree +# this is the only difference with classification, use reg:linear to do linear classification +# when labels are in [0,1] we can also use reg:logistic +objective = reg:linear + +# Tree Booster Parameters +# step size shrinkage +eta = 1.0 +# minimum loss reduction required to make a further partition +gamma = 1.0 +# minimum sum of instance weight(hessian) needed in a child +min_child_weight = 1 +# maximum depth of a tree +max_depth = 3 + +# Task parameters +# the number of round to do boosting +num_round = 2 +# 0 means do not save any model except the final round model +save_period = 0 +use_buffer = 0 + +# The path of training data +data = "train-machine.row%d" +# The path of validation data, used to monitor training process, here [test] sets name of the validation set +eval[test] = "../../demo/regression/machine.txt.test" +# The path of test data +test:data = "../../demo/regression/machine.txt.test" + diff --git a/multi-node/row-split/machine-row.sh b/multi-node/row-split/machine-row.sh new file mode 100755 index 000000000..41b8e8634 --- /dev/null +++ b/multi-node/row-split/machine-row.sh @@ -0,0 +1,21 @@ +#!/bin/bash +if [[ $# -ne 1 ]] +then + echo "Usage: nprocess" + exit -1 +fi + +rm -rf train-machine.row* *.model +k=$1 +# make machine data +cd ../../demo/regression/ +python mapfeat.py +python mknfold.py machine.txt 1 +cd - + +# split the lib svm file into k subfiles +python splitrows.py ../../demo/regression/machine.txt.train train-machine $k + +# run xgboost mpi +mpirun -n $k ../../xgboost-mpi machine-row.conf dsplit=row + diff --git 
a/multi-node/row-split/mushroom-row.conf b/multi-node/row-split/mushroom-row.conf new file mode 100644 index 000000000..4cc2e8b11 --- /dev/null +++ b/multi-node/row-split/mushroom-row.conf @@ -0,0 +1,35 @@ +# General Parameters, see comment for each definition +# choose the booster, can be gbtree or gblinear +booster = gbtree +# choose logistic regression loss function for binary classification +objective = binary:logistic + +# Tree Booster Parameters +# step size shrinkage +eta = 1.0 +# minimum loss reduction required to make a further partition +gamma = 1.0 +# minimum sum of instance weight(hessian) needed in a child +min_child_weight = 1 +# maximum depth of a tree +max_depth = 3 + +# Task Parameters +# the number of round to do boosting +num_round = 2 +# 0 means do not save any model except the final round model +save_period = 0 +use_buffer = 0 + +# The path of training data %d is the wildcard for the rank of the data +# The idea is each process take a feature matrix with subset of columns +# +data = "train.row%d" + +# The path of validation data, used to monitor training process, here [test] sets name of the validation set +eval[test] = "../../demo/data/agaricus.txt.test" +# evaluate on training data as well each round +eval_train = 1 + +# The path of test data, need to use full data of test, try not use it, or keep an subsampled version +test:data = "../../demo/data/agaricus.txt.test" diff --git a/multi-node/row-split/mushroom-row.sh b/multi-node/row-split/mushroom-row.sh new file mode 100755 index 000000000..a98fb6b0d --- /dev/null +++ b/multi-node/row-split/mushroom-row.sh @@ -0,0 +1,19 @@ +#!/bin/bash +if [[ $# -ne 1 ]] +then + echo "Usage: nprocess" + exit -1 +fi + +rm -rf train.row* *.model +k=$1 + +# split the lib svm file into k subfiles +python splitrows.py ../../demo/data/agaricus.txt.train train $k + +# run xgboost mpi +mpirun -n $k ../../xgboost-mpi mushroom-row.conf dsplit=row nthread=1 + +# the model can be directly loaded by single machine 
xgboost solver, as usual +../../xgboost mushroom-row.conf task=dump model_in=0002.model fmap=../../demo/data/featmap.txt name_dump=dump.nice.$k.txt +cat dump.nice.$k.txt diff --git a/src/sync/sync.h b/src/sync/sync.h index e728932e0..0619c3ea3 100644 --- a/src/sync/sync.h +++ b/src/sync/sync.h @@ -160,13 +160,13 @@ class SerializeReducer { inline void AllReduce(DType *sendrecvobj, size_t max_n4byte, size_t count) { buffer.resize(max_n4byte * count); for (size_t i = 0; i < count; ++i) { - utils::MemoryFixSizeBuffer fs(BeginPtr(buffer) + i * max_n4byte * 4, max_n4byte * 4); - sendrecvobj[i]->Save(fs); + utils::MemoryFixSizeBuffer fs(BeginPtr(buffer) + i * max_n4byte, max_n4byte * 4); + sendrecvobj[i].Save(fs); } handle.AllReduce(BeginPtr(buffer), max_n4byte, count); for (size_t i = 0; i < count; ++i) { - utils::MemoryFixSizeBuffer fs(BeginPtr(buffer) + i * max_n4byte * 4, max_n4byte * 4); - sendrecvobj[i]->Load(fs); + utils::MemoryFixSizeBuffer fs(BeginPtr(buffer) + i * max_n4byte, max_n4byte * 4); + sendrecvobj[i].Load(fs); } } @@ -178,12 +178,12 @@ class SerializeReducer { // temp space DType tsrc, tdst; for (int i = 0; i < len_; ++i) { - utils::MemoryFixSizeBuffer fsrc((void*)(src_) + i * nbytes, nbytes); - utils::MemoryFixSizeBuffer fdst(dst_ + i * nbytes, nbytes); + utils::MemoryFixSizeBuffer fsrc((char*)(src_) + i * nbytes, nbytes); + utils::MemoryFixSizeBuffer fdst((char*)(dst_) + i * nbytes, nbytes); tsrc.Load(fsrc); tdst.Load(fdst); // govern const check - tdst.Reduce(static_cast(tsrc)); + tdst.Reduce(static_cast(tsrc), nbytes); fdst.Seek(0); tdst.Save(fdst); } diff --git a/src/sync/sync_empty.cpp b/src/sync/sync_empty.cpp index d11d164cd..c0f956db3 100644 --- a/src/sync/sync_empty.cpp +++ b/src/sync/sync_empty.cpp @@ -38,6 +38,9 @@ void Bcast(std::string *sendrecv_data, int root) { ReduceHandle::ReduceHandle(void) : handle(NULL) {} ReduceHandle::~ReduceHandle(void) {} +int ReduceHandle::TypeSize(const MPI::Datatype &dtype) { + return 0; +} void 
ReduceHandle::Init(ReduceFunction redfunc, size_t type_n4bytes, bool commute) {} void ReduceHandle::AllReduce(void *sendrecvbuf, size_t type_n4bytes, size_t n4byte) {} } // namespace sync diff --git a/src/sync/sync_mpi.cpp b/src/sync/sync_mpi.cpp index b96a509a0..42b7c7ba6 100644 --- a/src/sync/sync_mpi.cpp +++ b/src/sync/sync_mpi.cpp @@ -97,9 +97,12 @@ void ReduceHandle::AllReduce(void *sendrecvbuf, size_t type_n4bytes, size_t coun utils::Assert(handle != NULL, "must intialize handle to call AllReduce"); MPI::Op *op = reinterpret_cast(handle); MPI::Datatype *dtype = reinterpret_cast(htype); - - if (created_type_n4bytes != type_n4bytes || htype == NULL) { - dtype->Free(); + if (created_type_n4bytes != type_n4bytes || dtype == NULL) { + if (dtype == NULL) { + dtype = new MPI::Datatype(); + } else { + dtype->Free(); + } *dtype = MPI::INT.Create_contiguous(type_n4bytes); dtype->Commit(); created_type_n4bytes = type_n4bytes; diff --git a/src/tree/updater.cpp b/src/tree/updater.cpp index a4cd65de0..a1349b806 100644 --- a/src/tree/updater.cpp +++ b/src/tree/updater.cpp @@ -18,7 +18,7 @@ IUpdater* CreateUpdater(const char *name) { if (!strcmp(name, "sync")) return new TreeSyncher(); if (!strcmp(name, "refresh")) return new TreeRefresher(); if (!strcmp(name, "grow_colmaker")) return new ColMaker(); - //if (!strcmp(name, "grow_histmaker")) return new CQHistMaker(); + if (!strcmp(name, "grow_histmaker")) return new CQHistMaker(); //if (!strcmp(name, "grow_skmaker")) return new SketchMaker(); if (!strcmp(name, "distcol")) return new DistColMaker(); diff --git a/src/tree/updater_histmaker-inl.hpp b/src/tree/updater_histmaker-inl.hpp index f05308ce2..76f8ccf31 100644 --- a/src/tree/updater_histmaker-inl.hpp +++ b/src/tree/updater_histmaker-inl.hpp @@ -507,7 +507,7 @@ class CQHistMaker: public HistMaker { // node statistics std::vector node_stats; // summary array - std::vector< WXQSketch::SummaryContainer> summary_array; + std::vector summary_array; // reducer for summary 
sync::SerializeReducer sreducer; // per node, per feature sketch @@ -517,6 +517,7 @@ class CQHistMaker: public HistMaker { template class QuantileHistMaker: public HistMaker { protected: + typedef utils::WXQuantileSketch WXQSketch; virtual void ResetPosAndPropose(const std::vector &gpair, IFMatrix *p_fmat, const BoosterInfo &info, @@ -624,9 +625,8 @@ class QuantileHistMaker: public HistMaker { } private: - typedef utils::WXQuantileSketch WXQSketch; // summary array - std::vector< WXQSketch::SummaryContainer> summary_array; + std::vector summary_array; // reducer for summary sync::SerializeReducer sreducer; // local temp column data structure diff --git a/src/utils/io.h b/src/utils/io.h index 1a748feab..97aaa94b2 100644 --- a/src/utils/io.h +++ b/src/utils/io.h @@ -106,7 +106,7 @@ struct MemoryFixSizeBuffer : public ISeekStream { } virtual ~MemoryFixSizeBuffer(void) {} virtual size_t Read(void *ptr, size_t size) { - utils::Assert(curr_ptr_ <= buffer_size_, + utils::Assert(curr_ptr_ + size <= buffer_size_, "read can not have position excceed buffer length"); size_t nread = std::min(buffer_size_ - curr_ptr_, size); if (nread != 0) memcpy(ptr, p_buffer_ + curr_ptr_, nread); diff --git a/src/utils/quantile.h b/src/utils/quantile.h index 6727c8675..8d49afc98 100644 --- a/src/utils/quantile.h +++ b/src/utils/quantile.h @@ -519,12 +519,12 @@ class QuantileSketchTemplate { /*! \brief same as summary, but use STL to backup the space */ struct SummaryContainer : public Summary { std::vector space; - explicit SummaryContainer(void) : Summary(NULL, 0) { - } - explicit SummaryContainer(const SummaryContainer &src) : Summary(NULL, src.size) { + SummaryContainer(const SummaryContainer &src) : Summary(NULL, src.size) { this->space = src.space; this->data = BeginPtr(this->space); } + SummaryContainer(void) : Summary(NULL, 0) { + } /*! 
\brief reserve space for summary */ inline void Reserve(size_t size) { if (size > space.size()) { @@ -576,13 +576,17 @@ class QuantileSketchTemplate { /*! \brief save the data structure into stream */ inline void Save(IStream &fo) const { fo.Write(&(this->size), sizeof(this->size)); - fo.Write(data, this->size * sizeof(Entry)); + if (this->size != 0) { + fo.Write(this->data, this->size * sizeof(Entry)); + } } /*! \brief load data structure from input stream */ inline void Load(IStream &fi) { utils::Check(fi.Read(&this->size, sizeof(this->size)) != 0, "invalid SummaryArray 1"); - this->Reserve(this->size); - utils::Check(fi.Read(data, this->size * sizeof(Entry)) != 0, "invalid SummaryArray 2"); + this->Reserve(this->size); + if (this->size != 0) { + utils::Check(fi.Read(this->data, this->size * sizeof(Entry)) != 0, "invalid SummaryArray 2"); + } } }; /*!