Chen Qin 5797dcb64e support bootstrap allreduce/broadcast (#98)
* support run rabit tests as xgboost subproject using xgboost/dmlc-core

* support tracker config set/get

* remove redudant printf

* remove redudant printf

* add c++0x declaration

* log allreduce/broadcast caller, engine should track caller stack for
investigation

* tracker support binary config format

* Revert "tracker support binary config format"

This reverts commit 2a28e5e2b55c200cb621af8d19f17ab1bc62503b.

* remove caller, prototype fetch allreduce/broadcast results from resbuf

* store cached allreduce/broadcast seq_no to tracker

* allow restore all caches from other nodes

* try new rabit collective cache, todo: recv_link seems down

* link up cache restore with main recovery

* cleanup load cache state

* update cache api

* pass test.mk

* have a working tests

* try to unify check into actionsummary

* more logging to debug distributed hist three method issue

* update rabit interface to support caller signature matching

* splite seq_counter from cur_cache_seq to different variables

* still see issue with inf loop

* support debug print caller as well as allreduce op

* cleanup

* remove get/set cache from model_recover, adding recover in
loadcheckpoint

* clarify rabit cache strategy, cache is set only by successful collective
call involving all nodes with unique cache key. if all nodes call
getcache at same time, we keep rabit run collective call. If some nodes
call getcache while others not, we backfill cache from those nodes with
most entries

* revert caller logs

* fix lint error

* fix engine mpi signature

* support getcache by ref

* allow result buffer presiet to filestream

* add loging

* try fix checkpoint failure recovery case

* use int64_t to avoid overflow caused seq fault

* try avoid int overflow

* try fix checkpoint failure recovery case

* try avoid seqno overflow to negative by offseting specifial flag value
adding cache seq no to checkpoint/load checkpoint/check point ack to avoid
confusion from cache recovery

* fix cache seq assert error

* remove loging, handle edge case

* add extensive log to checkpoint state  with different seq no

* fix lint errors

* clean up comments before merge back to master

* add logs to allreduce/broadcast/checkpoint

* use unsinged int 32 and give seq no larger range

* address remove allreduce dropseq code segment

* using caller signature to filter bootstrapallreduces

* remove get/set cache from empty

* apply signature to reducer

* apply signature to broadcast

* add key to broadcat log

* fix broadcast signature

* fix default _line value for non linux system

* adding comments, remove sleep(1)

* fix osx build issue

* try fix mpi

* fix doc

* fix engine_empty api

* logging, adding more logs, restore immutable assertion

* print unsinged int with ud

* fix lint

* rename seqtype to kSeq and KCache indicating it's usage
apply kDiffSeq check to load_cache routine

* comment allreduce/broadcast log

* allow tests run on arm

* enable flag to turn on / off cache

* add log info alert if user choose to enable rabit bootstrap cache

* add rabit_debug setting so user can use config to turn on

* log flags when user turn on rabit_debug

* force rabit restart if tracker assign -1 rank

* use OPENMP to vecotrize reducer

* address comment

* Revert "address comment"

This reverts commit 1dc61f33e7357dad8fa65528abeb81db92c5f9ed.

* fix checkpoint size print 0

* per feedback, remove DISABLEOPEMP, address race condition

* - remove openmp from this pr
- update name from cache to boostrapcache

* add default value of signature macros

* remove openmp from cmake file

* Update src/allreduce_robust.cc

Co-Authored-By: Philip Hyunsu Cho <chohyu01@cs.washington.edu>

* Update src/allreduce_robust.cc

Co-Authored-By: Philip Hyunsu Cho <chohyu01@cs.washington.edu>

* run test with cmake

* remove openmp

* fix cmake based tests

* use cmake test fix darwin .dylib issue

* move around rabit_signature definition due to windows build

* misc, add c++ check in CMakeFile

* per feedback

* resolve CMake file

* update rabit version
2019-08-27 18:12:33 -07:00

377 lines
13 KiB
C++

/*!
* Copyright (c) 2014-2019 by Contributors
* \file rabit-inl.h
* \brief implementation of inline template function for rabit interface
*
* \author Tianqi Chen
*/
#ifndef RABIT_INTERNAL_RABIT_INL_H_
#define RABIT_INTERNAL_RABIT_INL_H_
// use engine for implementation
#include <vector>
#include <string>
#include "./io.h"
#include "./utils.h"
#include "../rabit.h"
namespace rabit {
namespace engine {
namespace mpi {
// template function to translate type to enum indicator
template<typename DType>
inline DataType GetType(void);
template<>
inline DataType GetType<char>(void) {
return kChar;
}
template<>
inline DataType GetType<unsigned char>(void) {
return kUChar;
}
template<>
inline DataType GetType<int>(void) {
return kInt;
}
template<>
inline DataType GetType<unsigned int>(void) { // NOLINT(*)
return kUInt;
}
template<>
inline DataType GetType<long>(void) { // NOLINT(*)
return kLong;
}
template<>
inline DataType GetType<unsigned long>(void) { // NOLINT(*)
return kULong;
}
template<>
inline DataType GetType<float>(void) {
return kFloat;
}
template<>
inline DataType GetType<double>(void) {
return kDouble;
}
template<>
inline DataType GetType<long long>(void) { // NOLINT(*)
return kLongLong;
}
template<>
inline DataType GetType<unsigned long long>(void) { // NOLINT(*)
return kULongLong;
}
} // namespace mpi
} // namespace engine
namespace op {
struct Max {
static const engine::mpi::OpType kType = engine::mpi::kMax;
template<typename DType>
inline static void Reduce(DType &dst, const DType &src) { // NOLINT(*)
if (dst < src) dst = src;
}
};
struct Min {
static const engine::mpi::OpType kType = engine::mpi::kMin;
template<typename DType>
inline static void Reduce(DType &dst, const DType &src) { // NOLINT(*)
if (dst > src) dst = src;
}
};
struct Sum {
static const engine::mpi::OpType kType = engine::mpi::kSum;
template<typename DType>
inline static void Reduce(DType &dst, const DType &src) { // NOLINT(*)
dst += src;
}
};
struct BitOR {
static const engine::mpi::OpType kType = engine::mpi::kBitwiseOR;
template<typename DType>
inline static void Reduce(DType &dst, const DType &src) { // NOLINT(*)
dst |= src;
}
};
template<typename OP, typename DType>
inline void Reducer(const void *src_, void *dst_, int len, const MPI::Datatype &dtype) {
const DType *src = (const DType*)src_;
DType *dst = (DType*)dst_; // NOLINT(*)
for (int i = 0; i < len; ++i) {
OP::Reduce(dst[i], src[i]);
}
}
} // namespace op
// intialize the rabit engine
inline bool Init(int argc, char *argv[]) {
return engine::Init(argc, argv);
}
// finalize the rabit engine
inline bool Finalize(void) {
return engine::Finalize();
}
// get the rank of current process
inline int GetRank(void) {
return engine::GetEngine()->GetRank();
}
// the the size of the world
inline int GetWorldSize(void) {
return engine::GetEngine()->GetWorldSize();
}
// whether rabit is distributed
inline bool IsDistributed(void) {
return engine::GetEngine()->IsDistributed();
}
// get the name of current processor
inline std::string GetProcessorName(void) {
return engine::GetEngine()->GetHost();
}
// broadcast data to all other nodes from root
inline void Broadcast(void *sendrecv_data, size_t size, int root,
bool is_bootstrap,
const char* _file,
const int _line,
const char* _caller) {
engine::GetEngine()->Broadcast(sendrecv_data, size, root,
is_bootstrap, _file, _line, _caller);
}
template<typename DType>
inline void Broadcast(std::vector<DType> *sendrecv_data, int root,
bool is_bootstrap,
const char* _file,
const int _line,
const char* _caller) {
size_t size = sendrecv_data->size();
Broadcast(&size, sizeof(size), root, is_bootstrap, _file, _line, _caller);
if (sendrecv_data->size() != size) {
sendrecv_data->resize(size);
}
if (size != 0) {
Broadcast(&(*sendrecv_data)[0], size * sizeof(DType), root,
is_bootstrap, _file, _line, _caller);
}
}
inline void Broadcast(std::string *sendrecv_data, int root,
bool is_bootstrap,
const char* _file,
const int _line,
const char* _caller) {
size_t size = sendrecv_data->length();
Broadcast(&size, sizeof(size), root, is_bootstrap, _file, _line, _caller);
if (sendrecv_data->length() != size) {
sendrecv_data->resize(size);
}
if (size != 0) {
Broadcast(&(*sendrecv_data)[0], size * sizeof(char), root,
is_bootstrap, _file, _line, _caller);
}
}
// perform inplace Allreduce
template<typename OP, typename DType>
inline void Allreduce(DType *sendrecvbuf, size_t count,
void (*prepare_fun)(void *arg),
void *prepare_arg,
bool is_bootstrap,
const char* _file,
const int _line,
const char* _caller) {
engine::Allreduce_(sendrecvbuf, sizeof(DType), count, op::Reducer<OP, DType>,
engine::mpi::GetType<DType>(), OP::kType, prepare_fun, prepare_arg,
is_bootstrap, _file, _line, _caller);
}
// C++11 support for lambda prepare function
#if DMLC_USE_CXX11
inline void InvokeLambda_(void *fun) {
(*static_cast<std::function<void()>*>(fun))();
}
template<typename OP, typename DType>
inline void Allreduce(DType *sendrecvbuf, size_t count,
std::function<void()> prepare_fun,
bool is_bootstrap,
const char* _file,
const int _line,
const char* _caller) {
engine::Allreduce_(sendrecvbuf, sizeof(DType), count, op::Reducer<OP, DType>,
engine::mpi::GetType<DType>(), OP::kType, InvokeLambda_, &prepare_fun,
is_bootstrap, _file, _line, _caller);
}
#endif // C++11
// print message to the tracker
inline void TrackerPrint(const std::string &msg) {
engine::GetEngine()->TrackerPrint(msg);
}
#ifndef RABIT_STRICT_CXX98_
inline void TrackerPrintf(const char *fmt, ...) {
const int kPrintBuffer = 1 << 10;
std::string msg(kPrintBuffer, '\0');
va_list args;
va_start(args, fmt);
vsnprintf(&msg[0], kPrintBuffer, fmt, args);
va_end(args);
msg.resize(strlen(msg.c_str()));
TrackerPrint(msg);
}
#endif // RABIT_STRICT_CXX98_
// load latest check point
inline int LoadCheckPoint(Serializable *global_model,
Serializable *local_model) {
return engine::GetEngine()->LoadCheckPoint(global_model, local_model);
}
// checkpoint the model, meaning we finished a stage of execution
inline void CheckPoint(const Serializable *global_model,
const Serializable *local_model) {
engine::GetEngine()->CheckPoint(global_model, local_model);
}
// lazy checkpoint the model, only remember the pointer to global_model
inline void LazyCheckPoint(const Serializable *global_model) {
engine::GetEngine()->LazyCheckPoint(global_model);
}
// return the version number of currently stored model
inline int VersionNumber(void) {
return engine::GetEngine()->VersionNumber();
}
// ---------------------------------
// Code to handle customized Reduce
// ---------------------------------
// function to perform reduction for Reducer
template<typename DType, void (*freduce)(DType &dst, const DType &src)>
inline void ReducerSafe_(const void *src_, void *dst_, int len_, const MPI::Datatype &dtype) {
const size_t kUnit = sizeof(DType);
const char *psrc = reinterpret_cast<const char*>(src_);
char *pdst = reinterpret_cast<char*>(dst_);
for (int i = 0; i < len_; ++i) {
DType tdst, tsrc;
// use memcpy to avoid alignment issue
std::memcpy(&tdst, pdst + i * kUnit, sizeof(tdst));
std::memcpy(&tsrc, psrc + i * kUnit, sizeof(tsrc));
freduce(tdst, tsrc);
std::memcpy(pdst + i * kUnit, &tdst, sizeof(tdst));
}
}
// function to perform reduction for Reducer
template<typename DType, void (*freduce)(DType &dst, const DType &src)> // NOLINT(*)
inline void ReducerAlign_(const void *src_, void *dst_,
int len_, const MPI::Datatype &dtype) {
const DType *psrc = reinterpret_cast<const DType*>(src_);
DType *pdst = reinterpret_cast<DType*>(dst_);
for (int i = 0; i < len_; ++i) {
freduce(pdst[i], psrc[i]);
}
}
template<typename DType, void (*freduce)(DType &dst, const DType &src)> // NOLINT(*)
inline Reducer<DType, freduce>::Reducer(void) {
// it is safe to directly use handle for aligned data types
if (sizeof(DType) == 8 || sizeof(DType) == 4 || sizeof(DType) == 1) {
this->handle_.Init(ReducerAlign_<DType, freduce>, sizeof(DType));
} else {
this->handle_.Init(ReducerSafe_<DType, freduce>, sizeof(DType));
}
}
template<typename DType, void (*freduce)(DType &dst, const DType &src)> // NOLINT(*)
inline void Reducer<DType, freduce>::Allreduce(DType *sendrecvbuf, size_t count,
void (*prepare_fun)(void *arg),
void *prepare_arg,
bool is_bootstrap,
const char* _file,
const int _line,
const char* _caller) {
handle_.Allreduce(sendrecvbuf, sizeof(DType), count, prepare_fun,
prepare_arg, is_bootstrap, _file, _line, _caller);
}
// function to perform reduction for SerializeReducer
template<typename DType>
inline void SerializeReducerFunc_(const void *src_, void *dst_,
int len_, const MPI::Datatype &dtype) {
int nbytes = engine::ReduceHandle::TypeSize(dtype);
// temp space
for (int i = 0; i < len_; ++i) {
DType tsrc, tdst;
utils::MemoryFixSizeBuffer fsrc((char*)(src_) + i * nbytes, nbytes); // NOLINT(*)
utils::MemoryFixSizeBuffer fdst((char*)(dst_) + i * nbytes, nbytes); // NOLINT(*)
tsrc.Load(fsrc);
tdst.Load(fdst);
// govern const check
tdst.Reduce(static_cast<const DType &>(tsrc), nbytes);
fdst.Seek(0);
tdst.Save(fdst);
}
}
template<typename DType>
inline SerializeReducer<DType>::SerializeReducer(void) {
handle_.Init(SerializeReducerFunc_<DType>, sizeof(DType));
}
// closure to call Allreduce
template<typename DType>
struct SerializeReduceClosure {
DType *sendrecvobj;
size_t max_nbyte, count;
void (*prepare_fun)(void *arg);
void *prepare_arg;
std::string *p_buffer;
// invoke the closure
inline void Run(void) {
if (prepare_fun != NULL) prepare_fun(prepare_arg);
for (size_t i = 0; i < count; ++i) {
utils::MemoryFixSizeBuffer fs(BeginPtr(*p_buffer) + i * max_nbyte, max_nbyte);
sendrecvobj[i].Save(fs);
}
}
inline static void Invoke(void *c) {
static_cast<SerializeReduceClosure<DType>*>(c)->Run();
}
};
template<typename DType>
inline void SerializeReducer<DType>::Allreduce(DType *sendrecvobj,
size_t max_nbyte, size_t count,
void (*prepare_fun)(void *arg),
void *prepare_arg,
bool is_bootstrap,
const char* _file,
const int _line,
const char* _caller) {
buffer_.resize(max_nbyte * count);
// setup closure
SerializeReduceClosure<DType> c;
c.sendrecvobj = sendrecvobj; c.max_nbyte = max_nbyte; c.count = count;
c.prepare_fun = prepare_fun; c.prepare_arg = prepare_arg; c.p_buffer = &buffer_;
// invoke here
handle_.Allreduce(BeginPtr(buffer_), max_nbyte, count,
SerializeReduceClosure<DType>::Invoke, &c,
is_bootstrap, _file, _line, _caller);
for (size_t i = 0; i < count; ++i) {
utils::MemoryFixSizeBuffer fs(BeginPtr(buffer_) + i * max_nbyte, max_nbyte);
sendrecvobj[i].Load(fs);
}
}
#if DMLC_USE_CXX11
template<typename DType, void (*freduce)(DType &dst, const DType &src)> // NOLINT(*)g
inline void Reducer<DType, freduce>::Allreduce(DType *sendrecvbuf, size_t count,
std::function<void()> prepare_fun,
bool is_bootstrap,
const char* _file,
const int _line,
const char* _caller) {
this->Allreduce(sendrecvbuf, count, InvokeLambda_, &prepare_fun,
is_bootstrap, _file, _line, _caller);
}
template<typename DType>
inline void SerializeReducer<DType>::Allreduce(DType *sendrecvobj,
size_t max_nbytes, size_t count,
std::function<void()> prepare_fun,
bool is_bootstrap,
const char* _file,
const int _line,
const char* _caller) {
this->Allreduce(sendrecvobj, max_nbytes, count, InvokeLambda_, &prepare_fun,
is_bootstrap, _file, _line, _caller);
}
#endif // DMLC_USE_CXX11
} // namespace rabit
#endif // RABIT_INTERNAL_RABIT_INL_H_