xgboost/include/rabit/rabit-inl.h
tqchen a16289b204 Squashed 'subtree/rabit/' changes from fa99857..e81a11d
e81a11d Merge pull request #25 from daiyl0320/master
35c3b37 add retry mechanism to ConnectTracker and modify Listen backlog to 128 in rabit_traker.py
c71ed6f try deply doxygen
62e5647 try deply doxygen
732f1c6 try
2fa6e02 ok
0537665 minor
7b59dcb minor
5934950 new doc
f538187 ok
44b6049 new doc
387339b add more
9d4397a chg
2879a48 chg
30e3110 ok
9ff0301 add link translation
6b629c2 k
32e1955 ok
8f4839d fix
93137b2 ok
7eeeb79 reload recommonmark
a8f00cc minor
19b0f01 ok
dd01184 minor
c1cdc19 minor
fcf0f43 try rst
cbc21ae try
62ddfa7 tiny
aefc05c final change
2aee9b4 minor
fe4e7c2 ok
8001983 change to subtitle
5ca33e4 ok
88f7d24 update guide
29d43ab add code
fe8bb3b minor hack for readthedocs
229c71d Merge branch 'master' of ssh://github.com/dmlc/rabit
7424218 ok
d1d45bb Update README.md
1e8813f Update README.md
1ccc990 Update README.md
0323e06 remove readme
679a835 remove theme
7ea5b7c remove numpydoc to napoleon
b73e2be Merge branch 'master' of ssh://github.com/dmlc/rabit
1742283 ok
1838e25 Update python-requirements.txt
bc4e957 ok
fba6fc2 ok
0251101 ok
d50b905 ok
d4f2509 ok
cdf401a ok
fef0ef2 new doc
cef360d ok
c125d2a ok
270a49e add requirments
744f901 get the basic doc
1cb5cad Merge branch 'master' of ssh://github.com/dmlc/rabit
8cc07ba minor
d74f126 Update .travis.yml
52b3dcd Update .travis.yml
099581b Update .travis.yml
1258046 Update .travis.yml
7addac9 Update Makefile
0ea7adf Update .travis.yml
f858856 Update travis_script.sh
d8eac4a Update README.md
3cc49ad lint and travis
ceedf4e fix
fd8920c fix win32
8bbed35 modify
9520b90 Merge pull request #14 from dmlc/hjk41
df14bb1 fix type
f441dc7 replace tab with blankspace
2467942 remove unnecessary include
181ef47 defined long long and ulonglong
1582180 use int32_t to define int and int64_t to define long. in VC long is 32bit
e0b7da0 fix

git-subtree-dir: subtree/rabit
git-subtree-split: e81a11dd7ee3cff87a38a42901315821df018bae
2015-10-20 19:37:47 -07:00

329 lines
11 KiB
C++

