diff --git a/Makefile b/Makefile index c5d052b7c..da9bb22fb 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,8 @@ export CC = gcc export CXX = g++ export MPICXX = mpicxx export LDFLAGS= -Llib -export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC -Iinclude +export WARNFLAGS= -Wall -Wextra -Wno-unused-parameter -Wno-unknown-pragmas -pedantic +export CFLAGS = -O3 -msse2 -fPIC -Iinclude $(WARNFLAGS) # build path BPATH=. @@ -15,7 +16,7 @@ ALIB= lib/librabit.a lib/librabit_mpi.a lib/librabit_empty.a lib/librabit_mock.a HEADERS=src/*.h include/*.h include/rabit/*.h .PHONY: clean all install mpi python -all: lib/librabit.a lib/librabit_mock.a $(SLIB) +all: lib/librabit.a lib/librabit_mock.a wrapper/librabit_wrapper.so wrapper/librabit_wrapper_mock.so mpi: lib/librabit_mpi.a wrapper/librabit_wrapper_mpi.so python: wrapper/librabit_wrapper.so wrapper/librabit_wrapper_mock.so diff --git a/include/rabit/engine.h b/include/rabit/engine.h index fbbdaa8f0..dd85b508b 100644 --- a/include/rabit/engine.h +++ b/include/rabit/engine.h @@ -242,8 +242,10 @@ class ReduceHandle { static int TypeSize(const MPI::Datatype &dtype); protected: - // handle data field + // handle function field void *handle_; + // reduce function of the reducer + IEngine::ReduceFunction *redfunc_; // handle to the type field void *htype_; // the created type in 4 bytes diff --git a/include/rabit/rabit-inl.h b/include/rabit/rabit-inl.h index 4ee1a42b5..4ffd812e7 100644 --- a/include/rabit/rabit-inl.h +++ b/include/rabit/rabit-inl.h @@ -201,7 +201,7 @@ inline void ReducerFunc_(const void *src_, void *dst_, int len_, const MPI::Data const char *psrc = reinterpret_cast(src_); char *pdst = reinterpret_cast(dst_); DType tdst, tsrc; - for (size_t i = 0; i < len_; ++i) { + for (int i = 0; i < len_; ++i) { // use memcpy to avoid alignment issue std::memcpy(&tdst, pdst + i * kUnit, sizeof(tdst)); std::memcpy(&tsrc, psrc + i * kUnit, sizeof(tsrc)); diff --git a/src/allreduce_base.cc b/src/allreduce_base.cc index 590041f8d..b905c2217 100644 --- a/src/allreduce_base.cc +++ b/src/allreduce_base.cc @@ -27,6 +27,7 @@ AllreduceBase::AllreduceBase(void) { hadoop_mode = 0; version_number = 0; task_id = "NULL"; + err_link = NULL; this->SetParam("rabit_reduce_buffer", "256MB"); } diff --git a/src/allreduce_base.h b/src/allreduce_base.h index 78e69bd99..b1cc71ef1 100644 --- a/src/allreduce_base.h +++ b/src/allreduce_base.h @@ -249,7 +249,9 @@ class AllreduceBase : public IEngine { // buffer size, in bytes size_t buffer_size; // constructor - LinkRecord(void) {} + LinkRecord(void) + : buffer_head(NULL), buffer_size(0) { + } // initialize buffer inline void InitBuffer(size_t type_nbytes, size_t count, size_t reduce_buffer_size) { @@ -276,6 +278,7 @@ class AllreduceBase : public IEngine { * \return the type of reading */ inline ReturnType ReadToRingBuffer(size_t protect_start) { + utils::Assert(buffer_head != NULL, "ReadToRingBuffer: buffer not allocated"); size_t ngap = size_read - protect_start; utils::Assert(ngap <= buffer_size, "Allreduce: boundary check"); size_t offset = size_read % buffer_size; diff --git a/src/allreduce_robust.cc b/src/allreduce_robust.cc index 341ea558f..38cadbdb2 100644 --- a/src/allreduce_robust.cc +++ b/src/allreduce_robust.cc @@ -27,6 +27,7 @@ AllreduceRobust::AllreduceRobust(void) { result_buffer_round = 1; global_lazycheck = NULL; use_local_model = -1; + recover_counter = 0; } void AllreduceRobust::Init(void) { AllreduceBase::Init(); @@ -421,6 +422,8 @@ AllreduceRobust::ReturnType AllreduceRobust::TryResetLinks(void) { */ bool AllreduceRobust::CheckAndRecover(ReturnType err_type) { if (err_type == kSuccess) return true; + utils::Assert(err_link != NULL, "must know the error source"); + recover_counter += 1; { // simple way, shutdown all links for (size_t i = 0; i < all_links.size(); ++i) { @@ -602,6 +605,9 @@ AllreduceRobust::TryRecoverData(RecoverType role, if (!req_data) return kSuccess; } utils::Assert(recv_link >= 0 || role == kHaveData, "recv_link must be active"); + if (role == kPassData) { + links[recv_link].InitBuffer(1, size, reduce_buffer_size); + } for (int i = 0; i < nlink; ++i) { links[i].ResetSize(); } diff --git a/src/allreduce_robust.h b/src/allreduce_robust.h index 078ff1598..23a3964d8 100644 --- a/src/allreduce_robust.h +++ b/src/allreduce_robust.h @@ -531,6 +531,8 @@ o * the input state must exactly one saved state(local state of current node) int use_local_model; // number of replica for global state/model int num_global_replica; + // number of times recovery happens + int recover_counter; // --- recovery data structure for local checkpoint // there is two version of the data structure, // at one time one version is valid and another is used as temp memory diff --git a/src/engine.cc b/src/engine.cc index 45bef329c..1aa220f0e 100644 --- a/src/engine.cc +++ b/src/engine.cc @@ -56,7 +56,8 @@ void Allreduce_(void *sendrecvbuf, } // code for reduce handle -ReduceHandle::ReduceHandle(void) : handle_(NULL), htype_(NULL) { +ReduceHandle::ReduceHandle(void) + : handle_(NULL), redfunc_(NULL), htype_(NULL) { } ReduceHandle::~ReduceHandle(void) {} @@ -64,17 +65,16 @@ int ReduceHandle::TypeSize(const MPI::Datatype &dtype) { return static_cast(dtype.type_size); } void ReduceHandle::Init(IEngine::ReduceFunction redfunc, size_t type_nbytes) { - utils::Assert(handle_ == NULL, "cannot initialize reduce handle twice"); - handle_ = reinterpret_cast(redfunc); + utils::Assert(redfunc_ == NULL, "cannot initialize reduce handle twice"); + redfunc_ = redfunc; } void ReduceHandle::Allreduce(void *sendrecvbuf, size_t type_nbytes, size_t count, IEngine::PreprocFunction prepare_fun, void *prepare_arg) { - utils::Assert(handle_ != NULL, "must intialize handle to call AllReduce"); + utils::Assert(redfunc_ != NULL, "must intialize handle to call AllReduce"); GetEngine()->Allreduce(sendrecvbuf, type_nbytes, count, - reinterpret_cast(handle_), - prepare_fun, prepare_arg); + redfunc_, prepare_fun, prepare_arg); } } // namespace engine } // namespace rabit diff --git a/src/engine_mpi.cc b/src/engine_mpi.cc index 9c6206ebf..006a91006 100644 --- a/src/engine_mpi.cc +++ b/src/engine_mpi.cc @@ -137,7 +137,8 @@ void Allreduce_(void *sendrecvbuf, } // code for reduce handle -ReduceHandle::ReduceHandle(void) : handle_(NULL), htype_(NULL) { +ReduceHandle::ReduceHandle(void) + : handle_(NULL), redfunc_(NULL), htype_(NULL) { } ReduceHandle::~ReduceHandle(void) { if (handle_ != NULL) {