diff --git a/Makefile b/Makefile index 2852b7ac5..b952dbf5d 100644 --- a/Makefile +++ b/Makefile @@ -11,11 +11,11 @@ else endif # specify tensor path -BIN = xgboost -OBJ = updater.o gbm.o io.o +BIN = +OBJ = updater.o gbm.o io.o main.o MPIOBJ = sync.o -MPIBIN = test/test -SLIB = wrapper/libxgboostwrapper.so +MPIBIN = test/test xgboost +SLIB = #wrapper/libxgboostwrapper.so .PHONY: clean all python Rpack @@ -28,8 +28,9 @@ updater.o: src/tree/updater.cpp src/tree/*.hpp src/*.h src/tree/*.h gbm.o: src/gbm/gbm.cpp src/gbm/*.hpp src/gbm/*.h io.o: src/io/io.cpp src/io/*.hpp src/utils/*.h src/learner/dmatrix.h src/*.h sync.o: src/sync/sync.cpp -xgboost: src/xgboost_main.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h $(OBJ) -wrapper/libxgboostwrapper.so: wrapper/xgboost_wrapper.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h $(OBJ) +main.o: src/xgboost_main.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h +xgboost: $(OBJ) $(MPIOBJ) +#wrapper/libxgboostwrapper.so: wrapper/xgboost_wrapper.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h $(OBJ) test/test: test/test.cpp sync.o $(BIN) : diff --git a/src/sync/sync.cpp b/src/sync/sync.cpp index 705d19fae..ced5e2cb1 100644 --- a/src/sync/sync.cpp +++ b/src/sync/sync.cpp @@ -5,29 +5,6 @@ namespace xgboost { namespace sync { -// code for reduce handle -ReduceHandle::ReduceHandle(void) : handle(NULL) { -} -ReduceHandle::~ReduceHandle(void) { - if (handle != NULL) { - MPI::Op *op = reinterpret_cast(handle); - op->Free(); - delete op; - } -} -void ReduceHandle::Init(ReduceFunction redfunc, bool commute) { - utils::Assert(handle == NULL, "cannot initialize reduce handle twice"); - MPI::Op *op = new MPI::Op(); - MPI::User_function *pf = reinterpret_cast(redfunc); - op->Init(pf, commute); - handle = op; -} -void ReduceHandle::AllReduce(void *sendrecvbuf, size_t n4byte) { - utils::Assert(handle != NULL, "must intialize handle to call AllReduce"); - MPI::Op *op = reinterpret_cast(handle); - MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, n4byte, MPI_INT, *op); -} - int GetRank(void) { return MPI::COMM_WORLD.Get_rank(); } @@ -57,5 +34,37 @@ void AllReduce(float *sendrecvbuf, int count, ReduceOp op) { AllReduce_(sendrecvbuf, count, MPI::FLOAT, op); } +void Bcast(std::string *sendrecv_data, int root) { + unsigned len = static_cast(sendrecv_data->length()); + MPI::COMM_WORLD.Bcast(&len, 1, MPI::UNSIGNED, root); + sendrecv_data->resize(len); + if (len != 0) { + MPI::COMM_WORLD.Bcast(&(*sendrecv_data)[0], len, MPI::CHAR, root); + } +} + +// code for reduce handle +ReduceHandle::ReduceHandle(void) : handle(NULL) { +} +ReduceHandle::~ReduceHandle(void) { + if (handle != NULL) { + MPI::Op *op = reinterpret_cast(handle); + op->Free(); + delete op; + } +} +void ReduceHandle::Init(ReduceFunction redfunc, bool commute) { + utils::Assert(handle == NULL, "cannot initialize reduce handle twice"); + MPI::Op *op = new MPI::Op(); + MPI::User_function *pf = reinterpret_cast(redfunc); + op->Init(pf, commute); + handle = op; +} +void ReduceHandle::AllReduce(void *sendrecvbuf, size_t n4byte) { + utils::Assert(handle != NULL, "must intialize handle to call AllReduce"); + MPI::Op *op = reinterpret_cast(handle); + MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, n4byte, MPI_INT, *op); +} + } // namespace sync } // namespace xgboost diff --git a/src/sync/sync.h b/src/sync/sync.h index 57c27c1d8..cf82597e0 100644 --- a/src/sync/sync.h +++ b/src/sync/sync.h @@ -18,11 +18,39 @@ enum ReduceOp { kBitwiseOR }; -typedef void (ReduceFunction) (const void *src, void *dst, int len); +/*! \brief get rank of current process */ +int GetRank(void); +/*! \brief intiialize the synchronization module */ +void Init(int argc, char *argv[]); +/*! \brief finalize syncrhonization module */ +void Finalize(void); -/* !\brief handle for customized reducer */ +/*! + * \brief in-place all reduce operation + * \param sendrecvbuf the in place send-recv buffer + * \param count count of data + * \param op reduction function + */ +template +void AllReduce(DType *sendrecvbuf, int count, ReduceOp op); + +/*! + * \brief broadcast an std::string to all others from root + * \param sendrecv_data the pointer to send or recive buffer, + * receive buffer does not need to be pre-allocated + * and string will be resized to correct length + * \param root the root of process + */ +void Bcast(std::string *sendrecv_data, int root); + +/*! + * \brief handle for customized reducer + * user do not need to use this, used Reducer instead + */ class ReduceHandle { public: + // reduce function + typedef void (ReduceFunction) (const void *src, void *dst, int len); // constructor ReduceHandle(void); // destructor @@ -41,22 +69,8 @@ class ReduceHandle { void *handle; }; -/*! \brief get rank of current process */ -int GetRank(void); -/*! \brief intiialize the synchronization module */ -void Init(int argc, char *argv[]); -/*! \brief finalize syncrhonization module */ -void Finalize(void); +// ----- extensions for ease of use ------ /*! - * \brief in-place all reduce operation - * \param sendrecvbuf the in place send-recv buffer - * \param count count of data - * \param op reduction function - */ -template -void AllReduce(DType *sendrecvbuf, int count, ReduceOp op); - -/*! * \brief template class to make customized reduce and all reduce easy * Do not use reducer directly in the function you call Finalize, because the destructor can happen after Finalize * \tparam DType data type that to be reduced diff --git a/src/tree/model.h b/src/tree/model.h index 8049a1608..dbc35b3b4 100644 --- a/src/tree/model.h +++ b/src/tree/model.h @@ -110,6 +110,10 @@ class TreeModel { inline bool is_left_child(void) const { return (parent_ & (1U << 31)) != 0; } + /*! \brief whether this node is deleted */ + inline bool is_deleted(void) const { + return sindex_ == std::numeric_limits::max(); + } /*! \brief whether current node is root */ inline bool is_root(void) const { return parent_ == -1; @@ -144,7 +148,11 @@ class TreeModel { this->cleft_ = -1; this->cright_ = right; } - + /*! \brief mark that this node is deleted */ + inline void mark_delete(void) { + this->sindex_ = std::numeric_limits::max(); + } + private: friend class TreeModel; /*! @@ -197,11 +205,11 @@ class TreeModel { leaf_vector.resize(param.num_nodes * param.size_leaf_vector); return nd; } - // delete a tree node + // delete a tree node, keep the parent field to allow trace back inline void DeleteNode(int nid) { utils::Assert(nid >= param.num_roots, "can not delete root"); deleted_nodes.push_back(nid); - nodes[nid].set_parent(-1); + nodes[nid].mark_delete(); ++param.num_deleted; } diff --git a/src/tree/param.h b/src/tree/param.h index 4a7c790a6..cf646a76e 100644 --- a/src/tree/param.h +++ b/src/tree/param.h @@ -345,6 +345,10 @@ struct SplitEntry{ return false; } } + /*! \brief same as update, used by AllReduce*/ + inline void Reduce(const SplitEntry &e) { + this->Update(e); + } /*!\return feature index to split on */ inline unsigned split_index(void) const { return sindex & ((1U << 31) - 1U); diff --git a/src/tree/updater_colmaker-inl.hpp b/src/tree/updater_colmaker-inl.hpp index 596c8c8f5..347326fe7 100644 --- a/src/tree/updater_colmaker-inl.hpp +++ b/src/tree/updater_colmaker-inl.hpp @@ -486,13 +486,17 @@ class ColMaker: public IUpdater { #pragma omp parallel for schedule(static) for (bst_omp_uint i = 0; i < ndata; ++i) { const bst_uint ridx = rowset[i]; - const int nid = position[ridx]; - if (nid >= 0) { - if (tree[nid].is_leaf()) { - position[ridx] = - nid - 1; + int nid = position[ridx]; + if (nid < 0) nid = ~nid; + if (tree[nid].is_leaf()) { + position[ridx] = ~nid; + } else { + // push to default branch, correct latter + int pid = tree[nid].default_left() ? tree[nid].cleft(): tree[nid].cright(); + if (position[ridx] < 0) { + position[ridx] = ~pid; } else { - // push to default branch, correct latter - position[ridx] = tree[nid].default_left() ? tree[nid].cleft(): tree[nid].cright(); + position[ridx] = pid; } } } @@ -535,7 +539,8 @@ class ColMaker: public IUpdater { const bst_uint ridx = col[j].index; const float fvalue = col[j].fvalue; int nid = position[ridx]; - if (nid < 0) continue; + if (nid < 0) nid = ~nid; + // go back to parent, correct those who are not default nid = tree[nid].parent(); if (tree[nid].split_index() == fid) { diff --git a/src/tree/updater_distcol-inl.hpp b/src/tree/updater_distcol-inl.hpp index f5d37c1fc..aae084561 100644 --- a/src/tree/updater_distcol-inl.hpp +++ b/src/tree/updater_distcol-inl.hpp @@ -7,7 +7,10 @@ * \author Tianqi Chen */ #include "../utils/bitmap.h" +#include "../utils/io.h" +#include "../sync/sync.h" #include "./updater_colmaker-inl.hpp" +#include "./updater_prune-inl.hpp" namespace xgboost { namespace tree { @@ -19,6 +22,7 @@ class DistColMaker : public ColMaker { // set training parameter virtual void SetParam(const char *name, const char *val) { param.SetParam(name, val); + pruner.SetParam(name, val); } virtual void Update(const std::vector &gpair, IFMatrix *p_fmat, @@ -26,15 +30,46 @@ class DistColMaker : public ColMaker { const std::vector &trees) { TStats::CheckInfo(info); utils::Check(trees.size() == 1, "DistColMaker: only support one tree at a time"); + // build the tree builder.Update(gpair, p_fmat, info, trees[0]); + // prune the tree + pruner.Update(gpair, p_fmat, info, trees); + this->SyncTrees(trees[0]); + // update position after the tree is pruned + builder.UpdatePosition(p_fmat, *trees[0]); } + private: + inline void SyncTrees(RegTree *tree) { + std::string s_model; + utils::MemoryBufferStream fs(&s_model); + int rank = sync::GetRank(); + if (rank == 0) { + tree->SaveModel(fs); + sync::Bcast(&s_model, 0); + } else { + sync::Bcast(&s_model, 0); + tree->LoadModel(fs); + } + } struct Builder : public ColMaker::Builder { public: Builder(const TrainParam ¶m) : ColMaker::Builder(param) { } - protected: + inline void UpdatePosition(IFMatrix *p_fmat, const RegTree &tree) { + const std::vector &rowset = p_fmat->buffered_rowset(); + const bst_omp_uint ndata = static_cast(rowset.size()); + #pragma omp parallel for schedule(static) + for (bst_omp_uint i = 0; i < ndata; ++i) { + const bst_uint ridx = rowset[i]; + int nid = this->position[ridx]; + if (nid < 0) { + + } + } + } + protected: virtual void SetNonDefaultPosition(const std::vector &qexpand, IFMatrix *p_fmat, const RegTree &tree) { // step 2, classify the non-default data into right places @@ -80,8 +115,8 @@ class DistColMaker : public ColMaker { } } // communicate bitmap - //sync::AllReduce(); - const std::vector &rowset = p_fmat->buffered_rowset(); + sync::AllReduce(BeginPtr(bitmap.data), bitmap.data.size(), sync::kBitwiseOR); + const std::vector &rowset = p_fmat->buffered_rowset(); // get the new position const bst_omp_uint ndata = static_cast(rowset.size()); #pragma omp parallel for schedule(static) @@ -100,19 +135,29 @@ class DistColMaker : public ColMaker { } // synchronize the best solution of each node virtual void SyncBestSolution(const std::vector &qexpand) { + std::vector vec; for (size_t i = 0; i < qexpand.size(); ++i) { const int nid = qexpand[i]; for (int tid = 0; tid < this->nthread; ++tid) { this->snode[nid].best.Update(this->stemp[tid][nid].best); } + vec.push_back(this->snode[nid].best); } // communicate best solution - // sync::AllReduce + reducer.AllReduce(BeginPtr(vec), vec.size()); + // assign solution back + for (size_t i = 0; i < qexpand.size(); ++i) { + const int nid = qexpand[i]; + this->snode[nid].best = vec[i]; + } } private: utils::BitMap bitmap; + sync::Reducer reducer; }; + // we directly introduce pruner here + TreePruner pruner; // training parameter TrainParam param; // pointer to the builder diff --git a/src/utils/io.h b/src/utils/io.h index d4746681a..7dd550dc8 100644 --- a/src/utils/io.h +++ b/src/utils/io.h @@ -92,11 +92,49 @@ class IStream { class ISeekStream: public IStream { public: /*! \brief seek to certain position of the file */ - virtual void Seek(long pos) = 0; + virtual void Seek(size_t pos) = 0; /*! \brief tell the position of the stream */ - virtual long Tell(void) = 0; + virtual size_t Tell(void) = 0; }; +/*! \brief a in memory buffer that can be read and write as stream interface */ +struct MemoryBufferStream : public ISeekStream { + public: + MemoryBufferStream(std::string *p_buffer) + : p_buffer_(p_buffer) { + curr_ptr_ = 0; + } + virtual ~MemoryBufferStream(void) {} + virtual size_t Read(void *ptr, size_t size) { + utils::Assert(curr_ptr_ <= p_buffer_->length(), + "read can not have position excceed buffer length"); + size_t nread = std::min(p_buffer_->length() - curr_ptr_, size); + if (nread != 0) memcpy(ptr, &(*p_buffer_)[0] + curr_ptr_, nread); + curr_ptr_ += nread; + return nread; + } + virtual void Write(const void *ptr, size_t size) { + if (size == 0) return; + if (curr_ptr_ + size > p_buffer_->length()) { + p_buffer_->resize(curr_ptr_+size); + } + memcpy(&(*p_buffer_)[0] + curr_ptr_, ptr, size); + curr_ptr_ += size; + } + virtual void Seek(size_t pos) { + curr_ptr_ = static_cast(pos); + } + virtual size_t Tell(void) { + return curr_ptr_; + } + + private: + /*! \brief in memory buffer */ + std::string *p_buffer_; + /*! \brief current pointer */ + size_t curr_ptr_; +}; // class MemoryBufferStream + /*! \brief implementation of file i/o stream */ class FileStream : public ISeekStream { public: @@ -110,10 +148,10 @@ class FileStream : public ISeekStream { virtual void Write(const void *ptr, size_t size) { std::fwrite(ptr, size, 1, fp); } - virtual void Seek(long pos) { + virtual void Seek(size_t pos) { std::fseek(fp, pos, SEEK_SET); } - virtual long Tell(void) { + virtual size_t Tell(void) { return std::ftell(fp); } inline void Close(void) {