From e72a869fd18e6fea41ad02ba925e4ab3f82c8b84 Mon Sep 17 00:00:00 2001
From: tqchen
Date: Fri, 19 Dec 2014 20:57:53 -0800
Subject: [PATCH] add complex reducer in

---
 src/engine.cc       | 22 ++++++++++++++
 src/engine.h        | 42 +++++++++++++++++++++++++++
 src/engine_empty.cc | 14 +++++++++
 src/engine_mpi.cc   | 54 ++++++++++++++++++++++++++++++++++
 src/rabit-inl.h     | 67 ++++++++++++++++++++++++++++++++++++++++++
 src/rabit.h         | 71 +++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 270 insertions(+)

diff --git a/src/engine.cc b/src/engine.cc
index cc6a48745..d6d6c92b6 100644
--- a/src/engine.cc
+++ b/src/engine.cc
@@ -48,5 +48,27 @@ void Allreduce_(void *sendrecvbuf,
                 void *prepare_arg) {
   GetEngine()->Allreduce(sendrecvbuf, type_nbytes, count, red, prepare_fun, prepare_arg);
 }
+
+// code for reduce handle
+ReduceHandle::ReduceHandle(void)
+    : handle_(NULL), htype_(NULL), created_type_nbytes_(0) {}
+ReduceHandle::~ReduceHandle(void) {}
+
+int ReduceHandle::TypeSize(const MPI::Datatype &dtype) {
+  return static_cast<int>(dtype.type_size);
+}
+void ReduceHandle::Init(IEngine::ReduceFunction redfunc, size_t type_nbytes) {
+  utils::Assert(handle_ == NULL, "cannot initialize reduce handle twice");
+  handle_ = reinterpret_cast<void*>(redfunc);
+}
+void ReduceHandle::Allreduce(void *sendrecvbuf,
+                             size_t type_nbytes, size_t count,
+                             IEngine::PreprocFunction prepare_fun,
+                             void *prepare_arg) {
+  utils::Assert(handle_ != NULL, "must intialize handle to call AllReduce");
+  GetEngine()->Allreduce(sendrecvbuf, type_nbytes, count,
+                         reinterpret_cast<IEngine::ReduceFunction*>(handle_),
+                         prepare_fun, prepare_arg);
+}
 }  // namespace engine
 }  // namespace rabit
diff --git a/src/engine.h b/src/engine.h
index 891290ae0..03cd8e44a 100644
--- a/src/engine.h
+++ b/src/engine.h
@@ -177,6 +177,48 @@ void Allreduce_(void *sendrecvbuf,
                 mpi::OpType op,
                 IEngine::PreprocFunction prepare_fun = NULL,
                 void *prepare_arg = NULL);
+
+/*!
+ * \brief handle for customized reducer, used to handle customized reduce
+ *  this class is mainly created for compatibility with MPI's customized reduce
+ */
+class ReduceHandle {
+ public:
+  // constructor
+  ReduceHandle(void);
+  // destructor
+  ~ReduceHandle(void);
+  /*!
+   * \brief initialize the reduce function,
+   *  with the type the reduce function needs to deal with
+   *  the reduce function MUST be commutative
+   */
+  void Init(IEngine::ReduceFunction redfunc, size_t type_nbytes);
+  /*!
+   * \brief customized in-place all reduce operation
+   * \param sendrecvbuf the in place send-recv buffer
+   * \param type_nbytes unit size of the type, in bytes
+   * \param count number of elements to send
+   * \param prepare_fun Lazy preprocessing function, if it is not NULL, prepare_fun(prepare_arg)
+   *                    will be called by the function before performing Allreduce, to initialize the data in sendrecvbuf.
+   *                    If the result of Allreduce can be recovered directly, then prepare_fun will NOT be called
+   * \param prepare_arg argument passed into the lazy preprocessing function
+   */
+  void Allreduce(void *sendrecvbuf,
+                 size_t type_nbytes, size_t count,
+                 IEngine::PreprocFunction prepare_fun = NULL,
+                 void *prepare_arg = NULL);
+  /*! \return the number of bytes occupied by the type */
+  static int TypeSize(const MPI::Datatype &dtype);
+
+ private:
+  // handle data field
+  void *handle_;
+  // handle to the type field
+  void *htype_;
+  // size in bytes of the type currently held in htype_
+  size_t created_type_nbytes_;
+};
 }  // namespace engine
 }  // namespace rabit
 #endif  // RABIT_ENGINE_H
