diff --git a/.gitignore b/.gitignore index 517cce2b7..736c321b1 100644 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,7 @@ *.lib # Executables +*.miss *.exe *.out *.app diff --git a/CMakeLists.txt b/CMakeLists.txt index 974c44e67..cd46cbd52 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,16 +12,23 @@ endif() option(RABIT_BUILD_TESTS "Build rabit tests" OFF) option(RABIT_BUILD_MPI "Build MPI" OFF) -option(RABIT_BUILD_DMLC "Include DMLC_CORE in build" ON) +option(RABIT_BUILD_DMLC "Include DMLC_CORE in build" OFF) -add_library(rabit src/allreduce_base.cc src/allreduce_robust.cc src/engine.cc src/c_api.cc) -add_library(rabit_base src/allreduce_base.cc src/engine_base.cc src/c_api.cc) -add_library(rabit_empty src/engine_empty.cc src/c_api.cc) -add_library(rabit_mock_static src/allreduce_base.cc src/allreduce_robust.cc src/engine_mock.cc src/c_api.cc) -add_library(rabit_mock SHARED src/allreduce_base.cc src/allreduce_robust.cc src/engine_mock.cc src/c_api.cc) +# moved from xgboost build +if(R_LIB OR MINGW OR WIN32) + add_library(rabit_base src/allreduce_base.cc src/engine_base.cc src/c_api.cc) + set(rabit_libs rabit) + set_target_properties(rabit PROPERTIES CXX_STANDARD 11 CXX_STANDARD_REQUIRED ON) +ELSE() + add_library(rabit src/allreduce_base.cc src/allreduce_robust.cc src/engine.cc src/c_api.cc) + add_library(rabit_base src/allreduce_base.cc src/engine_base.cc src/c_api.cc) + add_library(rabit_empty src/engine_empty.cc src/c_api.cc) + add_library(rabit_mock_static src/allreduce_base.cc src/allreduce_robust.cc src/engine_mock.cc src/c_api.cc) + add_library(rabit_mock SHARED src/allreduce_base.cc src/allreduce_robust.cc src/engine_mock.cc src/c_api.cc) -set(rabit_libs rabit rabit_base rabit_empty rabit_mock rabit_mock_static) -set_target_properties(rabit rabit_base rabit_empty rabit_mock rabit_mock_static PROPERTIES CXX_STANDARD 11 CXX_STANDARD_REQUIRED ON) + set(rabit_libs rabit rabit_base rabit_empty rabit_mock rabit_mock_static) + set_target_properties(rabit rabit_base rabit_empty rabit_mock rabit_mock_static PROPERTIES CXX_STANDARD 11 CXX_STANDARD_REQUIRED ON) +ENDIF(R_LIB OR MINGW OR WIN32) if(RABIT_BUILD_MPI) find_package(MPI REQUIRED) diff --git a/README.md b/README.md index bdac35ab7..eb0ce7181 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,10 @@ All these features comes from the facts about small rabbit:) - Programs persist over all the iterations, unless they fail and recover. * Reliable: rabit dig burrows to avoid disasters - Rabit programs can recover the model and results using synchronous function calls. + - Rabit programs can set rabit_boostrap_cache=1 to support allreduce/broadcast operations before loadcheckpoint + ` + rabit::Init(); -> rabit::AllReduce(); -> rabit::loadCheckpoint(); -> for () { rabit::AllReduce(); rabit::Checkpoint();} -> rabit::Shutdown(); + ` ## Use Rabit * Type make in the root folder will compile the rabit library in lib folder diff --git a/include/rabit/internal/engine.h b/include/rabit/internal/engine.h index 1177be302..79727c42a 100644 --- a/include/rabit/internal/engine.h +++ b/include/rabit/internal/engine.h @@ -9,39 +9,15 @@ #include #include "../serializable.h" -// keeps rabit api caller signature -#ifndef RABIT_API_CALLER_SIGNATURE -#define RABIT_API_CALLER_SIGNATURE - -#ifdef __has_builtin - -#if __has_builtin(__builtin_FILE) +#if (defined(__GNUC__) && !defined(__clang__)) #define _FILE __builtin_FILE() -#else -#define _FILE "N/A" -#endif // __has_builtin(__builtin_FILE) - -#if __has_builtin(__builtin_LINE) #define _LINE __builtin_LINE() -#else -#define _LINE -1 -#endif // __has_builtin(__builtin_LINE) - -#if __has_builtin(__builtin_FUNCTION) #define _CALLER __builtin_FUNCTION() #else -#define _CALLER "N/A" -#endif // __has_builtin(__builtin_FUNCTION) - -#else - #define _FILE "N/A" #define _LINE -1 #define _CALLER "N/A" - -#endif // __has_builtin - -#endif // RABIT_API_CALLER_SIGNATURE +#endif // (defined(__GNUC__) && !defined(__clang__)) namespace MPI { /*! \brief MPI data type just to be compatible with MPI reduce function*/ @@ -88,7 +64,6 @@ class IEngine { * will be called by the function before performing Allreduce in order to initialize the data in sendrecvbuf. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to pass into the lazy preprocessing function - * \param is_bootstrap if this allreduce is needed to bootstrap failed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key @@ -99,7 +74,6 @@ class IEngine { ReduceFunction reducer, PreprocFunction prepare_fun = NULL, void *prepare_arg = NULL, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER) = 0; @@ -108,13 +82,11 @@ class IEngine { * \param sendrecvbuf_ buffer for both sending and receiving data * \param size the size of the data to be broadcasted * \param root the root worker id to broadcast the data - * \param is_bootstrap if this broadcast is needed to bootstrap failed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key */ virtual void Broadcast(void *sendrecvbuf_, size_t size, int root, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER) = 0; @@ -254,7 +226,6 @@ enum DataType { * will be called by the function before performing Allreduce, to initialize the data in sendrecvbuf_. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to pass into the lazy preprocessing function. - * \param is_bootstrap if this allreduce is needed to bootstrap failed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key @@ -267,7 +238,6 @@ void Allreduce_(void *sendrecvbuf, mpi::OpType op, IEngine::PreprocFunction prepare_fun = NULL, void *prepare_arg = NULL, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER); @@ -296,7 +266,6 @@ class ReduceHandle { * will be called by the function before performing Allreduce in order to initialize the data in sendrecvbuf_. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to pass into the lazy preprocessing function - * \param is_bootstrap if this allreduce is needed to bootstrap failed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key @@ -306,7 +275,6 @@ class ReduceHandle { size_t count, IEngine::PreprocFunction prepare_fun = NULL, void *prepare_arg = NULL, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER); diff --git a/include/rabit/internal/rabit-inl.h b/include/rabit/internal/rabit-inl.h index 61b01b9f7..6858f0515 100644 --- a/include/rabit/internal/rabit-inl.h +++ b/include/rabit/internal/rabit-inl.h @@ -94,10 +94,9 @@ struct BitOR { }; template inline void Reducer(const void *src_, void *dst_, int len, const MPI::Datatype &dtype) { - const DType *src = (const DType*)src_; - DType *dst = (DType*)dst_; // NOLINT(*) - - for (int i = 0; i < len; ++i) { + const DType* src = (const DType*)src_; + DType* dst = (DType*)dst_; // NOLINT(*) + for (int i = 0; i < len; i++) { OP::Reduce(dst[i], src[i]); } } @@ -129,42 +128,39 @@ inline std::string GetProcessorName(void) { } // broadcast data to all other nodes from root inline void Broadcast(void *sendrecv_data, size_t size, int root, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { engine::GetEngine()->Broadcast(sendrecv_data, size, root, - is_bootstrap, _file, _line, _caller); + _file, _line, _caller); } template inline void Broadcast(std::vector *sendrecv_data, int root, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { size_t size = sendrecv_data->size(); - Broadcast(&size, sizeof(size), root, is_bootstrap, _file, _line, _caller); + Broadcast(&size, sizeof(size), root, _file, _line, _caller); if (sendrecv_data->size() != size) { sendrecv_data->resize(size); } if (size != 0) { Broadcast(&(*sendrecv_data)[0], size * sizeof(DType), root, - is_bootstrap, _file, _line, _caller); + _file, _line, _caller); } } inline void Broadcast(std::string *sendrecv_data, int root, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { size_t size = sendrecv_data->length(); - Broadcast(&size, sizeof(size), root, is_bootstrap, _file, _line, _caller); + Broadcast(&size, sizeof(size), root, _file, _line, _caller); if (sendrecv_data->length() != size) { sendrecv_data->resize(size); } if (size != 0) { Broadcast(&(*sendrecv_data)[0], size * sizeof(char), root, - is_bootstrap, _file, _line, _caller); + _file, _line, _caller); } } @@ -173,13 +169,12 @@ template inline void Allreduce(DType *sendrecvbuf, size_t count, void (*prepare_fun)(void *arg), void *prepare_arg, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { engine::Allreduce_(sendrecvbuf, sizeof(DType), count, op::Reducer, engine::mpi::GetType(), OP::kType, prepare_fun, prepare_arg, - is_bootstrap, _file, _line, _caller); + _file, _line, _caller); } // C++11 support for lambda prepare function @@ -190,13 +185,12 @@ inline void InvokeLambda_(void *fun) { template inline void Allreduce(DType *sendrecvbuf, size_t count, std::function prepare_fun, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { engine::Allreduce_(sendrecvbuf, sizeof(DType), count, op::Reducer, engine::mpi::GetType(), OP::kType, InvokeLambda_, &prepare_fun, - is_bootstrap, _file, _line, _caller); + _file, _line, _caller); } #endif // C++11 @@ -244,11 +238,12 @@ inline void ReducerSafe_(const void *src_, void *dst_, int len_, const MPI::Data const size_t kUnit = sizeof(DType); const char *psrc = reinterpret_cast(src_); char *pdst = reinterpret_cast(dst_); + for (int i = 0; i < len_; ++i) { DType tdst, tsrc; // use memcpy to avoid alignment issue - std::memcpy(&tdst, pdst + i * kUnit, sizeof(tdst)); - std::memcpy(&tsrc, psrc + i * kUnit, sizeof(tsrc)); + std::memcpy(&tdst, pdst + (i * kUnit), sizeof(tdst)); + std::memcpy(&tsrc, psrc + (i * kUnit), sizeof(tsrc)); freduce(tdst, tsrc); std::memcpy(pdst + i * kUnit, &tdst, sizeof(tdst)); } @@ -276,12 +271,11 @@ template // NOLIN inline void Reducer::Allreduce(DType *sendrecvbuf, size_t count, void (*prepare_fun)(void *arg), void *prepare_arg, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { handle_.Allreduce(sendrecvbuf, sizeof(DType), count, prepare_fun, - prepare_arg, is_bootstrap, _file, _line, _caller); + prepare_arg, _file, _line, _caller); } // function to perform reduction for SerializeReducer template @@ -330,7 +324,6 @@ inline void SerializeReducer::Allreduce(DType *sendrecvobj, size_t max_nbyte, size_t count, void (*prepare_fun)(void *arg), void *prepare_arg, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { @@ -342,7 +335,7 @@ inline void SerializeReducer::Allreduce(DType *sendrecvobj, // invoke here handle_.Allreduce(BeginPtr(buffer_), max_nbyte, count, SerializeReduceClosure::Invoke, &c, - is_bootstrap, _file, _line, _caller); + _file, _line, _caller); for (size_t i = 0; i < count; ++i) { utils::MemoryFixSizeBuffer fs(BeginPtr(buffer_) + i * max_nbyte, max_nbyte); sendrecvobj[i].Load(fs); @@ -353,23 +346,21 @@ inline void SerializeReducer::Allreduce(DType *sendrecvobj, template // NOLINT(*)g inline void Reducer::Allreduce(DType *sendrecvbuf, size_t count, std::function prepare_fun, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { this->Allreduce(sendrecvbuf, count, InvokeLambda_, &prepare_fun, - is_bootstrap, _file, _line, _caller); + _file, _line, _caller); } template inline void SerializeReducer::Allreduce(DType *sendrecvobj, size_t max_nbytes, size_t count, std::function prepare_fun, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { this->Allreduce(sendrecvobj, max_nbytes, count, InvokeLambda_, &prepare_fun, - is_bootstrap, _file, _line, _caller); + _file, _line, _caller); } #endif // DMLC_USE_CXX11 } // namespace rabit diff --git a/include/rabit/rabit.h b/include/rabit/rabit.h index a4500a65c..da963ecbf 100644 --- a/include/rabit/rabit.h +++ b/include/rabit/rabit.h @@ -26,33 +26,15 @@ #ifndef RABIT_API_CALLER_SIGNATURE #define RABIT_API_CALLER_SIGNATURE -#ifdef __has_builtin - -#if __has_builtin(__builtin_FILE) +#if (defined(__GNUC__) && !defined(__clang__)) #define _FILE __builtin_FILE() -#else -#define _FILE "N/A" -#endif // __has_builtin(__builtin_FILE) - -#if __has_builtin(__builtin_LINE) #define _LINE __builtin_LINE() -#else -#define _LINE -1 -#endif // __has_builtin(__builtin_LINE) - -#if __has_builtin(__builtin_FUNCTION) #define _CALLER __builtin_FUNCTION() #else -#define _CALLER "N/A" -#endif // __has_builtin(__builtin_FUNCTION) - -#else - #define _FILE "N/A" #define _LINE -1 #define _CALLER "N/A" - -#endif // __has_builtin +#endif // (defined(__GNUC__) && !defined(__clang__)) #endif // RABIT_API_CALLER_SIGNATURE @@ -153,13 +135,11 @@ inline void TrackerPrintf(const char *fmt, ...); * \param sendrecv_data the pointer to the send/receive buffer, * \param size the data size * \param root the process root - * \param is_bootstrap if this allreduce is needed to bootstrap failed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key */ inline void Broadcast(void *sendrecv_data, size_t size, int root, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER); @@ -169,7 +149,6 @@ inline void Broadcast(void *sendrecv_data, size_t size, int root, * \param sendrecv_data the pointer to send/receive vector, * for the receiver, the vector does not need to be pre-allocated * \param root the process root - * \param is_bootstrap if this allreduce is needed to bootstrap failed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key @@ -178,7 +157,6 @@ inline void Broadcast(void *sendrecv_data, size_t size, int root, */ template inline void Broadcast(std::vector *sendrecv_data, int root, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER); @@ -186,14 +164,12 @@ inline void Broadcast(std::vector *sendrecv_data, int root, * \brief broadcasts a std::string to every node from the root * \param sendrecv_data the pointer to the send/receive buffer, * for the receiver, the vector does not need to be pre-allocated - * \param is_bootstrap if this allreduce is needed to bootstrap failed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key * \param root the process root */ inline void Broadcast(std::string *sendrecv_data, int root, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER); @@ -215,7 +191,6 @@ inline void Broadcast(std::string *sendrecv_data, int root, * will be called by the function before performing Allreduce in order to initialize the data in sendrecvbuf. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to pass into the lazy preprocessing function - * \param is_bootstrap if this allreduce is needed to bootstrap filed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key @@ -226,7 +201,6 @@ template inline void Allreduce(DType *sendrecvbuf, size_t count, void (*prepare_fun)(void *) = NULL, void *prepare_arg = NULL, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER); @@ -254,7 +228,6 @@ inline void Allreduce(DType *sendrecvbuf, size_t count, * \param prepare_fun Lazy lambda preprocessing function, prepare_fun() will be invoked * by the function before performing Allreduce in order to initialize the data in sendrecvbuf. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called - * \param is_bootstrap if this allreduce is needed to bootstrap failed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key @@ -264,7 +237,6 @@ inline void Allreduce(DType *sendrecvbuf, size_t count, template inline void Allreduce(DType *sendrecvbuf, size_t count, std::function prepare_fun, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER); @@ -363,7 +335,6 @@ class Reducer { * will be called by the function before performing Allreduce, to initialize the data in sendrecvbuf. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to pass into the lazy preprocessing function - * \param is_bootstrap if this allreduce is needed to bootstrap filed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key @@ -371,7 +342,6 @@ class Reducer { inline void Allreduce(DType *sendrecvbuf, size_t count, void (*prepare_fun)(void *) = NULL, void *prepare_arg = NULL, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER); @@ -381,14 +351,12 @@ class Reducer { * \param sendrecvbuf pointer to the array of objects to be reduced * \param count number of elements to be reduced * \param prepare_fun lambda function executed to prepare the data, if necessary - * \param is_bootstrap if this allreduce is needed to bootstrap filed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key */ inline void Allreduce(DType *sendrecvbuf, size_t count, std::function prepare_fun, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER); @@ -422,7 +390,6 @@ class SerializeReducer { * will be called by the function before performing Allreduce, to initialize the data in sendrecvbuf. * If the result of Allreduce can be recovered directly, then the prepare_func will NOT be called * \param prepare_arg argument used to pass into the lazy preprocessing function - * \param is_bootstrap if this allreduce is needed to bootstrap failed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key @@ -431,7 +398,6 @@ class SerializeReducer { size_t max_nbyte, size_t count, void (*prepare_fun)(void *) = NULL, void *prepare_arg = NULL, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER); @@ -444,7 +410,6 @@ class SerializeReducer { * this includes budget limit for intermediate and final result * \param count number of elements to be reduced * \param prepare_fun lambda function executed to prepare the data, if necessary - * \param is_bootstrap if this allreduce is needed to bootstrap failed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key @@ -452,7 +417,6 @@ class SerializeReducer { inline void Allreduce(DType *sendrecvobj, size_t max_nbyte, size_t count, std::function prepare_fun, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER); diff --git a/src/allreduce_base.h b/src/allreduce_base.h index 1cac6b708..aecdecdff 100644 --- a/src/allreduce_base.h +++ b/src/allreduce_base.h @@ -83,7 +83,6 @@ class AllreduceBase : public IEngine { * will be called by the function before performing Allreduce, to intialize the data in sendrecvbuf_. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to passed into the lazy preprocessing function - * \param is_bootstrap if this allreduce is needed to bootstrap filed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key @@ -94,7 +93,6 @@ class AllreduceBase : public IEngine { ReduceFunction reducer, PreprocFunction prepare_fun = NULL, void *prepare_arg = NULL, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER) { @@ -109,14 +107,12 @@ class AllreduceBase : public IEngine { * \param sendrecvbuf_ buffer for both sending and recving data * \param size the size of the data to be broadcasted * \param root the root worker id to broadcast the data - * \param is_bootstrap if this broadcast is needed to bootstrap filed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key */ virtual void Broadcast(void *sendrecvbuf_, size_t total_size, int root, - bool is_bootstrap = false, const char* _file = _FILE, - const int _line = _LINE, const char* _caller = _CALLER) { + const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER) { if (world_size == 1 || world_size == -1) return; utils::Assert(TryBroadcast(sendrecvbuf_, total_size, root) == kSuccess, "Broadcast failed"); diff --git a/src/allreduce_robust.cc b/src/allreduce_robust.cc index a8d08aa9c..fa732bd26 100644 --- a/src/allreduce_robust.cc +++ b/src/allreduce_robust.cc @@ -30,6 +30,7 @@ AllreduceRobust::AllreduceRobust(void) { global_lazycheck = NULL; use_local_model = -1; recover_counter = 0; + checkpoint_loaded = false; env_vars.push_back("rabit_global_replica"); env_vars.push_back("rabit_local_replica"); } @@ -38,6 +39,7 @@ bool AllreduceRobust::Init(int argc, char* argv[]) { // chenqin: alert user opted in experimental feature. if (rabit_bootstrap_cache) utils::HandleLogInfo( "[EXPERIMENTAL] rabit bootstrap cache has been enabled\n"); + checkpoint_loaded = false; if (num_global_replica == 0) { result_buffer_round = -1; } else { @@ -157,7 +159,6 @@ int AllreduceRobust::GetBootstrapCache(const std::string &key, void* buf, * will be called by the function before performing Allreduce, to intialize the data in sendrecvbuf_. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to passed into the lazy preprocessing function - * \param is_bootstrap if this allreduce is needed to bootstrap filed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key @@ -168,7 +169,6 @@ void AllreduceRobust::Allreduce(void *sendrecvbuf_, ReduceFunction reducer, PreprocFunction prepare_fun, void *prepare_arg, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { @@ -183,7 +183,7 @@ void AllreduceRobust::Allreduce(void *sendrecvbuf_, + std::string(_caller) + "#" +std::to_string(type_nbytes) + "x" + std::to_string(count); // try fetch bootstrap allreduce results from cache - if (is_bootstrap && rabit_bootstrap_cache && + if (!checkpoint_loaded && rabit_bootstrap_cache && GetBootstrapCache(key, sendrecvbuf_, type_nbytes, count, true) != -1) return; double start = utils::GetTime(); @@ -217,7 +217,7 @@ void AllreduceRobust::Allreduce(void *sendrecvbuf_, } // if bootstrap allreduce, store and fetch through cache - if (!is_bootstrap || !rabit_bootstrap_cache) { + if (checkpoint_loaded || !rabit_bootstrap_cache) { resbuf.PushTemp(seq_counter, type_nbytes, count); seq_counter += 1; } else { @@ -229,13 +229,11 @@ void AllreduceRobust::Allreduce(void *sendrecvbuf_, * \param sendrecvbuf_ buffer for both sending and recving data * \param size the size of the data to be broadcasted * \param root the root worker id to broadcast the data - * \param is_bootstrap if this allreduce is needed to bootstrap filed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key */ void AllreduceRobust::Broadcast(void *sendrecvbuf_, size_t total_size, int root, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { @@ -245,7 +243,7 @@ void AllreduceRobust::Broadcast(void *sendrecvbuf_, size_t total_size, int root, std::string key = std::string(_file) + "::" + std::to_string(_line) + "::" + std::string(_caller) + "#" +std::to_string(total_size) + "@" + std::to_string(root); // try fetch bootstrap allreduce results from cache - if (is_bootstrap && rabit_bootstrap_cache && + if (!checkpoint_loaded && rabit_bootstrap_cache && GetBootstrapCache(key, sendrecvbuf_, total_size, 1, true) != -1) return; double start = utils::GetTime(); @@ -277,7 +275,7 @@ void AllreduceRobust::Broadcast(void *sendrecvbuf_, size_t total_size, int root, rank, key.c_str(), root, version_number, seq_counter, delta); } // if bootstrap broadcast, store and fetch through cache - if (!is_bootstrap || !rabit_bootstrap_cache) { + if (checkpoint_loaded || !rabit_bootstrap_cache) { resbuf.PushTemp(seq_counter, 1, total_size); seq_counter += 1; } else { @@ -308,6 +306,7 @@ void AllreduceRobust::Broadcast(void *sendrecvbuf_, size_t total_size, int root, */ int AllreduceRobust::LoadCheckPoint(Serializable *global_model, Serializable *local_model) { + checkpoint_loaded = true; // skip action in single node if (world_size == 1) return 0; this->LocalModelCheck(local_model != NULL); diff --git a/src/allreduce_robust.h b/src/allreduce_robust.h index d23707987..c263aa47d 100644 --- a/src/allreduce_robust.h +++ b/src/allreduce_robust.h @@ -62,7 +62,6 @@ class AllreduceRobust : public AllreduceBase { * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to passed into the lazy preprocessing function * \param prepare_arg argument used to passed into the lazy preprocessing function - * \param is_bootstrap if this allreduce is needed to bootstrap filed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key @@ -73,7 +72,6 @@ class AllreduceRobust : public AllreduceBase { ReduceFunction reducer, PreprocFunction prepare_fun = NULL, void *prepare_arg = NULL, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER); @@ -82,13 +80,11 @@ class AllreduceRobust : public AllreduceBase { * \param sendrecvbuf_ buffer for both sending and recving data * \param size the size of the data to be broadcasted * \param root the root worker id to broadcast the data - * \param is_bootstrap if this broadcast is needed to bootstrap filed node * \param _file caller file name used to generate unique cache key * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key */ virtual void Broadcast(void *sendrecvbuf_, size_t total_size, int root, - bool is_bootstrap = false, const char* _file = _FILE, const int _line = _LINE, const char* _caller = _CALLER); @@ -643,6 +639,8 @@ o * the input state must exactly one saved state(local state of current node) std::string local_chkpt[2]; // version of local checkpoint can be 1 or 0 int local_chkpt_version; + // if checkpoint were loaded, used to distinguish results boostrap cache from seqno cache + bool checkpoint_loaded; }; } // namespace engine } // namespace rabit diff --git a/src/engine.cc b/src/engine.cc index a880eb93c..fe43acb1f 100644 --- a/src/engine.cc +++ b/src/engine.cc @@ -93,12 +93,11 @@ void Allreduce_(void *sendrecvbuf, mpi::OpType op, IEngine::PreprocFunction prepare_fun, void *prepare_arg, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { GetEngine()->Allreduce(sendrecvbuf, type_nbytes, count, red, prepare_fun, - prepare_arg, is_bootstrap, _file, _line, _caller); + prepare_arg, _file, _line, _caller); } // code for reduce handle @@ -121,14 +120,13 @@ void ReduceHandle::Allreduce(void *sendrecvbuf, size_t type_nbytes, size_t count, IEngine::PreprocFunction prepare_fun, void *prepare_arg, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { utils::Assert(redfunc_ != NULL, "must intialize handle to call AllReduce"); GetEngine()->Allreduce(sendrecvbuf, type_nbytes, count, redfunc_, prepare_fun, prepare_arg, - is_bootstrap, _file, _line, _caller); + _file, _line, _caller); } } // namespace engine } // namespace rabit diff --git a/src/engine_empty.cc b/src/engine_empty.cc index 1a711f82b..c635b0208 100644 --- a/src/engine_empty.cc +++ b/src/engine_empty.cc @@ -31,7 +31,6 @@ class EmptyEngine : public IEngine { ReduceFunction reducer, PreprocFunction prepare_fun, void *prepare_arg, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { @@ -39,8 +38,7 @@ class EmptyEngine : public IEngine { "use Allreduce_ instead"); } virtual void Broadcast(void *sendrecvbuf_, size_t size, int root, - bool is_bootstrap, const char* _file, - const int _line, const char* _caller) { + const char* _file, const int _line, const char* _caller) { } virtual void InitAfterException(void) { utils::Error("EmptyEngine is not fault tolerant"); @@ -109,7 +107,6 @@ void Allreduce_(void *sendrecvbuf, mpi::OpType op, IEngine::PreprocFunction prepare_fun, void *prepare_arg, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { @@ -129,7 +126,6 @@ void ReduceHandle::Allreduce(void *sendrecvbuf, size_t type_nbytes, size_t count, IEngine::PreprocFunction prepare_fun, void *prepare_arg, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { diff --git a/src/engine_mpi.cc b/src/engine_mpi.cc index 60948de53..026b30f3a 100644 --- a/src/engine_mpi.cc +++ b/src/engine_mpi.cc @@ -33,7 +33,6 @@ class MPIEngine : public IEngine { ReduceFunction reducer, PreprocFunction prepare_fun, void *prepare_arg, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { @@ -41,7 +40,7 @@ class MPIEngine : public IEngine { "use Allreduce_ instead"); } virtual void Broadcast(void *sendrecvbuf_, size_t size, int root, - bool is_bootstrap, const char* _file, const int _line, + const char* _file, const int _line, const char* _caller) { MPI::COMM_WORLD.Bcast(sendrecvbuf_, size, MPI::CHAR, root); } @@ -160,7 +159,6 @@ void Allreduce_(void *sendrecvbuf, mpi::OpType op, IEngine::PreprocFunction prepare_fun, void *prepare_arg, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { @@ -212,7 +210,6 @@ void ReduceHandle::Allreduce(void *sendrecvbuf, size_t type_nbytes, size_t count, IEngine::PreprocFunction prepare_fun, void *prepare_arg, - bool is_bootstrap, const char* _file, const int _line, const char* _caller) { diff --git a/test/model_recover.cc b/test/model_recover.cc index ae4fe4083..3745caf5a 100644 --- a/test/model_recover.cc +++ b/test/model_recover.cc @@ -96,7 +96,7 @@ int main(int argc, char *argv[]) { std::string name = rabit::GetProcessorName(); int max_rank = rank; - rabit::Allreduce(&max_rank, sizeof(int), NULL, NULL, true); + rabit::Allreduce(&max_rank, sizeof(int)); utils::Check(max_rank == nproc - 1, "max rank is world size-1"); Model model;