intial version of sync wrapper
This commit is contained in:
parent
e295128973
commit
f2577fec86
16
Makefile
16
Makefile
@ -1,8 +1,8 @@
|
|||||||
export CC = gcc
|
export CC = gcc
|
||||||
export CXX = g++
|
export CXX = g++
|
||||||
|
export MPICXX = mpicxx
|
||||||
export LDFLAGS= -pthread -lm
|
export LDFLAGS= -pthread -lm
|
||||||
|
export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC
|
||||||
export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC -pedantic
|
|
||||||
|
|
||||||
ifeq ($(no_omp),1)
|
ifeq ($(no_omp),1)
|
||||||
CFLAGS += -DDISABLE_OPENMP
|
CFLAGS += -DDISABLE_OPENMP
|
||||||
@ -13,11 +13,13 @@ endif
|
|||||||
# specify tensor path
|
# specify tensor path
|
||||||
BIN = xgboost
|
BIN = xgboost
|
||||||
OBJ = updater.o gbm.o io.o
|
OBJ = updater.o gbm.o io.o
|
||||||
|
MPIOBJ = sync.o
|
||||||
|
MPIBIN = test/test
|
||||||
SLIB = wrapper/libxgboostwrapper.so
|
SLIB = wrapper/libxgboostwrapper.so
|
||||||
|
|
||||||
.PHONY: clean all python Rpack
|
.PHONY: clean all python Rpack
|
||||||
|
|
||||||
all: $(BIN) $(OBJ) $(SLIB)
|
all: $(BIN) $(OBJ) $(SLIB) $(MPIOBJ) $(MPIBIN)
|
||||||
|
|
||||||
python: wrapper/libxgboostwrapper.so
|
python: wrapper/libxgboostwrapper.so
|
||||||
# now the wrapper takes in two files. io and wrapper part
|
# now the wrapper takes in two files. io and wrapper part
|
||||||
@ -25,8 +27,10 @@ wrapper/libxgboostwrapper.so: wrapper/xgboost_wrapper.cpp $(OBJ)
|
|||||||
updater.o: src/tree/updater.cpp src/tree/*.hpp src/*.h src/tree/*.h
|
updater.o: src/tree/updater.cpp src/tree/*.hpp src/*.h src/tree/*.h
|
||||||
gbm.o: src/gbm/gbm.cpp src/gbm/*.hpp src/gbm/*.h
|
gbm.o: src/gbm/gbm.cpp src/gbm/*.hpp src/gbm/*.h
|
||||||
io.o: src/io/io.cpp src/io/*.hpp src/utils/*.h src/learner/dmatrix.h src/*.h
|
io.o: src/io/io.cpp src/io/*.hpp src/utils/*.h src/learner/dmatrix.h src/*.h
|
||||||
|
sync.o: src/sync/sync.cpp
|
||||||
xgboost: src/xgboost_main.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h $(OBJ)
|
xgboost: src/xgboost_main.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h $(OBJ)
|
||||||
wrapper/libxgboostwrapper.so: wrapper/xgboost_wrapper.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h $(OBJ)
|
wrapper/libxgboostwrapper.so: wrapper/xgboost_wrapper.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h $(OBJ)
|
||||||
|
test/test: test/test.cpp sync.o
|
||||||
|
|
||||||
$(BIN) :
|
$(BIN) :
|
||||||
$(CXX) $(CFLAGS) $(LDFLAGS) -o $@ $(filter %.cpp %.o %.c, $^)
|
$(CXX) $(CFLAGS) $(LDFLAGS) -o $@ $(filter %.cpp %.o %.c, $^)
|
||||||
@ -37,6 +41,12 @@ $(SLIB) :
|
|||||||
$(OBJ) :
|
$(OBJ) :
|
||||||
$(CXX) -c $(CFLAGS) -o $@ $(firstword $(filter %.cpp %.c, $^) )
|
$(CXX) -c $(CFLAGS) -o $@ $(firstword $(filter %.cpp %.c, $^) )
|
||||||
|
|
||||||
|
$(MPIOBJ) :
|
||||||
|
$(MPICXX) -c $(CFLAGS) -o $@ $(firstword $(filter %.cpp %.c, $^) )
|
||||||
|
|
||||||
|
$(MPIBIN) :
|
||||||
|
$(MPICXX) $(CFLAGS) $(LDFLAGS) -o $@ $(filter %.cpp %.o %.c, $^)
|
||||||
|
|
||||||
install:
|
install:
|
||||||
cp -f -r $(BIN) $(INSTALL_PATH)
|
cp -f -r $(BIN) $(INSTALL_PATH)
|
||||||
|
|
||||||
|
|||||||
61
src/sync/sync.cpp
Normal file
61
src/sync/sync.cpp
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
#include "./sync.h"
|
||||||
|
#include "../utils/utils.h"
|
||||||
|
#include "mpi.h"
|
||||||
|
|
||||||
|
namespace xgboost {
|
||||||
|
namespace sync {
|
||||||
|
|
||||||
|
// code for reduce handle
|
||||||
|
ReduceHandle::ReduceHandle(void) : handle(NULL) {
|
||||||
|
}
|
||||||
|
ReduceHandle::~ReduceHandle(void) {
|
||||||
|
if (handle != NULL) {
|
||||||
|
MPI::Op *op = reinterpret_cast<MPI::Op*>(handle);
|
||||||
|
op->Free();
|
||||||
|
delete op;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
void ReduceHandle::Init(ReduceFunction redfunc, bool commute) {
|
||||||
|
utils::Assert(handle == NULL, "cannot initialize reduce handle twice");
|
||||||
|
MPI::Op *op = new MPI::Op();
|
||||||
|
MPI::User_function *pf = reinterpret_cast<MPI::User_function*>(redfunc);
|
||||||
|
op->Init(pf, commute);
|
||||||
|
handle = op;
|
||||||
|
}
|
||||||
|
void ReduceHandle::AllReduce(void *sendrecvbuf, size_t n4byte) {
|
||||||
|
utils::Assert(handle != NULL, "must intialize handle to call AllReduce");
|
||||||
|
MPI::Op *op = reinterpret_cast<MPI::Op*>(handle);
|
||||||
|
MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, n4byte, MPI_INT, *op);
|
||||||
|
}
|
||||||
|
|
||||||
|
int GetRank(void) {
|
||||||
|
return MPI::COMM_WORLD.Get_rank();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Init(int argc, char *argv[]) {
|
||||||
|
MPI::Init(argc, argv);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Finalize(void) {
|
||||||
|
MPI::Finalize();
|
||||||
|
}
|
||||||
|
|
||||||
|
void AllReduce_(void *sendrecvbuf, int count, const MPI::Datatype &dtype, ReduceOp op) {
|
||||||
|
switch(op) {
|
||||||
|
case kBitwiseOR: MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, count, dtype, MPI::BOR); return;
|
||||||
|
case kSum: MPI::COMM_WORLD.Allreduce(MPI_IN_PLACE, sendrecvbuf, count, dtype, MPI::SUM); return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template<>
|
||||||
|
void AllReduce<uint32_t>(uint32_t *sendrecvbuf, int count, ReduceOp op) {
|
||||||
|
AllReduce_(sendrecvbuf, count, MPI::UNSIGNED, op);
|
||||||
|
}
|
||||||
|
|
||||||
|
template<>
|
||||||
|
void AllReduce<float>(float *sendrecvbuf, int count, ReduceOp op) {
|
||||||
|
AllReduce_(sendrecvbuf, count, MPI::FLOAT, op);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace sync
|
||||||
|
} // namespace xgboost
|
||||||
@ -5,14 +5,99 @@
|
|||||||
* \brief interface to do synchronization
|
* \brief interface to do synchronization
|
||||||
* \author Tianqi Chen
|
* \author Tianqi Chen
|
||||||
*/
|
*/
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cstring>
|
||||||
|
#include "../utils/utils.h"
|
||||||
|
|
||||||
namespace xgboost {
|
namespace xgboost {
|
||||||
|
/*! \brief syncrhonizer module that minimum wraps MPI */
|
||||||
namespace sync {
|
namespace sync {
|
||||||
/*!
|
/*! \brief reduce operator supported */
|
||||||
* \brief synchronization context interface of xgboost,
|
enum ReduceOp {
|
||||||
* will be provided as a singleton
|
kBitwiseOR,
|
||||||
*/
|
kSum
|
||||||
class IContext {
|
};
|
||||||
|
|
||||||
|
typedef void (ReduceFunction) (const void *src, void *dst, int len);
|
||||||
|
|
||||||
|
/* !\brief handle for customized reducer */
|
||||||
|
class ReduceHandle {
|
||||||
|
public:
|
||||||
|
// constructor
|
||||||
|
ReduceHandle(void);
|
||||||
|
// destructor
|
||||||
|
~ReduceHandle(void);
|
||||||
|
// initialize the reduce function
|
||||||
|
void Init(ReduceFunction redfunc, bool commute = true);
|
||||||
|
/*!
|
||||||
|
* \brief customized in-place all reduce operation
|
||||||
|
* \param sendrecvbuf the in place send-recv buffer
|
||||||
|
* \param n4bytes number of nbytes send through all reduce
|
||||||
|
*/
|
||||||
|
void AllReduce(void *sendrecvbuf, size_t n4bytes);
|
||||||
|
|
||||||
|
private:
|
||||||
|
// handle data field
|
||||||
|
void *handle;
|
||||||
|
};
|
||||||
|
|
||||||
|
/*! \brief get rank of current process */
|
||||||
|
int GetRank(void);
|
||||||
|
/*! \brief intiialize the synchronization module */
|
||||||
|
void Init(int argc, char *argv[]);
|
||||||
|
/*! \brief finalize syncrhonization module */
|
||||||
|
void Finalize(void);
|
||||||
|
/*!
|
||||||
|
* \brief in-place all reduce operation
|
||||||
|
* \param sendrecvbuf the in place send-recv buffer
|
||||||
|
* \param count count of data
|
||||||
|
* \param op reduction function
|
||||||
|
*/
|
||||||
|
template<typename DType>
|
||||||
|
void AllReduce(DType *sendrecvbuf, int count, ReduceOp op);
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief template class to make customized reduce and all reduce easy
|
||||||
|
* Do not use reducer directly in the function you call Finalize, because the destructor can happen after Finalize
|
||||||
|
* \tparam DType data type that to be reduced
|
||||||
|
* DType must be a struct, with no pointer, and contains a function Reduce(const DType &d);
|
||||||
|
*/
|
||||||
|
template<typename DType>
|
||||||
|
class Reducer {
|
||||||
|
public:
|
||||||
|
Reducer(void) {
|
||||||
|
handle.Init(ReduceInner);
|
||||||
|
utils::Assert(sizeof(DType) % sizeof(int) == 0, "struct must be multiple of int");
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief customized in-place all reduce operation
|
||||||
|
* \param sendrecvbuf the in place send-recv buffer
|
||||||
|
* \param bytes number of 4bytes send through all reduce
|
||||||
|
* \param reducer the reducer function
|
||||||
|
*/
|
||||||
|
inline void AllReduce(DType *sendrecvbuf, int count) {
|
||||||
|
handle.AllReduce(sendrecvbuf, count * kUnit);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
// unit size
|
||||||
|
static const size_t kUnit = sizeof(DType) / sizeof(int);
|
||||||
|
// inner implementation of reducer
|
||||||
|
inline static void ReduceInner(const void *src_, void *dst_, int len_) {
|
||||||
|
const int *psrc = reinterpret_cast<const int*>(src_);
|
||||||
|
int *pdst = reinterpret_cast<int*>(dst_);
|
||||||
|
DType tdst, tsrc;
|
||||||
|
utils::Assert(len_ % kUnit == 0, "length not divide by size");
|
||||||
|
for (size_t i = 0; i < len_; i += kUnit) {
|
||||||
|
// use memcpy to avoid alignment issue
|
||||||
|
std::memcpy(&tdst, pdst + i, sizeof(tdst));
|
||||||
|
std::memcpy(&tsrc, psrc + i, sizeof(tsrc));
|
||||||
|
tdst.Reduce(tsrc);
|
||||||
|
std::memcpy(pdst + i, &tdst, sizeof(tdst));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// function handle
|
||||||
|
ReduceHandle handle;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace sync
|
} // namespace sync
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user