/*!
* Copyright by Contributors
* \file rabit-inl.h
* \brief implementation of inline template function for rabit interface
*
* \author Tianqi Chen
*/
#ifndef RABIT_RABIT_INL_H_
#define RABIT_RABIT_INL_H_
// use engine for implementation
#include <vector>
#include <string>
#include "./io.h"
#include "./utils.h"
#include "../rabit.h"
namespace rabit {
namespace engine {
namespace mpi {
/*!
 * \brief translate a C++ primitive type into its DataType enum indicator
 *  The primary template is only declared here; each supported type is
 *  provided through one of the explicit specializations that follow.
 * \tparam DType the primitive type to translate
 * \return the enum indicator associated with DType
 */
template<typename DType>
inline DataType GetType();
// char -> kChar
template<>
inline DataType GetType<char>() {
  return kChar;
}
// unsigned char -> kUChar
template<>
inline DataType GetType<unsigned char>() {
  return kUChar;
}
// int -> kInt
template<>
inline DataType GetType<int>() {
  return kInt;
}
// unsigned int -> kUInt
template<>
inline DataType GetType<unsigned int>() {  // NOLINT(*)
  return kUInt;
}
// long -> kLong
template<>
inline DataType GetType<long>() {  // NOLINT(*)
  return kLong;
}
// unsigned long -> kULong
template<>
inline DataType GetType<unsigned long>() {  // NOLINT(*)
  return kULong;
}
// float -> kFloat
template<>
inline DataType GetType<float>() {
  return kFloat;
}
// double -> kDouble
template<>
inline DataType GetType<double>() {
  return kDouble;
}
// long long -> kLongLong
template<>
inline DataType GetType<long long>() {  // NOLINT(*)
  return kLongLong;
}
// unsigned long long -> kULongLong
template<>
inline DataType GetType<unsigned long long>() {  // NOLINT(*)
  return kULongLong;
}
} // namespace mpi
} // namespace engine
namespace op {
/*! \brief reduction operator that keeps the larger of two values */
struct Max {
  /*! \brief operator tag taken from engine::mpi */
  static const engine::mpi::OpType kType = engine::mpi::kMax;
  /*!
   * \brief replace dst by src whenever dst compares less than src
   * \param dst accumulated value, updated in place
   * \param src incoming value
   */
  template<typename DType>
  static inline void Reduce(DType &dst, const DType &src) {  // NOLINT(*)
    if (dst < src) {
      dst = src;
    }
  }
};
/*! \brief reduction operator that keeps the smaller of two values */
struct Min {
  /*! \brief operator tag taken from engine::mpi */
  static const engine::mpi::OpType kType = engine::mpi::kMin;
  /*!
   * \brief replace dst by src whenever dst compares greater than src
   * \param dst accumulated value, updated in place
   * \param src incoming value
   */
  template<typename DType>
  static inline void Reduce(DType &dst, const DType &src) {  // NOLINT(*)
    if (dst > src) {
      dst = src;
    }
  }
};
/*! \brief reduction operator that accumulates values with += */
struct Sum {
  /*! \brief operator tag taken from engine::mpi */
  static const engine::mpi::OpType kType = engine::mpi::kSum;
  /*!
   * \brief add src onto dst
   * \param dst accumulated value, updated in place
   * \param src incoming value
   */
  template<typename DType>
  static inline void Reduce(DType &dst, const DType &src) {  // NOLINT(*)
    dst += src;
  }
};
/*! \brief reduction operator that combines values with |= */
struct BitOR {
  /*! \brief operator tag taken from engine::mpi */
  static const engine::mpi::OpType kType = engine::mpi::kBitwiseOR;
  /*!
   * \brief bitwise-or src into dst
   * \param dst accumulated value, updated in place
   * \param src incoming value
   */
  template<typename DType>
  static inline void Reduce(DType &dst, const DType &src) {  // NOLINT(*)
    dst |= src;
  }
};
/*!
 * \brief fold src_ into dst_ element by element using OP::Reduce
 * \param src_ input buffer, interpreted as an array of DType
 * \param dst_ buffer that is read and updated in place, interpreted as an array of DType
 * \param len number of elements visited in each buffer
 * \param dtype not used inside this function
 * \tparam OP operator type exposing a static Reduce(dst, src)
 * \tparam DType element type of both buffers
 */
template<typename OP, typename DType>
inline void Reducer(const void *src_, void *dst_, int len,
                    const MPI::Datatype &dtype) {
  const DType *in = static_cast<const DType*>(src_);
  DType *out = static_cast<DType*>(dst_);
  // walk both arrays in lock step, one element per iteration
  for (int remaining = len; remaining > 0; --remaining, ++in, ++out) {
    OP::Reduce(*out, *in);
  }
}
} // namespace op
/*!
 * \brief initialize the rabit engine
 * \param argc number of arguments, handed to engine::Init unchanged
 * \param argv argument array, handed to engine::Init unchanged
 */
inline void Init(int argc, char *argv[]) {
  engine::Init(argc, argv);
}
/*! \brief finalize the rabit engine by calling engine::Finalize */
inline void Finalize() {
  engine::Finalize();
}
/*!
 * \brief get the rank of the current process
 * \return the value reported by the engine's GetRank()
 */
inline int GetRank() {
  const int rank = engine::GetEngine()->GetRank();
  return rank;
}
/*!
 * \brief get the size of the world
 * \return the value reported by the engine's GetWorldSize()
 */
inline int GetWorldSize() {
  const int world_size = engine::GetEngine()->GetWorldSize();
  return world_size;
}
/*!
 * \brief whether rabit is distributed
 * \return the value reported by the engine's IsDistributed()
 */
inline bool IsDistributed() {
  const bool distributed = engine::GetEngine()->IsDistributed();
  return distributed;
}
/*!
 * \brief get the name of the current processor
 * \return the string returned by the engine's GetHost()
 */
inline std::string GetProcessorName() {
  std::string host = engine::GetEngine()->GetHost();
  return host;
}
/*!
 * \brief broadcast a raw memory region to all other nodes from root
 * \param sendrecv_data pointer to the memory region handed to the engine
 * \param size size of the region in bytes
 * \param root rank of the node the data is broadcast from
 */
inline void Broadcast(void *sendrecv_data, size_t size, int root) {
  engine::GetEngine()->Broadcast(sendrecv_data, size, root);
}
/*!
 * \brief broadcast a vector in two steps: first its element count, then its content
 *  After the count has been broadcast, the local vector is resized when its
 *  size differs from that count; the content is sent as raw bytes.
 * \param sendrecv_data the vector to send or to receive into
 * \param root rank of the node the data is broadcast from
 * \tparam DType element type; elements are transferred as size * sizeof(DType) bytes
 */
template<typename DType>
inline void Broadcast(std::vector<DType> *sendrecv_data, int root) {
  std::vector<DType> &data = *sendrecv_data;
  // step 1: exchange the number of elements
  size_t num_elem = data.size();
  Broadcast(&num_elem, sizeof(num_elem), root);
  if (data.size() != num_elem) {
    data.resize(num_elem);
  }
  // step 2: exchange the content, nothing to do for an empty vector
  if (num_elem == 0) return;
  Broadcast(&data[0], num_elem * sizeof(DType), root);
}
/*!
 * \brief broadcast a string in two steps: first its length, then its characters
 *  After the length has been broadcast, the local string is resized when its
 *  length differs from that value.
 * \param sendrecv_data the string to send or to receive into
 * \param root rank of the node the data is broadcast from
 */
inline void Broadcast(std::string *sendrecv_data, int root) {
  std::string &text = *sendrecv_data;
  // step 1: exchange the number of characters
  size_t num_char = text.length();
  Broadcast(&num_char, sizeof(num_char), root);
  if (text.length() != num_char) {
    text.resize(num_char);
  }
  // step 2: exchange the characters, nothing to do for an empty string
  if (num_char == 0) return;
  Broadcast(&text[0], num_char * sizeof(char), root);
}
/*!
 * \brief perform in-place Allreduce through engine::Allreduce_
 * \param sendrecvbuf buffer handed to the engine for both sending and receiving
 * \param count number of elements in sendrecvbuf
 * \param prepare_fun callback forwarded to the engine unchanged
 * \param prepare_arg argument forwarded together with prepare_fun
 * \tparam OP reduction operator; supplies op::Reducer<OP, DType> and OP::kType
 * \tparam DType element type; supplies sizeof(DType) and the enum from GetType<DType>()
 */
template<typename OP, typename DType>
inline void Allreduce(DType *sendrecvbuf, size_t count,
                      void (*prepare_fun)(void *arg),
                      void *prepare_arg) {
  engine::Allreduce_(sendrecvbuf, sizeof(DType), count,
                     op::Reducer<OP, DType>,
                     engine::mpi::GetType<DType>(),
                     OP::kType,
                     prepare_fun, prepare_arg);
}
// C++11 support for lambda prepare function
#if DMLC_USE_CXX11
/*!
 * \brief trampoline that runs a std::function<void()> passed through a void pointer
 * \param fun address of a std::function<void()> object; it is called once
 */
inline void InvokeLambda_(void *fun) {
  std::function<void()> *callback = static_cast<std::function<void()>*>(fun);
  (*callback)();
}
/*!
 * \brief perform in-place Allreduce with a std::function as prepare callback
 *  The callback is routed through InvokeLambda_, with the address of
 *  prepare_fun as its opaque argument.
 * \param sendrecvbuf buffer handed to the engine for both sending and receiving
 * \param count number of elements in sendrecvbuf
 * \param prepare_fun callable wrapped for the engine
 * \tparam OP reduction operator; supplies op::Reducer<OP, DType> and OP::kType
 * \tparam DType element type; supplies sizeof(DType) and the enum from GetType<DType>()
 */
template<typename OP, typename DType>
inline void Allreduce(DType *sendrecvbuf, size_t count,
                      std::function<void()> prepare_fun) {
  engine::Allreduce_(sendrecvbuf, sizeof(DType), count,
                     op::Reducer<OP, DType>,
                     engine::mpi::GetType<DType>(),
                     OP::kType,
                     InvokeLambda_, &prepare_fun);
}
#endif // C++11
/*!
 * \brief print a message to the tracker
 * \param msg the message, handed to the engine's TrackerPrint unchanged
 */
inline void TrackerPrint(const std::string &msg) {
  engine::GetEngine()->TrackerPrint(msg);
}
#ifndef RABIT_STRICT_CXX98_
/*!
 * \brief printf-style variant of TrackerPrint
 *  The message is formatted into a fixed buffer of 1 << 10 bytes, cut at
 *  the first '\0' found in that buffer, and then passed to TrackerPrint.
 * \param fmt printf-style format string, followed by its arguments
 */
inline void TrackerPrintf(const char *fmt, ...) {
  const int kPrintBuffer = 1 << 10;
  // zero-filled so that the length scan below always finds a terminator
  std::string text(kPrintBuffer, '\0');
  va_list ap;
  va_start(ap, fmt);
  vsnprintf(&text[0], kPrintBuffer, fmt, ap);
  va_end(ap);
  // shrink to the part that was actually written
  text.resize(strlen(text.c_str()));
  TrackerPrint(text);
}
#endif
/*!
 * \brief load the latest check point
 * \param global_model forwarded to the engine's LoadCheckPoint
 * \param local_model forwarded to the engine's LoadCheckPoint
 * \return the value returned by the engine's LoadCheckPoint
 */
inline int LoadCheckPoint(Serializable *global_model,
                          Serializable *local_model) {
  const int ret = engine::GetEngine()->LoadCheckPoint(global_model, local_model);
  return ret;
}
/*!
 * \brief checkpoint the model, meaning a stage of execution has finished
 * \param global_model forwarded to the engine's CheckPoint
 * \param local_model forwarded to the engine's CheckPoint
 */
inline void CheckPoint(const Serializable *global_model,
                       const Serializable *local_model) {
  engine::GetEngine()->CheckPoint(global_model, local_model);
}
/*!
 * \brief lazy checkpoint of the model, only remembers the pointer to global_model
 * \param global_model forwarded to the engine's LazyCheckPoint
 */
inline void LazyCheckPoint(const Serializable *global_model) {
  engine::GetEngine()->LazyCheckPoint(global_model);
}
/*!
 * \brief version number of the currently stored model
 * \return the value reported by the engine's VersionNumber()
 */
inline int VersionNumber() {
  const int version = engine::GetEngine()->VersionNumber();
  return version;
}
// ---------------------------------
// Code to handle customized Reduce
// ---------------------------------
/*!
 * \brief reduction function for Reducer that never dereferences the buffers as DType
 *  Each element is copied into a local DType with memcpy, reduced there,
 *  and the result is copied back, to avoid alignment issues.
 * \param src_ input buffer holding len_ elements of sizeof(DType) bytes each
 * \param dst_ buffer of the same layout that is read and updated in place
 * \param len_ number of elements
 * \param dtype not used inside this function
 * \tparam DType element type
 * \tparam freduce function applied as freduce(dst, src) on every element pair
 */
template<typename DType, void (*freduce)(DType &dst, const DType &src)>  // NOLINT(*)
inline void ReducerSafe_(const void *src_, void *dst_, int len_,
                         const MPI::Datatype &dtype) {
  const size_t kUnit = sizeof(DType);
  const char *src_bytes = static_cast<const char*>(src_);
  char *dst_bytes = static_cast<char*>(dst_);
  // local copies that the reduction operates on
  DType acc, operand;
  for (int i = 0; i < len_; ++i) {
    const size_t offset = i * kUnit;
    std::memcpy(&acc, dst_bytes + offset, sizeof(acc));
    std::memcpy(&operand, src_bytes + offset, sizeof(operand));
    freduce(acc, operand);
    // write the reduced element back to its place in dst_
    std::memcpy(dst_bytes + offset, &acc, sizeof(acc));
  }
}
/*!
 * \brief reduction function for Reducer that accesses the buffers directly as DType arrays
 * \param src_ input buffer, interpreted as an array of DType
 * \param dst_ buffer that is read and updated in place, interpreted as an array of DType
 * \param len_ number of elements
 * \param dtype not used inside this function
 * \tparam DType element type
 * \tparam freduce function applied as freduce(dst, src) on every element pair
 */
template<typename DType, void (*freduce)(DType &dst, const DType &src)>  // NOLINT(*)
inline void ReducerAlign_(const void *src_, void *dst_,
                          int len_, const MPI::Datatype &dtype) {
  const DType *in = static_cast<const DType*>(src_);
  DType *out = static_cast<DType*>(dst_);
  // walk both arrays in lock step, one element per iteration
  for (int remaining = len_; remaining > 0; --remaining, ++in, ++out) {
    freduce(*out, *in);
  }
}
/*!
 * \brief constructor, registers the reduction function on the handle
 *  ReducerAlign_ is registered when sizeof(DType) is 8, 4 or 1, which this
 *  file treats as safe for direct access; every other size gets ReducerSafe_.
 */
template<typename DType, void (*freduce)(DType &dst, const DType &src)>  // NOLINT(*)
inline Reducer<DType, freduce>::Reducer() {
  const bool direct_access_ok =
      sizeof(DType) == 8 || sizeof(DType) == 4 || sizeof(DType) == 1;
  if (!direct_access_ok) {
    this->handle_.Init(ReducerSafe_<DType, freduce>, sizeof(DType));
  } else {
    this->handle_.Init(ReducerAlign_<DType, freduce>, sizeof(DType));
  }
}
/*!
 * \brief run Allreduce on the handle with sizeof(DType) as the element size
 * \param sendrecvbuf buffer handed to the handle for both sending and receiving
 * \param count number of elements in sendrecvbuf
 * \param prepare_fun callback forwarded to the handle unchanged
 * \param prepare_arg argument forwarded together with prepare_fun
 */
template<typename DType, void (*freduce)(DType &dst, const DType &src)>  // NOLINT(*)
inline void Reducer<DType, freduce>::Allreduce(DType *sendrecvbuf, size_t count,
                                               void (*prepare_fun)(void *arg),
                                               void *prepare_arg) {
  const size_t type_nbytes = sizeof(DType);
  handle_.Allreduce(sendrecvbuf, type_nbytes, count, prepare_fun, prepare_arg);
}
/*!
 * \brief reduction function for SerializeReducer
 *  For every slot, both operands are loaded from their buffers, reduced
 *  with DType::Reduce, and the result is saved back into the dst_ slot.
 * \param src_ input buffer made of len_ slots
 * \param dst_ buffer made of len_ slots that is read and overwritten
 * \param len_ number of slots
 * \param dtype passed to engine::ReduceHandle::TypeSize to obtain the slot size in bytes
 * \tparam DType object type providing Load, Save and Reduce
 */
template<typename DType>
inline void SerializeReducerFunc_(const void *src_, void *dst_,
                                  int len_, const MPI::Datatype &dtype) {
  int nbytes = engine::ReduceHandle::TypeSize(dtype);
  // the buffer wrapper is constructed from non-const pointers, so constness of src_ is dropped
  char *src_base = const_cast<char*>(static_cast<const char*>(src_));
  char *dst_base = static_cast<char*>(dst_);
  // temp space
  DType incoming, merged;
  for (int i = 0; i < len_; ++i) {
    utils::MemoryFixSizeBuffer src_stream(src_base + i * nbytes, nbytes);
    utils::MemoryFixSizeBuffer dst_stream(dst_base + i * nbytes, nbytes);
    incoming.Load(src_stream);
    merged.Load(dst_stream);
    // hand the source over as const reference
    const DType &incoming_ref = incoming;
    merged.Reduce(incoming_ref, nbytes);
    // rewind so the result is written from the start of the slot
    dst_stream.Seek(0);
    merged.Save(dst_stream);
  }
}
/*!
 * \brief constructor, registers SerializeReducerFunc_<DType> on the handle
 *  together with sizeof(DType)
 */
template<typename DType>
inline SerializeReducer<DType>::SerializeReducer() {
  const size_t type_nbytes = sizeof(DType);
  handle_.Init(SerializeReducerFunc_<DType>, type_nbytes);
}
// closure to call Allreduce
template<typename DType>
struct SerializeReduceClosure {
DType *sendrecvobj;
size_t max_nbyte, count;
void (*prepare_fun)(void *arg);
void *prepare_arg;
std::string *p_buffer;
// invoke the closure
inline void Run(void) {
if (prepare_fun != NULL) prepare_fun(prepare_arg);
for (size_t i = 0; i < count; ++i) {
utils::MemoryFixSizeBuffer fs(BeginPtr(*p_buffer) + i * max_nbyte, max_nbyte);
sendrecvobj[i].Save(fs);
}
}
inline static void Invoke(void *c) {
static_cast<SerializeReduceClosure<DType>*>(c)->Run();
}
};
/*!
 * \brief Allreduce over serializable objects using the internal byte buffer
 *  The buffer is resized to max_nbyte * count bytes. Saving the objects into
 *  it is delegated to a SerializeReduceClosure passed to the handle as the
 *  prepare callback; afterwards every object is loaded back from its slot.
 * \param sendrecvobj array of objects, overwritten by Load after the reduction
 * \param max_nbyte size in bytes of the slot reserved for one object
 * \param count number of objects in sendrecvobj
 * \param prepare_fun optional function the closure calls before saving the objects
 * \param prepare_arg argument passed to prepare_fun
 */
template<typename DType>
inline void SerializeReducer<DType>::Allreduce(DType *sendrecvobj,
                                               size_t max_nbyte, size_t count,
                                               void (*prepare_fun)(void *arg),
                                               void *prepare_arg) {
  buffer_.resize(max_nbyte * count);
  // setup closure
  SerializeReduceClosure<DType> closure;
  closure.sendrecvobj = sendrecvobj;
  closure.max_nbyte = max_nbyte;
  closure.count = count;
  closure.prepare_fun = prepare_fun;
  closure.prepare_arg = prepare_arg;
  closure.p_buffer = &buffer_;
  // invoke here
  handle_.Allreduce(BeginPtr(buffer_), max_nbyte, count,
                    SerializeReduceClosure<DType>::Invoke, &closure);
  // read every object back from its slot
  for (size_t idx = 0; idx < count; ++idx) {
    utils::MemoryFixSizeBuffer slot(BeginPtr(buffer_) + idx * max_nbyte, max_nbyte);
    sendrecvobj[idx].Load(slot);
  }
}
#if DMLC_USE_CXX11
/*!
 * \brief Allreduce with a std::function as prepare callback
 *  Delegates to the function-pointer overload, using InvokeLambda_ as the
 *  callback and the address of prepare_fun as its argument.
 * \param sendrecvbuf buffer forwarded to the function-pointer overload
 * \param count number of elements in sendrecvbuf
 * \param prepare_fun callable wrapped for the function-pointer overload
 */
template<typename DType, void (*freduce)(DType &dst, const DType &src)>  // NOLINT(*)
inline void Reducer<DType, freduce>::Allreduce(DType *sendrecvbuf, size_t count,
                                               std::function<void()> prepare_fun) {
  void *callback_arg = &prepare_fun;
  this->Allreduce(sendrecvbuf, count, InvokeLambda_, callback_arg);
}
/*!
 * \brief Allreduce over serializable objects with a std::function as prepare callback
 *  Delegates to the function-pointer overload, using InvokeLambda_ as the
 *  callback and the address of prepare_fun as its argument.
 * \param sendrecvobj array of objects forwarded to the function-pointer overload
 * \param max_nbytes size in bytes of the slot reserved for one object
 * \param count number of objects in sendrecvobj
 * \param prepare_fun callable wrapped for the function-pointer overload
 */
template<typename DType>
inline void SerializeReducer<DType>::Allreduce(DType *sendrecvobj,
                                               size_t max_nbytes, size_t count,
                                               std::function<void()> prepare_fun) {
  void *callback_arg = &prepare_fun;
  this->Allreduce(sendrecvobj, max_nbytes, count, InvokeLambda_, callback_arg);
}
#endif
} // namespace rabit
#endif // RABIT_RABIT_INL_H_