From 7c3a392136d44e0f0a0da7e1d840b824a13a004b Mon Sep 17 00:00:00 2001 From: tqchen Date: Wed, 19 Nov 2014 15:28:09 -0800 Subject: [PATCH] compile --- Makefile | 2 +- multi-node/col-split/mushroom-col.sh | 2 +- src/sync/sync.h | 84 ++++++++++++++---------- src/sync/sync_empty.cpp | 4 +- src/sync/sync_mpi.cpp | 40 +++++++++--- src/tree/updater.cpp | 6 +- src/tree/updater_histmaker-inl.hpp | 36 ++++++----- src/utils/quantile.h | 95 ++++++++-------------------- 8 files changed, 136 insertions(+), 133 deletions(-) diff --git a/Makefile b/Makefile index 51b7a578a..d5fd7c394 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ SLIB = wrapper/libxgboostwrapper.so .PHONY: clean all mpi python Rpack -all: $(BIN) $(OBJ) $(SLIB) +all: $(BIN) $(OBJ) $(SLIB) mpi mpi: $(MPIBIN) python: wrapper/libxgboostwrapper.so diff --git a/multi-node/col-split/mushroom-col.sh b/multi-node/col-split/mushroom-col.sh index 906ace94c..63b5092ba 100755 --- a/multi-node/col-split/mushroom-col.sh +++ b/multi-node/col-split/mushroom-col.sh @@ -5,7 +5,7 @@ then exit -1 fi -rm -rf train.col* +rm -rf train.col* *.model k=$1 # split the lib svm file into k subfiles diff --git a/src/sync/sync.h b/src/sync/sync.h index fe34983ef..e728932e0 100644 --- a/src/sync/sync.h +++ b/src/sync/sync.h @@ -11,6 +11,10 @@ #include "../utils/io.h" #include +namespace MPI { +// forward delcaration of MPI::Datatype, but not include content +class Datatype; +}; namespace xgboost { /*! \brief syncrhonizer module that minimumly wraps interface of MPI */ namespace sync { @@ -62,23 +66,31 @@ void Bcast(std::string *sendrecv_data, int root); class ReduceHandle { public: // reduce function - typedef void (ReduceFunction) (const void *src, void *dst, int len); + typedef void (ReduceFunction) (const void *src, void *dst, int len, const MPI::Datatype &dtype); // constructor ReduceHandle(void); // destructor ~ReduceHandle(void); - // initialize the reduce function - void Init(ReduceFunction redfunc, bool commute = true); + /*! + * \brief initialize the reduce function, with the type the reduce function need to deal with + */ + void Init(ReduceFunction redfunc, size_t type_n4bytes, bool commute = true); /*! * \brief customized in-place all reduce operation * \param sendrecvbuf the in place send-recv buffer - * \param n4bytes number of nbytes send through all reduce + * \param type_n4bytes unit size of the type, in terms of 4bytes + * \param count number of elements to send */ - void AllReduce(void *sendrecvbuf, size_t n4bytes); - + void AllReduce(void *sendrecvbuf, size_t type_n4bytes, size_t count); + /*! \return the number of bytes occupied by the type */ + static int TypeSize(const MPI::Datatype &dtype); private: // handle data field void *handle; + // handle to the type field + void *htype; + // the created type in 4 bytes + size_t created_type_n4bytes; }; // ----- extensions for ease of use ------ @@ -92,7 +104,7 @@ template class Reducer { public: Reducer(void) { - handle.Init(ReduceInner); + handle.Init(ReduceInner, kUnit); utils::Assert(sizeof(DType) % sizeof(int) == 0, "struct must be multiple of int"); } /*! @@ -102,24 +114,23 @@ class Reducer { * \param reducer the reducer function */ inline void AllReduce(DType *sendrecvbuf, size_t count) { - handle.AllReduce(sendrecvbuf, count * kUnit); + handle.AllReduce(sendrecvbuf, kUnit, count); } private: // unit size static const size_t kUnit = sizeof(DType) / sizeof(int); // inner implementation of reducer - inline static void ReduceInner(const void *src_, void *dst_, int len_) { + inline static void ReduceInner(const void *src_, void *dst_, int len_, const MPI::Datatype &dtype) { const int *psrc = reinterpret_cast(src_); int *pdst = reinterpret_cast(dst_); DType tdst, tsrc; - utils::Assert(len_ % kUnit == 0, "length not divide by size"); - for (size_t i = 0; i < len_; i += kUnit) { + for (size_t i = 0; i < len_; ++i) { // use memcpy to avoid alignment issue - std::memcpy(&tdst, pdst + i, sizeof(tdst)); - std::memcpy(&tsrc, psrc + i, sizeof(tsrc)); + std::memcpy(&tdst, pdst + i * kUnit, sizeof(tdst)); + std::memcpy(&tsrc, psrc + i * kUnit, sizeof(tsrc)); tdst.Reduce(tsrc); - std::memcpy(pdst + i, &tdst, sizeof(tdst)); + std::memcpy(pdst + i * kUnit, &tdst, sizeof(tdst)); } } // function handle @@ -135,38 +146,47 @@ class Reducer { * (1) Save(IStream &fs) (2) Load(IStream &fs) (3) Reduce(const DType &d); */ template -class ComplexReducer { +class SerializeReducer { public: - ComplexReducer(void) { - handle.Init(ReduceInner); + SerializeReducer(void) { + handle.Init(ReduceInner, 0); } /*! - * \brief customized in-place all reduce operation + * \brief customized in-place all reduce operation * \param sendrecvobj pointer to the object to be reduced * \param max_n4byte maximum amount of memory needed in 4byte * \param reducer the reducer function */ - inline void AllReduce(DType *sendrecvobj, size_t max_n4byte) { - buffer.resize(max_n4byte); - utils::MemoryFixSizeBuffer fs(BeginPtr(buffer), max_n4byte * 4); - sendrecvobj->Save(fs); - handle.AllReduce(BeginPtr(buffer), max_n4byte); - fs.Seek(0); - sendrecvobj->Load(fs); + inline void AllReduce(DType *sendrecvobj, size_t max_n4byte, size_t count) { + buffer.resize(max_n4byte * count); + for (size_t i = 0; i < count; ++i) { + utils::MemoryFixSizeBuffer fs(BeginPtr(buffer) + i * max_n4byte * 4, max_n4byte * 4); + sendrecvobj[i]->Save(fs); + } + handle.AllReduce(BeginPtr(buffer), max_n4byte, count); + for (size_t i = 0; i < count; ++i) { + utils::MemoryFixSizeBuffer fs(BeginPtr(buffer) + i * max_n4byte * 4, max_n4byte * 4); + sendrecvobj[i]->Load(fs); + } } private: // unit size // inner implementation of reducer - inline static void ReduceInner(const void *src_, void *dst_, int len_) { - utils::MemoryFixSizeBuffer fsrc((void*)(src_), len_); - utils::MemoryFixSizeBuffer fdst(dst_, len_); + inline static void ReduceInner(const void *src_, void *dst_, int len_, const MPI::Datatype &dtype) { + int nbytes = ReduceHandle::TypeSize(dtype); // temp space DType tsrc, tdst; - tsrc.Load(fsrc); tdst.Load(fdst); - // govern const check - tdst.Reduce(static_cast(tsrc)); - tdst.Save(fdst); + for (int i = 0; i < len_; ++i) { + utils::MemoryFixSizeBuffer fsrc((void*)(src_) + i * nbytes, nbytes); + utils::MemoryFixSizeBuffer fdst(dst_ + i * nbytes, nbytes); + tsrc.Load(fsrc); + tdst.Load(fdst); + // govern const check + tdst.Reduce(static_cast(tsrc)); + fdst.Seek(0); + tdst.Save(fdst); + } } // function handle ReduceHandle handle; diff --git a/src/sync/sync_empty.cpp b/src/sync/sync_empty.cpp index a86707d61..d11d164cd 100644 --- a/src/sync/sync_empty.cpp +++ b/src/sync/sync_empty.cpp @@ -38,8 +38,8 @@ void Bcast(std::string *sendrecv_data, int root) { ReduceHandle::ReduceHandle(void) : handle(NULL) {} ReduceHandle::~ReduceHandle(void) {} -void ReduceHandle::Init(ReduceFunction redfunc, bool commute) {} -void ReduceHandle::AllReduce(void *sendrecvbuf, size_t n4byte) {} +void ReduceHandle::Init(ReduceFunction redfunc, size_t type_n4bytes, bool commute) {} +void ReduceHandle::AllReduce(void *sendrecvbuf, size_t type_n4bytes, size_t n4byte) {} } // namespace sync } // namespace xgboost diff --git a/src/sync/sync_mpi.cpp b/src/sync/sync_mpi.cpp index 45f6c3d75..b96a509a0 100644 --- a/src/sync/sync_mpi.cpp +++ b/src/sync/sync_mpi.cpp @@ -1,6 +1,7 @@ #include "./sync.h" #include "../utils/utils.h" -#include "mpi.h" +#include + // use MPI to implement sync namespace xgboost { namespace sync { @@ -60,7 +61,7 @@ void Bcast(std::string *sendrecv_data, int root) { } // code for reduce handle -ReduceHandle::ReduceHandle(void) : handle(NULL) { +ReduceHandle::ReduceHandle(void) : handle(NULL), htype(NULL) { } ReduceHandle::~ReduceHandle(void) { if (handle != NULL) { @@ -68,19 +69,42 @@ ReduceHandle::~ReduceHandle(void) { op->Free(); delete op; } + if (htype != NULL) { + MPI::Datatype *dtype = reinterpret_cast(htype); + dtype->Free(); + delete dtype; + } } -void ReduceHandle::Init(ReduceFunction redfunc, bool commute) { +int ReduceHandle::TypeSize(const MPI::Datatype &dtype) { + return dtype.Get_size(); +} +void ReduceHandle::Init(ReduceFunction redfunc, size_t type_n4bytes, bool commute) { utils::Assert(handle == NULL, "cannot initialize reduce handle twice"); + if (type_n4bytes != 0) { + MPI::Datatype *dtype = new MPI::Datatype(); + *dtype = MPI::INT.Create_contiguous(type_n4bytes); + dtype->Commit(); + created_type_n4bytes = type_n4bytes; + htype = dtype; + } + MPI::Op *op = new MPI::Op(); - MPI::User_function *pf = reinterpret_cast(redfunc); + MPI::User_function *pf = redfunc; op->Init(pf, commute); handle = op; } -void ReduceHandle::AllReduce(void *sendrecvbuf, size_t n4byte) { - utils::Assert(handle != NULL, "must intialize handle to call AllReduce"); +void ReduceHandle::AllReduce(void *sendrecvbuf, size_t type_n4bytes, size_t count) { + utils::Assert(handle != NULL, "must intialize handle to call AllReduce"); MPI::Op *op = reinterpret_cast(handle); - MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, n4byte, MPI_INT, *op); -} + MPI::Datatype *dtype = reinterpret_cast(htype); + if (created_type_n4bytes != type_n4bytes || htype == NULL) { + dtype->Free(); + *dtype = MPI::INT.Create_contiguous(type_n4bytes); + dtype->Commit(); + created_type_n4bytes = type_n4bytes; + } + MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, count, *dtype, *op); +} } // namespace sync } // namespace xgboost diff --git a/src/tree/updater.cpp b/src/tree/updater.cpp index a087bf9ed..a4cd65de0 100644 --- a/src/tree/updater.cpp +++ b/src/tree/updater.cpp @@ -7,7 +7,7 @@ #include "./updater_refresh-inl.hpp" #include "./updater_colmaker-inl.hpp" #include "./updater_distcol-inl.hpp" -#include "./updater_skmaker-inl.hpp" +//#include "./updater_skmaker-inl.hpp" #include "./updater_histmaker-inl.hpp" namespace xgboost { @@ -18,8 +18,8 @@ IUpdater* CreateUpdater(const char *name) { if (!strcmp(name, "sync")) return new TreeSyncher(); if (!strcmp(name, "refresh")) return new TreeRefresher(); if (!strcmp(name, "grow_colmaker")) return new ColMaker(); - if (!strcmp(name, "grow_histmaker")) return new CQHistMaker(); - if (!strcmp(name, "grow_skmaker")) return new SketchMaker(); + //if (!strcmp(name, "grow_histmaker")) return new CQHistMaker(); + //if (!strcmp(name, "grow_skmaker")) return new SketchMaker(); if (!strcmp(name, "distcol")) return new DistColMaker(); utils::Error("unknown updater:%s", name); diff --git a/src/tree/updater_histmaker-inl.hpp b/src/tree/updater_histmaker-inl.hpp index 4c0136ac8..f05308ce2 100644 --- a/src/tree/updater_histmaker-inl.hpp +++ b/src/tree/updater_histmaker-inl.hpp @@ -306,6 +306,7 @@ class CQHistMaker: public HistMaker { hist.data[istart].Add(gpair, info, ridx); } }; + typedef utils::WXQuantileSketch WXQSketch; virtual void CreateHist(const std::vector &gpair, IFMatrix *p_fmat, const BoosterInfo &info, @@ -371,21 +372,22 @@ class CQHistMaker: public HistMaker { // setup maximum size unsigned max_size = this->param.max_sketch_size(); // synchronize sketch - summary_array.Init(sketchs.size(), max_size); + summary_array.resize(sketchs.size()); for (size_t i = 0; i < sketchs.size(); ++i) { utils::WXQuantileSketch::SummaryContainer out; sketchs[i].GetSummary(&out); - summary_array.Set(i, out); + summary_array[i].Reserve(max_size); + summary_array[i].SetPrune(out, max_size); } - size_t n4bytes = (summary_array.MemSize() + 3) / 4; - sreducer.AllReduce(&summary_array, n4bytes); + size_t n4bytes = (WXQSketch::SummaryContainer::CalcMemCost(max_size) + 3) / 4; + sreducer.AllReduce(BeginPtr(summary_array), n4bytes, summary_array.size()); // now we get the final result of sketch, setup the cut this->wspace.cut.clear(); this->wspace.rptr.clear(); this->wspace.rptr.push_back(0); for (size_t wid = 0; wid < this->qexpand.size(); ++wid) { for (int fid = 0; fid < tree.param.num_feature; ++fid) { - const WXQSketch::Summary a = summary_array[wid * tree.param.num_feature + fid]; + const WXQSketch::Summary &a = summary_array[wid * tree.param.num_feature + fid]; for (size_t i = 1; i < a.size; ++i) { bst_float cpt = a.data[i].value - rt_eps; if (i == 1 || cpt > this->wspace.cut.back()) { @@ -407,7 +409,7 @@ class CQHistMaker: public HistMaker { } utils::Assert(this->wspace.rptr.size() == (tree.param.num_feature + 1) * this->qexpand.size() + 1, - "cut space inconsistent"); + "cut space inconsistent"); } private: @@ -496,7 +498,6 @@ class CQHistMaker: public HistMaker { } } - typedef utils::WXQuantileSketch WXQSketch; // thread temp data std::vector< std::vector > thread_sketch; // used to hold statistics @@ -506,9 +507,9 @@ class CQHistMaker: public HistMaker { // node statistics std::vector node_stats; // summary array - WXQSketch::SummaryArray summary_array; + std::vector< WXQSketch::SummaryContainer> summary_array; // reducer for summary - sync::ComplexReducer sreducer; + sync::SerializeReducer sreducer; // per node, per feature sketch std::vector< utils::WXQuantileSketch > sketchs; }; @@ -580,23 +581,24 @@ class QuantileHistMaker: public HistMaker { } } // setup maximum size - size_t max_size = static_cast(this->param.sketch_ratio / this->param.sketch_eps); + unsigned max_size = this->param.max_sketch_size(); // synchronize sketch - summary_array.Init(sketchs.size(), max_size); + summary_array.resize(sketchs.size()); for (size_t i = 0; i < sketchs.size(); ++i) { utils::WQuantileSketch::SummaryContainer out; sketchs[i].GetSummary(&out); - summary_array.Set(i, out); + summary_array[i].Reserve(max_size); + summary_array[i].SetPrune(out, max_size); } - size_t n4bytes = (summary_array.MemSize() + 3) / 4; - sreducer.AllReduce(&summary_array, n4bytes); + size_t n4bytes = (WXQSketch::SummaryContainer::CalcMemCost(max_size) + 3) / 4; + sreducer.AllReduce(BeginPtr(summary_array), n4bytes, summary_array.size()); // now we get the final result of sketch, setup the cut this->wspace.cut.clear(); this->wspace.rptr.clear(); this->wspace.rptr.push_back(0); for (size_t wid = 0; wid < this->qexpand.size(); ++wid) { for (int fid = 0; fid < tree.param.num_feature; ++fid) { - const WXQSketch::Summary a = summary_array[wid * tree.param.num_feature + fid]; + const WXQSketch::Summary &a = summary_array[wid * tree.param.num_feature + fid]; for (size_t i = 1; i < a.size; ++i) { bst_float cpt = a.data[i].value - rt_eps; if (i == 1 || cpt > this->wspace.cut.back()) { @@ -624,9 +626,9 @@ class QuantileHistMaker: public HistMaker { private: typedef utils::WXQuantileSketch WXQSketch; // summary array - WXQSketch::SummaryArray summary_array; + std::vector< WXQSketch::SummaryContainer> summary_array; // reducer for summary - sync::ComplexReducer sreducer; + sync::SerializeReducer sreducer; // local temp column data structure std::vector col_ptr; // local storage of column data diff --git a/src/utils/quantile.h b/src/utils/quantile.h index c3cdb86c2..6727c8675 100644 --- a/src/utils/quantile.h +++ b/src/utils/quantile.h @@ -224,6 +224,12 @@ struct WQSummary { */ inline void SetCombine(const WQSummary &sa, const WQSummary &sb) { + if (sa.size == 0) { + this->CopyFrom(sb); return; + } + if (sb.size == 0) { + this->CopyFrom(sa); return; + } utils::Assert(sa.size > 0 && sb.size > 0, "invalid input for merge"); const Entry *a = sa.data, *a_end = sa.data + sa.size; const Entry *b = sb.data, *b_end = sb.data + sb.size; @@ -453,6 +459,12 @@ struct GKSummary { } inline void SetCombine(const GKSummary &sa, const GKSummary &sb) { + if (sa.size == 0) { + this->CopyFrom(sb); return; + } + if (sb.size == 0) { + this->CopyFrom(sa); return; + } utils::Assert(sa.size > 0 && sb.size > 0, "invalid input for merge"); const Entry *a = sa.data, *a_end = sa.data + sa.size; const Entry *b = sb.data, *b_end = sb.data + sb.size; @@ -537,96 +549,41 @@ class QuantileSketchTemplate { this->SetMerge(begin[0], begin[1]); } else { // recursive merge - SummaryContainer lhs, rhs; + SummaryContainer lhs, rhs; lhs.SetCombine(begin, begin + len / 2); rhs.SetCombine(begin + len / 2, end); this->Reserve(lhs.size + rhs.size); this->SetCombine(lhs, rhs); } } - }; - /*! - * \brief represent an array of summary - * each contains fixed maximum size summary - */ - class SummaryArray { - public: - /*! - * \brief intialize the SummaryArray - * \param num_summary number of summary in the array - * \param max_size maximum number of elements in each summary - */ - inline void Init(unsigned num_summary, unsigned max_size) { - this->num_summary = num_summary; - this->max_size = max_size; - sizes.resize(num_summary); - data.resize(num_summary * max_size); - } - /*! - * \brief set i-th element of array to be the src summary, - * the summary can be pruned if it does not fit into max_size - * \param the index in the array - * \param src the source summary - * \tparam the type if source summary - */ - template - inline void Set(size_t i, const TSrc &src) { - Summary dst = (*this)[i]; - dst.SetPrune(src, max_size); - this->sizes[i] = dst.size; - } - /*! - * \brief get i-th summary of the array, only use this for read purpose - */ - inline const Summary operator[](size_t i) const { - return Summary((Entry*)BeginPtr(data) + i * max_size, sizes[i]); - } /*! * \brief do elementwise combination of summary array * this[i] = combine(this[i], src[i]) for each i * \param src the source summary + * \param max_nbyte, maximum number of byte allowed in here */ - inline void Reduce(const SummaryArray &src) { - utils::Check(num_summary == src.num_summary && - max_size == src.max_size, "array shape mismatch in reduce"); + inline void Reduce(const Summary &src, size_t max_nbyte) { + this->Reserve((max_nbyte - sizeof(this->size)) / sizeof(Entry)); SummaryContainer temp; - temp.Reserve(max_size * 2); - for (unsigned i = 0; i < num_summary; ++i) { - temp.SetCombine((*this)[i], src[i]); - this->Set(i, temp); - } + temp.Reserve(this->size + src.size); + temp.SetCombine(*this, src); + this->SetPrune(temp, space.size()); } /*! \brief return the number of bytes this data structure cost in serialization */ - inline size_t MemSize(void) const { - return sizeof(num_summary) + sizeof(max_size) - + data.size() * sizeof(Entry) + sizes.size() * sizeof(unsigned); + inline static size_t CalcMemCost(size_t nentry) { + return sizeof(size_t) + sizeof(Entry) * nentry; } /*! \brief save the data structure into stream */ inline void Save(IStream &fo) const { - fo.Write(&num_summary, sizeof(num_summary)); - fo.Write(&max_size, sizeof(max_size)); - fo.Write(BeginPtr(sizes), sizes.size() * sizeof(unsigned)); - fo.Write(BeginPtr(data), data.size() * sizeof(Entry)); + fo.Write(&(this->size), sizeof(this->size)); + fo.Write(data, this->size * sizeof(Entry)); } /*! \brief load data structure from input stream */ inline void Load(IStream &fi) { - utils::Check(fi.Read(&num_summary, sizeof(num_summary)) != 0, "invalid SummaryArray"); - utils::Check(fi.Read(&max_size, sizeof(max_size)) != 0, "invalid SummaryArray"); - sizes.resize(num_summary); - data.resize(num_summary * max_size); - utils::Check(fi.Read(BeginPtr(sizes), sizes.size() * sizeof(unsigned)) != 0, "invalid SummaryArray"); - utils::Check(fi.Read(BeginPtr(data), data.size() * sizeof(Entry)) != 0, "invalid SummaryArray"); + utils::Check(fi.Read(&this->size, sizeof(this->size)) != 0, "invalid SummaryArray 1"); + this->Reserve(this->size); + utils::Check(fi.Read(data, this->size * sizeof(Entry)) != 0, "invalid SummaryArray 2"); } - - private: - /*! \brief number of summaries in the group */ - unsigned num_summary; - /*! \brief maximum size of each summary */ - unsigned max_size; - /*! \brief the current size of each summary */ - std::vector sizes; - /*! \brief the data content */ - std::vector data; }; /*! * \brief intialize the quantile sketch, given the performance specification