diff --git a/src/engine_empty.cc b/src/engine_empty.cc
index a2cbd2358..be37e3a7a 100644
--- a/src/engine_empty.cc
+++ b/src/engine_empty.cc
@@ -86,5 +86,19 @@ void Allreduce_(void *sendrecvbuf,
                 IEngine::PreprocFunction prepare_fun,
                 void *prepare_arg) {
 }
+
+// code for reduce handle
+ReduceHandle::ReduceHandle(void)
+    : handle_(NULL), htype_(NULL), created_type_nbytes_(0) {}
+ReduceHandle::~ReduceHandle(void) {}
+
+int ReduceHandle::TypeSize(const MPI::Datatype &dtype) {
+  return 0;
+}
+void ReduceHandle::Init(IEngine::ReduceFunction redfunc, size_t type_nbytes) {}
+void ReduceHandle::Allreduce(void *sendrecvbuf,
+                             size_t type_nbytes, size_t count,
+                             IEngine::PreprocFunction prepare_fun,
+                             void *prepare_arg) {}
 }  // namespace engine
 }  // namespace rabit
diff --git a/src/engine_mpi.cc b/src/engine_mpi.cc
index 7bf1fa2b6..46867d3cc 100644
--- a/src/engine_mpi.cc
+++ b/src/engine_mpi.cc
@@ -124,5 +124,59 @@ void Allreduce_(void *sendrecvbuf,
   if (prepare_fun != NULL) prepare_fun(prepare_arg);
   MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, count, GetType(dtype), GetOp(op));
 }
+
+// code for reduce handle
+ReduceHandle::ReduceHandle(void)
+    : handle_(NULL), htype_(NULL), created_type_nbytes_(0) {}
+ReduceHandle::~ReduceHandle(void) {
+  if (handle_ != NULL) {
+    MPI::Op *op = reinterpret_cast<MPI::Op*>(handle_);
+    op->Free();
+    delete op;
+  }
+  if (htype_ != NULL) {
+    MPI::Datatype *dtype = reinterpret_cast<MPI::Datatype*>(htype_);
+    dtype->Free();
+    delete dtype;
+  }
+}
+int ReduceHandle::TypeSize(const MPI::Datatype &dtype) {
+  return dtype.Get_size();
+}
+void ReduceHandle::Init(IEngine::ReduceFunction redfunc, size_t type_nbytes) {
+  utils::Assert(handle_ == NULL, "cannot initialize reduce handle twice");
+  if (type_nbytes != 0) {
+    MPI::Datatype *dtype = new MPI::Datatype();
+    *dtype = MPI::CHAR.Create_contiguous(type_nbytes);
+    dtype->Commit();
+    created_type_nbytes_ = type_nbytes;
+    htype_ = dtype;
+  }
+
+  MPI::Op *op = new MPI::Op();
+  MPI::User_function *pf = redfunc;
+  op->Init(pf, true);
+  handle_ = op;
+}
+void ReduceHandle::Allreduce(void *sendrecvbuf,
+                             size_t type_nbytes, size_t count,
+                             IEngine::PreprocFunction prepare_fun,
+                             void *prepare_arg) {
+  utils::Assert(handle_ != NULL, "must intialize handle to call AllReduce");
+  MPI::Op *op = reinterpret_cast<MPI::Op*>(handle_);
+  MPI::Datatype *dtype = reinterpret_cast<MPI::Datatype*>(htype_);
+  if (created_type_nbytes_ != type_nbytes || dtype == NULL) {
+    if (dtype == NULL) {
+      htype_ = dtype = new MPI::Datatype();  // store it: reused by later calls, freed by the destructor
+    } else {
+      dtype->Free();
+    }
+    *dtype = MPI::CHAR.Create_contiguous(type_nbytes);
+    dtype->Commit();
+    created_type_nbytes_ = type_nbytes;
+  }
+  if (prepare_fun != NULL) prepare_fun(prepare_arg);
+  MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, count, *dtype, *op);
+}
 }  // namespace engine
 }  // namespace rabit
