From f2577fec862ae0f7bf0897586b42814d0cf14215 Mon Sep 17 00:00:00 2001 From: tqchen Date: Wed, 15 Oct 2014 21:39:42 -0700 Subject: [PATCH] intial version of sync wrapper --- Makefile | 16 ++++++-- src/sync/sync.cpp | 61 ++++++++++++++++++++++++++++++ src/sync/sync.h | 95 ++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 164 insertions(+), 8 deletions(-) create mode 100644 src/sync/sync.cpp diff --git a/Makefile b/Makefile index 3230661d4..2852b7ac5 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,8 @@ export CC = gcc export CXX = g++ +export MPICXX = mpicxx export LDFLAGS= -pthread -lm - -export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC -pedantic +export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC ifeq ($(no_omp),1) CFLAGS += -DDISABLE_OPENMP @@ -13,11 +13,13 @@ endif # specify tensor path BIN = xgboost OBJ = updater.o gbm.o io.o +MPIOBJ = sync.o +MPIBIN = test/test SLIB = wrapper/libxgboostwrapper.so .PHONY: clean all python Rpack -all: $(BIN) $(OBJ) $(SLIB) +all: $(BIN) $(OBJ) $(SLIB) $(MPIOBJ) $(MPIBIN) python: wrapper/libxgboostwrapper.so # now the wrapper takes in two files. 
io and wrapper part @@ -25,8 +27,10 @@ wrapper/libxgboostwrapper.so: wrapper/xgboost_wrapper.cpp $(OBJ) updater.o: src/tree/updater.cpp src/tree/*.hpp src/*.h src/tree/*.h gbm.o: src/gbm/gbm.cpp src/gbm/*.hpp src/gbm/*.h io.o: src/io/io.cpp src/io/*.hpp src/utils/*.h src/learner/dmatrix.h src/*.h +sync.o: src/sync/sync.cpp xgboost: src/xgboost_main.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h $(OBJ) wrapper/libxgboostwrapper.so: wrapper/xgboost_wrapper.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h $(OBJ) +test/test: test/test.cpp sync.o $(BIN) : $(CXX) $(CFLAGS) $(LDFLAGS) -o $@ $(filter %.cpp %.o %.c, $^) @@ -37,6 +41,12 @@ $(SLIB) : $(OBJ) : $(CXX) -c $(CFLAGS) -o $@ $(firstword $(filter %.cpp %.c, $^) ) +$(MPIOBJ) : + $(MPICXX) -c $(CFLAGS) -o $@ $(firstword $(filter %.cpp %.c, $^) ) + +$(MPIBIN) : + $(MPICXX) $(CFLAGS) $(LDFLAGS) -o $@ $(filter %.cpp %.o %.c, $^) + install: cp -f -r $(BIN) $(INSTALL_PATH) diff --git a/src/sync/sync.cpp b/src/sync/sync.cpp new file mode 100644 index 000000000..705d19fae --- /dev/null +++ b/src/sync/sync.cpp @@ -0,0 +1,61 @@ +#include "./sync.h" +#include "../utils/utils.h" +#include "mpi.h" + +namespace xgboost { +namespace sync { + +// code for reduce handle +ReduceHandle::ReduceHandle(void) : handle(NULL) { +} +ReduceHandle::~ReduceHandle(void) { + if (handle != NULL) { + MPI::Op *op = reinterpret_cast(handle); + op->Free(); + delete op; + } +} +void ReduceHandle::Init(ReduceFunction redfunc, bool commute) { + utils::Assert(handle == NULL, "cannot initialize reduce handle twice"); + MPI::Op *op = new MPI::Op(); + MPI::User_function *pf = reinterpret_cast(redfunc); + op->Init(pf, commute); + handle = op; +} +void ReduceHandle::AllReduce(void *sendrecvbuf, size_t n4byte) { + utils::Assert(handle != NULL, "must intialize handle to call AllReduce"); + MPI::Op *op = reinterpret_cast(handle); + MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, n4byte, MPI_INT, *op); +} + +int GetRank(void) { + return 
MPI::COMM_WORLD.Get_rank(); +} + +void Init(int argc, char *argv[]) { + MPI::Init(argc, argv); +} + +void Finalize(void) { + MPI::Finalize(); +} + +void AllReduce_(void *sendrecvbuf, int count, const MPI::Datatype &dtype, ReduceOp op) { + switch(op) { + case kBitwiseOR: MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, count, dtype, MPI::BOR); return; + case kSum: MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, count, dtype, MPI::SUM); return; + } +} + +template<> +void AllReduce(uint32_t *sendrecvbuf, int count, ReduceOp op) { + AllReduce_(sendrecvbuf, count, MPI::UNSIGNED, op); +} + +template<> +void AllReduce(float *sendrecvbuf, int count, ReduceOp op) { + AllReduce_(sendrecvbuf, count, MPI::FLOAT, op); +} + +} // namespace sync +} // namespace xgboost diff --git a/src/sync/sync.h b/src/sync/sync.h index 1d9be719c..0548a3c80 100644 --- a/src/sync/sync.h +++ b/src/sync/sync.h @@ -5,14 +5,99 @@ * \brief interface to do synchronization * \author Tianqi Chen */ +#include +#include +#include "../utils/utils.h" + namespace xgboost { +/*! \brief synchronizer module that minimally wraps MPI */ namespace sync { -/*! - * \brief synchronization context interface of xgboost, - * will be provided as a singleton - */ -class IContext { +/*! \brief reduce operator supported */ +enum ReduceOp { + kBitwiseOR, + kSum +}; + +typedef void (ReduceFunction) (const void *src, void *dst, int len); + +/*! \brief handle for customized reducer */ +class ReduceHandle { + public: + // constructor + ReduceHandle(void); + // destructor + ~ReduceHandle(void); + // initialize the reduce function + void Init(ReduceFunction redfunc, bool commute = true); + /*! + * \brief customized in-place all reduce operation + * \param sendrecvbuf the in place send-recv buffer + * \param n4bytes number of 4-byte units sent through all reduce + */ + void AllReduce(void *sendrecvbuf, size_t n4bytes); + private: + // handle data field + void *handle; +}; + +/*! 
\brief get rank of current process */ +int GetRank(void); +/*! \brief initialize the synchronization module */ +void Init(int argc, char *argv[]); +/*! \brief finalize synchronization module */ +void Finalize(void); +/*! + * \brief in-place all reduce operation + * \param sendrecvbuf the in place send-recv buffer + * \param count count of data + * \param op reduction function + */ +template +void AllReduce(DType *sendrecvbuf, int count, ReduceOp op); + +/*! + * \brief template class to make customized reduce and all reduce easy + * Do not use a Reducer directly in the function that calls Finalize, because its destructor can run after Finalize + * \tparam DType data type to be reduced + * DType must be a struct, with no pointer, and contain a function Reduce(const DType &d); + */ +template +class Reducer { + public: + Reducer(void) { + handle.Init(ReduceInner); + utils::Assert(sizeof(DType) % sizeof(int) == 0, "struct must be multiple of int"); + } + /*! + * \brief customized in-place all reduce operation + * \param sendrecvbuf the in place send-recv buffer + * \param count number of DType elements sent through all reduce + * \param reducer the reducer function + */ + inline void AllReduce(DType *sendrecvbuf, int count) { + handle.AllReduce(sendrecvbuf, count * kUnit); + } + + private: + // unit size + static const size_t kUnit = sizeof(DType) / sizeof(int); + // inner implementation of reducer + inline static void ReduceInner(const void *src_, void *dst_, int len_) { + const int *psrc = reinterpret_cast(src_); + int *pdst = reinterpret_cast(dst_); + DType tdst, tsrc; + utils::Assert(len_ % kUnit == 0, "length not divide by size"); + for (size_t i = 0; i < len_; i += kUnit) { + // use memcpy to avoid alignment issue + std::memcpy(&tdst, pdst + i, sizeof(tdst)); + std::memcpy(&tsrc, psrc + i, sizeof(tsrc)); + tdst.Reduce(tsrc); + std::memcpy(pdst + i, &tdst, sizeof(tdst)); + } + } + // function handle + ReduceHandle handle; }; } // namespace sync