diff --git a/src/rabit-inl.h b/src/rabit-inl.h
index 8d681d32c..679f6d49e 100644
--- a/src/rabit-inl.h
+++ b/src/rabit-inl.h
@@ -8,6 +8,7 @@
 #define RABIT_RABIT_INL_H
 // use engine for implementation
 #include "./engine.h"
+#include "./io.h"
 #include "./utils.h"
 
 namespace rabit {
@@ -170,5 +171,71 @@ inline void CheckPoint(const ISerializable *global_model,
 inline int VersionNumber(void) {
   return engine::GetEngine()->VersionNumber();
 }
+// ---------------------------------
+// Code to handle customized Reduce
+// ---------------------------------
+// function to perform reduction for Reducer
+template<typename DType>
+inline void Reducer<DType>::ReduceFunc(const void *src_, void *dst_, int len_, const MPI::Datatype &dtype) {
+  const size_t kUnit = sizeof(DType);
+  const char *psrc = reinterpret_cast<const char*>(src_);
+  char *pdst = reinterpret_cast<char*>(dst_);
+  DType tdst, tsrc;
+  for (int i = 0; i < len_; ++i) {
+    // use memcpy to avoid alignment issue
+    std::memcpy(&tdst, pdst + i * kUnit, sizeof(tdst));
+    std::memcpy(&tsrc, psrc + i * kUnit, sizeof(tsrc));
+    tdst.Reduce(tsrc);
+    std::memcpy(pdst + i * kUnit, &tdst, sizeof(tdst));
+  }
+}
+template<typename DType>
+inline Reducer<DType>::Reducer(void) {
+  handle_.Init(Reducer<DType>::ReduceFunc, sizeof(DType));
+}
+template<typename DType>
+inline void Reducer<DType>::Allreduce(DType *sendrecvbuf, size_t count,
+                                      void (*prepare_fun)(void *arg),
+                                      void *prepare_arg) {
+  handle_.Allreduce(sendrecvbuf, sizeof(DType), count, prepare_fun, prepare_arg);
+}
+// function to perform reduction for SerializeReducer
+template<typename DType>
+inline void
+SerializeReducer<DType>::ReduceFunc(const void *src_, void *dst_, int len_, const MPI::Datatype &dtype) {
+  int nbytes = engine::ReduceHandle::TypeSize(dtype);
+  // temp space
+  DType tsrc, tdst;
+  for (int i = 0; i < len_; ++i) {
+    utils::MemoryFixSizeBuffer fsrc((char*)(src_) + i * nbytes, nbytes);
+    utils::MemoryFixSizeBuffer fdst((char*)(dst_) + i * nbytes, nbytes);
+    tsrc.Load(fsrc);
+    tdst.Load(fdst);
+    // govern const check
+    tdst.Reduce(static_cast<const DType &>(tsrc), nbytes);
+    fdst.Seek(0);
+    tdst.Save(fdst);
+  }
+}
+template<typename DType>
+inline SerializeReducer<DType>::SerializeReducer(void) {
+  handle_.Init(SerializeReducer<DType>::ReduceFunc, sizeof(DType));
+}
+template<typename DType>
+inline void SerializeReducer<DType>::Allreduce(DType *sendrecvobj,
+                                               size_t max_nbyte, size_t count,
+                                               void (*prepare_fun)(void *arg),
+                                               void *prepare_arg) {
+  buffer_.resize(max_nbyte * count);
+  for (size_t i = 0; i < count; ++i) {
+    utils::MemoryFixSizeBuffer fs(BeginPtr(buffer_) + i * max_nbyte, max_nbyte);
+    sendrecvobj[i].Save(fs);
+  }
+  handle_.Allreduce(BeginPtr(buffer_), max_nbyte, count, prepare_fun, prepare_arg);
+  for (size_t i = 0; i < count; ++i) {
+    utils::MemoryFixSizeBuffer fs(BeginPtr(buffer_) + i * max_nbyte, max_nbyte);
+    sendrecvobj[i].Load(fs);
+  }
+}
 }  // namespace rabit
 #endif
diff --git a/src/rabit.h b/src/rabit.h
index bdf80e259..316da65c9 100644
--- a/src/rabit.h
+++ b/src/rabit.h
@@ -183,6 +183,77 @@ inline void CheckPoint(const ISerializable *global_model,
  * \sa LoadCheckPoint, CheckPoint
  */
 inline int VersionNumber(void);
+// ----- extensions that allow customized reducer ------
+// helper class to do customized reduce, user does not need to know the type
+namespace engine {
+class ReduceHandle;
+}  // namespace engine
+/*!
+ * \brief template class to make customized reduce and all reduce easy
+ *  Do not use reducer directly in the function you call Finalize, because the destructor can happen after Finalize
+ * \tparam DType data type to be reduced
+ *  DType must be a struct, with no pointer, and contain a function Reduce(const DType &d);
+ */
+template<typename DType>
+class Reducer {
+ public:
+  Reducer(void);
+  /*!
+   * \brief customized in-place all reduce operation
+   * \param sendrecvbuf the in place send-recv buffer
+   * \param count number of elements to be reduced
+   * \param prepare_fun Lazy preprocessing function, if it is not NULL, prepare_fun(prepare_arg)
+   *                    will be called by the function before performing Allreduce, to initialize the data in sendrecvbuf.
+   *                    If the result of Allreduce can be recovered directly, then prepare_fun will NOT be called
+   * \param prepare_arg argument passed into the lazy preprocessing function
+   */
+  inline void Allreduce(DType *sendrecvbuf, size_t count,
+                        void (*prepare_fun)(void *arg) = NULL,
+                        void *prepare_arg = NULL);
+
+ private:
+  // inner implementation of reducer
+  inline static void ReduceFunc(const void *src_, void *dst_, int len_, const MPI::Datatype &dtype);
+  /*! \brief function handle to do reduce */
+  engine::ReduceHandle handle_;
+};
+/*!
+ * \brief template class to make customized reduce,
+ *  this class defines a complex reducer that handles any data structure that can be
+ *  serialized/deserialized into a fixed size buffer
+ *  Do not use reducer directly in the function you call Finalize, because the destructor can happen after Finalize
+ *
+ * \tparam DType data type to be reduced, DType must contain the following functions:
+ *  (1) Save(IStream &fs)  (2) Load(IStream &fs)  (3) Reduce(const DType &src, size_t max_nbyte)
+ */
+template<typename DType>
+class SerializeReducer {
+ public:
+  SerializeReducer(void);
+  /*!
+   * \brief customized in-place all reduce operation
+   * \param sendrecvobj pointer to the array of objects to be reduced
+   * \param max_nbyte maximum amount of memory needed to serialize each object
+   *        this includes budget limit for intermediate and final result
+   * \param count number of elements to be reduced
+   * \param prepare_fun Lazy preprocessing function, if it is not NULL, prepare_fun(prepare_arg)
+   *                    will be called by the function before performing Allreduce, to initialize the data in sendrecvobj.
+   *                    If the result of Allreduce can be recovered directly, then prepare_fun will NOT be called
+   * \param prepare_arg argument passed into the lazy preprocessing function
+   */
+  inline void Allreduce(DType *sendrecvobj,
+                        size_t max_nbyte, size_t count,
+                        void (*prepare_fun)(void *arg) = NULL,
+                        void *prepare_arg = NULL);
+
+ private:
+  // inner implementation of reducer
+  inline static void ReduceFunc(const void *src_, void *dst_, int len_, const MPI::Datatype &dtype);
+  /*! \brief function handle to do reduce */
+  engine::ReduceHandle handle_;
+  /*! \brief temporary buffer holding the serialized objects during reduce */
+  std::string buffer_;
+};
 }  // namespace rabit
 // implementation of template functions
 #include "./rabit-inl.h"