diff --git a/jvm-packages/xgboost4j/src/native/xgboost4j.cpp b/jvm-packages/xgboost4j/src/native/xgboost4j.cpp index 804e3653c..8c2eeb691 100644 --- a/jvm-packages/xgboost4j/src/native/xgboost4j.cpp +++ b/jvm-packages/xgboost4j/src/native/xgboost4j.cpp @@ -964,7 +964,7 @@ JNIEXPORT jint JNICALL Java_ml_dmlc_xgboost4j_java_XGBoostJNI_RabitVersionNumber JNIEXPORT jint JNICALL Java_ml_dmlc_xgboost4j_java_XGBoostJNI_RabitAllreduce (JNIEnv *jenv, jclass jcls, jobject jsendrecvbuf, jint jcount, jint jenum_dtype, jint jenum_op) { void *ptr_sendrecvbuf = jenv->GetDirectBufferAddress(jsendrecvbuf); - RabitAllreduce(ptr_sendrecvbuf, (size_t) jcount, jenum_dtype, jenum_op, NULL, NULL); + JVM_CHECK_CALL(RabitAllreduce(ptr_sendrecvbuf, (size_t) jcount, jenum_dtype, jenum_op, NULL, NULL)); return 0; } diff --git a/python-package/xgboost/rabit.py b/python-package/xgboost/rabit.py index a54a46f48..5225be2cd 100644 --- a/python-package/xgboost/rabit.py +++ b/python-package/xgboost/rabit.py @@ -120,18 +120,18 @@ def broadcast(data, root): s = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL) length.value = len(s) # run first broadcast - _LIB.RabitBroadcast(ctypes.byref(length), - ctypes.sizeof(ctypes.c_ulong), root) + _check_call(_LIB.RabitBroadcast(ctypes.byref(length), + ctypes.sizeof(ctypes.c_ulong), root)) if root != rank: dptr = (ctypes.c_char * length.value)() # run second - _LIB.RabitBroadcast(ctypes.cast(dptr, ctypes.c_void_p), - length.value, root) + _check_call(_LIB.RabitBroadcast(ctypes.cast(dptr, ctypes.c_void_p), + length.value, root)) data = pickle.loads(dptr.raw) del dptr else: - _LIB.RabitBroadcast(ctypes.cast(ctypes.c_char_p(s), ctypes.c_void_p), - length.value, root) + _check_call(_LIB.RabitBroadcast(ctypes.cast(ctypes.c_char_p(s), ctypes.c_void_p), + length.value, root)) del s return data @@ -189,18 +189,18 @@ def allreduce(data, op, prepare_fun=None): if buf.dtype not in DTYPE_ENUM__: raise Exception('data type %s not supported' % str(buf.dtype)) if prepare_fun is None: - _LIB.RabitAllreduce(buf.ctypes.data_as(ctypes.c_void_p), - buf.size, DTYPE_ENUM__[buf.dtype], - op, None, None) + _check_call(_LIB.RabitAllreduce(buf.ctypes.data_as(ctypes.c_void_p), + buf.size, DTYPE_ENUM__[buf.dtype], + op, None, None)) else: func_ptr = ctypes.CFUNCTYPE(None, ctypes.c_void_p) def pfunc(_): """prepare function.""" prepare_fun(data) - _LIB.RabitAllreduce(buf.ctypes.data_as(ctypes.c_void_p), - buf.size, DTYPE_ENUM__[buf.dtype], - op, func_ptr(pfunc), None) + _check_call(_LIB.RabitAllreduce(buf.ctypes.data_as(ctypes.c_void_p), + buf.size, DTYPE_ENUM__[buf.dtype], + op, func_ptr(pfunc), None)) return buf diff --git a/rabit/.travis.yml b/rabit/.travis.yml deleted file mode 100644 index 3f6c10f2a..000000000 --- a/rabit/.travis.yml +++ /dev/null @@ -1,90 +0,0 @@ -sudo: true - -os: - - linux - - osx - -osx_image: xcode10.2 - -dist: xenial - -language: cpp - -# Use Build Matrix to do lint and build seperately -env: - matrix: - - TASK=lint LINT_LANG=cpp - - TASK=lint LINT_LANG=python - - TASK=doc - # - TASK=build - - TASK=mpi-build - - TASK=cmake-test - -matrix: - exclude: - - os: osx - env: TASK=lint LINT_LANG=cpp - - os: osx - env: TASK=lint LINT_LANG=python - - os: osx - env: TASK=doc - - os: osx - env: TASK=build - -# dependent apt packages -addons: - apt: - sources: - - llvm-toolchain-trusty-5.0 - - ubuntu-toolchain-r-test - - george-edison55-precise-backports - packages: - - doxygen - - wget - - git - - libcurl4-openssl-dev - - unzip - - python-numpy - - gcc-4.8 - - g++-4.8 - - openssh-client - - openssh-server - - python3 - - python3-setuptools - - python3-pip - - tree - homebrew: - packages: - - gcc49 - - openssl - - libgit2 - - python3 - update: true - -before_install: - - git clone https://github.com/dmlc/dmlc-core - - export TRAVIS=./scripts/ - - source ${TRAVIS}/travis_setup_env.sh - - ${TRAVIS}/travis_osx_install.sh - - source ./scripts/travis_setup.sh - -script: scripts/travis_script.sh - -cache: - directories: - - ${HOME}/.cache/usr - - ${HOME}/.cache/pip - - mpich - -before_cache: - - ${TRAVIS}/travis_before_cache.sh - -after_success: - - tree build - - bash <(curl -s https://codecov.io/bash) -a '-o src/ src/*.c' - -notifications: -# Emails are sent to the committer's git-configured email address by default, - email: - on_success: change - on_failure: always diff --git a/rabit/README.md b/rabit/README.md index eb0ce7181..0be1b7015 100644 --- a/rabit/README.md +++ b/rabit/README.md @@ -1,40 +1 @@ -# Rabit: Reliable Allreduce and Broadcast Interface -[![Build Status](https://travis-ci.org/dmlc/rabit.svg?branch=master)](https://travis-ci.org/dmlc/rabit) -[![Documentation Status](https://readthedocs.org/projects/rabit/badge/?version=latest)](http://rabit.readthedocs.org/) - -rabit is a light weight library that provides a fault tolerant interface of Allreduce and Broadcast. It is designed to support easy implementations of distributed machine learning programs, many of which fall naturally under the Allreduce abstraction. The goal of rabit is to support ***portable*** , ***scalable*** and ***reliable*** distributed machine learning programs. - -* [Tutorial](guide) -* [API Documentation](http://homes.cs.washington.edu/~tqchen/rabit/doc) -* You can also directly read the [interface header](include/rabit.h) -* [XGBoost](https://github.com/dmlc/xgboost) - - Rabit is one of the backbone library to support distributed XGBoost - -## Features -All these features comes from the facts about small rabbit:) -* Portable: rabit is light weight and runs everywhere - - Rabit is a library instead of a framework, a program only needs to link the library to run - - Rabit only replies on a mechanism to start program, which was provided by most framework - - You can run rabit programs on many platforms, including Yarn(Hadoop), MPI using the same code -* Scalable and Flexible: rabit runs fast - * Rabit program use Allreduce to communicate, and do not suffer the cost between iterations of MapReduce abstraction. - - Programs can call rabit functions in any order, as opposed to frameworks where callbacks are offered and called by the framework, i.e. inversion of control principle. - - Programs persist over all the iterations, unless they fail and recover. -* Reliable: rabit dig burrows to avoid disasters - - Rabit programs can recover the model and results using synchronous function calls. - - Rabit programs can set rabit_boostrap_cache=1 to support allreduce/broadcast operations before loadcheckpoint - ` - rabit::Init(); -> rabit::AllReduce(); -> rabit::loadCheckpoint(); -> for () { rabit::AllReduce(); rabit::Checkpoint();} -> rabit::Shutdown(); - ` - -## Use Rabit -* Type make in the root folder will compile the rabit library in lib folder -* Add lib to the library path and include to the include path of compiler -* Languages: You can use rabit in C++ and python - - It is also possible to port the library to other languages - -## Contributing -Rabit is an open-source library, contributions are welcomed, including: -* The rabit core library. -* Customized tracker script for new platforms and interface of new languages. -* Tutorial and examples about the library. +# This directory contains the CPU network module for XGBoost. The library originates from [RABIT](https://github.com/dmlc/rabit) \ No newline at end of file diff --git a/rabit/cmake/Config.cmake.in b/rabit/cmake/Config.cmake.in deleted file mode 100644 index 38bbde7b3..000000000 --- a/rabit/cmake/Config.cmake.in +++ /dev/null @@ -1,4 +0,0 @@ -@PACKAGE_INIT@ - -include("${CMAKE_CURRENT_LIST_DIR}/@TARGETS_EXPORT_NAME@.cmake") -check_required_components("@PROJECT_NAME@") diff --git a/rabit/cmake/googletest-download.cmake b/rabit/cmake/googletest-download.cmake deleted file mode 100644 index 7cf367cd8..000000000 --- a/rabit/cmake/googletest-download.cmake +++ /dev/null @@ -1,20 +0,0 @@ -# code copied from https://crascit.com/2015/07/25/cmake-gtest/ -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) - -project(googletest-download NONE) - -include(ExternalProject) - -ExternalProject_Add( - googletest - SOURCE_DIR "@GOOGLETEST_DOWNLOAD_ROOT@/googletest-src" - BINARY_DIR "@GOOGLETEST_DOWNLOAD_ROOT@/googletest-build" - GIT_REPOSITORY - https://github.com/google/googletest.git - GIT_TAG - release-1.8.0 - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "" - TEST_COMMAND "" -) \ No newline at end of file diff --git a/rabit/cmake/googletest.cmake b/rabit/cmake/googletest.cmake deleted file mode 100644 index 0a0b1a55c..000000000 --- a/rabit/cmake/googletest.cmake +++ /dev/null @@ -1,32 +0,0 @@ -# the following code to fetch googletest -# is inspired by and adapted after https://crascit.com/2015/07/25/cmake-gtest/ -# download and unpack googletest at configure time - -macro(fetch_googletest _download_module_path _download_root) - set(GOOGLETEST_DOWNLOAD_ROOT ${_download_root}) - configure_file( - ${_download_module_path}/googletest-download.cmake - ${_download_root}/CMakeLists.txt - @ONLY - ) - unset(GOOGLETEST_DOWNLOAD_ROOT) - - execute_process( - COMMAND - "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . - WORKING_DIRECTORY - ${_download_root} - ) - execute_process( - COMMAND - "${CMAKE_COMMAND}" --build . - WORKING_DIRECTORY - ${_download_root} - ) - - # adds the targers: gtest, gtest_main, gmock, gmock_main - add_subdirectory( - ${_download_root}/googletest-src - ${_download_root}/googletest-build - ) -endmacro() \ No newline at end of file diff --git a/rabit/doc/python-requirements.txt b/rabit/doc/python-requirements.txt index 5970c4367..244b8378f 100644 --- a/rabit/doc/python-requirements.txt +++ b/rabit/doc/python-requirements.txt @@ -1,4 +1,3 @@ numpy breathe commonmark - diff --git a/rabit/include/rabit/c_api.h b/rabit/include/rabit/c_api.h index 404981457..deb21d06c 100644 --- a/rabit/include/rabit/c_api.h +++ b/rabit/include/rabit/c_api.h @@ -41,7 +41,7 @@ RABIT_DLL bool RabitInit(int argc, char *argv[]); * call this function after you finished all jobs. * \return true if rabit is initialized successfully otherwise false */ -RABIT_DLL bool RabitFinalize(void); +RABIT_DLL int RabitFinalize(void); /*! * \brief get rank of previous process in ring topology @@ -91,8 +91,7 @@ RABIT_DLL void RabitGetProcessorName(char *out_name, * \param size the size of the data * \param root the root of process */ -RABIT_DLL void RabitBroadcast(void *sendrecv_data, - rbt_ulong size, int root); +RABIT_DLL int RabitBroadcast(void *sendrecv_data, rbt_ulong size, int root); /*! * \brief Allgather function, each node have a segment of data in the ring of sendrecvbuf, @@ -110,12 +109,9 @@ RABIT_DLL void RabitBroadcast(void *sendrecv_data, * \return this function can return kSuccess, kSockError, kGetExcept, see ReturnType for details * \sa ReturnType */ -RABIT_DLL void RabitAllgather(void *sendrecvbuf, - size_t total_size, - size_t beginIndex, - size_t size_node_slice, - size_t size_prev_slice, - int enum_dtype); +RABIT_DLL int RabitAllgather(void *sendrecvbuf, size_t total_size, + size_t beginIndex, size_t size_node_slice, + size_t size_prev_slice, int enum_dtype); /*! * \brief perform in-place allreduce, on sendrecvbuf @@ -133,14 +129,11 @@ RABIT_DLL void RabitAllgather(void *sendrecvbuf, * \param prepare_fun Lazy preprocessing function, if it is not NULL, prepare_fun(prepare_arg) * will be called by the function before performing Allreduce, to intialize the data in sendrecvbuf_. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called - * \param prepare_arg argument used to passed into the lazy preprocessing function - */ -RABIT_DLL void RabitAllreduce(void *sendrecvbuf, - size_t count, - int enum_dtype, - int enum_op, - void (*prepare_fun)(void *arg), - void *prepare_arg); + * \param prepare_arg argument used to passed into the lazy preprocessing function + */ +RABIT_DLL int RabitAllreduce(void *sendrecvbuf, size_t count, int enum_dtype, + int enum_op, void (*prepare_fun)(void *arg), + void *prepare_arg); /*! * \brief load latest check point diff --git a/rabit/include/rabit/internal/engine.h b/rabit/include/rabit/internal/engine.h index 8cb03f35f..50b452f8d 100644 --- a/rabit/include/rabit/internal/engine.h +++ b/rabit/include/rabit/internal/engine.h @@ -9,16 +9,6 @@ #include #include "rabit/serializable.h" -#if (defined(__GNUC__) && !defined(__clang__)) -#define _FILE __builtin_FILE() -#define _LINE __builtin_LINE() -#define _CALLER __builtin_FUNCTION() -#else -#define _FILE "N/A" -#define _LINE -1 -#define _CALLER "N/A" -#endif // (defined(__GNUC__) && !defined(__clang__)) - namespace MPI { // NOLINT /*! \brief MPI data type just to be compatible with MPI reduce function*/ class Datatype; @@ -65,18 +55,12 @@ class IEngine { * \param slice_begin beginning of the current slice * \param slice_end end of the current slice * \param size_prev_slice size of the previous slice i.e. slice of node (rank - 1) % world_size - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key */ virtual void Allgather(void *sendrecvbuf, size_t total_size, size_t slice_begin, size_t slice_end, - size_t size_prev_slice, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER) = 0; + size_t size_prev_slice) = 0; /*! * \brief performs in-place Allreduce, on sendrecvbuf * this function is NOT thread-safe @@ -88,38 +72,20 @@ class IEngine { * will be called by the function before performing Allreduce in order to initialize the data in sendrecvbuf. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to pass into the lazy preprocessing function - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key */ virtual void Allreduce(void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer, PreprocFunction prepare_fun = nullptr, - void *prepare_arg = nullptr, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER) = 0; + void *prepare_arg = nullptr) = 0; /*! * \brief broadcasts data from root to every other node * \param sendrecvbuf_ buffer for both sending and receiving data * \param size the size of the data to be broadcasted * \param root the root worker id to broadcast the data - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key */ - virtual void Broadcast(void *sendrecvbuf_, size_t size, int root, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER) = 0; - /*! - * \brief explicitly re-initialize everything before calling LoadCheckPoint - * call this function when IEngine throws an exception, - * this function should only be used for test purposes - */ - virtual void InitAfterException() = 0; + virtual void Broadcast(void *sendrecvbuf_, size_t size, int root) = 0; /*! * \brief loads the latest check point * \param global_model pointer to the globally shared model/state @@ -250,18 +216,12 @@ enum DataType { * \param slice_begin beginning of the current slice * \param slice_end end of the current slice * \param size_prev_slice size of the previous slice i.e. slice of node (rank - 1) % world_size - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key */ void Allgather(void* sendrecvbuf, size_t total_size, size_t slice_begin, size_t slice_end, - size_t size_prev_slice, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER); + size_t size_prev_slice); /*! * \brief perform in-place Allreduce, on sendrecvbuf * this is an internal function used by rabit to be able to compile with MPI @@ -276,9 +236,6 @@ void Allgather(void* sendrecvbuf, * will be called by the function before performing Allreduce, to initialize the data in sendrecvbuf_. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to pass into the lazy preprocessing function. - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key */ void Allreduce_(void *sendrecvbuf, // NOLINT size_t type_nbytes, @@ -287,10 +244,7 @@ void Allreduce_(void *sendrecvbuf, // NOLINT mpi::DataType dtype, mpi::OpType op, IEngine::PreprocFunction prepare_fun = nullptr, - void *prepare_arg = nullptr, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER); + void *prepare_arg = nullptr); /*! * \brief handle for customized reducer, used to handle customized reduce * this class is mainly created for compatiblity issues with MPI's customized reduce @@ -316,18 +270,13 @@ class ReduceHandle { * will be called by the function before performing Allreduce in order to initialize the data in sendrecvbuf_. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to pass into the lazy preprocessing function - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key */ void Allreduce(void *sendrecvbuf, size_t type_nbytes, size_t count, IEngine::PreprocFunction prepare_fun = nullptr, - void *prepare_arg = nullptr, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER); + void *prepare_arg = nullptr); + /*! \return the number of bytes occupied by the type */ static int TypeSize(const MPI::Datatype &dtype); diff --git a/rabit/include/rabit/internal/rabit-inl.h b/rabit/include/rabit/internal/rabit-inl.h index fa6902f84..701f801e8 100644 --- a/rabit/include/rabit/internal/rabit-inl.h +++ b/rabit/include/rabit/internal/rabit-inl.h @@ -131,40 +131,28 @@ inline std::string GetProcessorName() { return engine::GetEngine()->GetHost(); } // broadcast data to all other nodes from root -inline void Broadcast(void *sendrecv_data, size_t size, int root, - const char* _file, - const int _line, - const char* _caller) { - engine::GetEngine()->Broadcast(sendrecv_data, size, root, - _file, _line, _caller); +inline void Broadcast(void *sendrecv_data, size_t size, int root) { + engine::GetEngine()->Broadcast(sendrecv_data, size, root); } template -inline void Broadcast(std::vector *sendrecv_data, int root, - const char* _file, - const int _line, - const char* _caller) { +inline void Broadcast(std::vector *sendrecv_data, int root) { size_t size = sendrecv_data->size(); - Broadcast(&size, sizeof(size), root, _file, _line, _caller); + Broadcast(&size, sizeof(size), root); if (sendrecv_data->size() != size) { sendrecv_data->resize(size); } if (size != 0) { - Broadcast(&(*sendrecv_data)[0], size * sizeof(DType), root, - _file, _line, _caller); + Broadcast(&(*sendrecv_data)[0], size * sizeof(DType), root); } } -inline void Broadcast(std::string *sendrecv_data, int root, - const char* _file, - const int _line, - const char* _caller) { +inline void Broadcast(std::string *sendrecv_data, int root) { size_t size = sendrecv_data->length(); - Broadcast(&size, sizeof(size), root, _file, _line, _caller); + Broadcast(&size, sizeof(size), root); if (sendrecv_data->length() != size) { sendrecv_data->resize(size); } if (size != 0) { - Broadcast(&(*sendrecv_data)[0], size * sizeof(char), root, - _file, _line, _caller); + Broadcast(&(*sendrecv_data)[0], size * sizeof(char), root); } } @@ -172,13 +160,9 @@ inline void Broadcast(std::string *sendrecv_data, int root, template inline void Allreduce(DType *sendrecvbuf, size_t count, void (*prepare_fun)(void *arg), - void *prepare_arg, - const char* _file, - const int _line, - const char* _caller) { + void *prepare_arg) { engine::Allreduce_(sendrecvbuf, sizeof(DType), count, op::Reducer, - engine::mpi::GetType(), OP::kType, prepare_fun, prepare_arg, - _file, _line, _caller); + engine::mpi::GetType(), OP::kType, prepare_fun, prepare_arg); } // C++11 support for lambda prepare function @@ -188,13 +172,9 @@ inline void InvokeLambda(void *fun) { } template inline void Allreduce(DType *sendrecvbuf, size_t count, - std::function prepare_fun, - const char* _file, - const int _line, - const char* _caller) { + std::function prepare_fun) { engine::Allreduce_(sendrecvbuf, sizeof(DType), count, op::Reducer, - engine::mpi::GetType(), OP::kType, InvokeLambda, &prepare_fun, - _file, _line, _caller); + engine::mpi::GetType(), OP::kType, InvokeLambda, &prepare_fun); } // Performs inplace Allgather @@ -203,13 +183,10 @@ inline void Allgather(DType *sendrecvbuf, size_t totalSize, size_t beginIndex, size_t sizeNodeSlice, - size_t sizePrevSlice, - const char* _file, - const int _line, - const char* _caller) { + size_t sizePrevSlice) { engine::GetEngine()->Allgather(sendrecvbuf, totalSize * sizeof(DType), beginIndex * sizeof(DType), (beginIndex + sizeNodeSlice) * sizeof(DType), - sizePrevSlice * sizeof(DType), _file, _line, _caller); + sizePrevSlice * sizeof(DType)); } #endif // C++11 @@ -289,12 +266,9 @@ inline Reducer::Reducer() { template // NOLINT(*) inline void Reducer::Allreduce(DType *sendrecvbuf, size_t count, void (*prepare_fun)(void *arg), - void *prepare_arg, - const char* _file, - const int _line, - const char* _caller) { + void *prepare_arg) { handle_.Allreduce(sendrecvbuf, sizeof(DType), count, prepare_fun, - prepare_arg, _file, _line, _caller); + prepare_arg); } // function to perform reduction for SerializeReducer template @@ -342,10 +316,7 @@ template inline void SerializeReducer::Allreduce(DType *sendrecvobj, size_t max_nbyte, size_t count, void (*prepare_fun)(void *arg), - void *prepare_arg, - const char* _file, - const int _line, - const char* _caller) { + void *prepare_arg) { buffer_.resize(max_nbyte * count); // setup closure SerializeReduceClosure c; @@ -353,34 +324,23 @@ inline void SerializeReducer::Allreduce(DType *sendrecvobj, c.prepare_fun = prepare_fun; c.prepare_arg = prepare_arg; c.p_buffer = &buffer_; // invoke here handle_.Allreduce(BeginPtr(buffer_), max_nbyte, count, - SerializeReduceClosure::Invoke, &c, - _file, _line, _caller); + SerializeReduceClosure::Invoke, &c); for (size_t i = 0; i < count; ++i) { utils::MemoryFixSizeBuffer fs(BeginPtr(buffer_) + i * max_nbyte, max_nbyte); sendrecvobj[i].Load(fs); } } -#if DMLC_USE_CXX11 template // NOLINT(*)g inline void Reducer::Allreduce(DType *sendrecvbuf, size_t count, - std::function prepare_fun, - const char* _file, - const int _line, - const char* _caller) { - this->Allreduce(sendrecvbuf, count, InvokeLambda, &prepare_fun, - _file, _line, _caller); + std::function prepare_fun) { + this->Allreduce(sendrecvbuf, count, InvokeLambda, &prepare_fun); } template inline void SerializeReducer::Allreduce(DType *sendrecvobj, size_t max_nbytes, size_t count, - std::function prepare_fun, - const char* _file, - const int _line, - const char* _caller) { - this->Allreduce(sendrecvobj, max_nbytes, count, InvokeLambda, &prepare_fun, - _file, _line, _caller); + std::function prepare_fun) { + this->Allreduce(sendrecvobj, max_nbytes, count, InvokeLambda, &prepare_fun); } -#endif // DMLC_USE_CXX11 } // namespace rabit #endif // RABIT_INTERNAL_RABIT_INL_H_ diff --git a/rabit/include/rabit/internal/timer.h b/rabit/include/rabit/internal/timer.h deleted file mode 100644 index 95a371027..000000000 --- a/rabit/include/rabit/internal/timer.h +++ /dev/null @@ -1,41 +0,0 @@ -/*! - * Copyright (c) 2014-2019 by Contributors - * \file timer.h - * \brief This file defines the utils for timing - * \author Tianqi Chen, Nacho, Tianyi - */ -#ifndef RABIT_INTERNAL_TIMER_H_ -#define RABIT_INTERNAL_TIMER_H_ -#include -#ifdef __MACH__ -#include -#include -#endif // __MACH__ -#include "./utils.h" - -namespace rabit { -namespace utils { -/*! - * \brief return time in seconds, not cross platform, avoid to use this in most places - */ -inline double GetTime() { -#ifdef __MACH__ - clock_serv_t cclock; - mach_timespec_t mts; - host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock); - utils::Check(clock_get_time(cclock, &mts) == 0, "failed to get time"); - mach_port_deallocate(mach_task_self(), cclock); - return static_cast(mts.tv_sec) + static_cast(mts.tv_nsec) * 1e-9; -#else -#if defined(__unix__) || defined(__linux__) - timespec ts; - utils::Check(clock_gettime(CLOCK_REALTIME, &ts) == 0, "failed to get time"); - return static_cast(ts.tv_sec) + static_cast(ts.tv_nsec) * 1e-9; -#else - return static_cast(time(NULL)); -#endif // defined(__unix__) || defined(__linux__) -#endif // __MACH__ -} -} // namespace utils -} // namespace rabit -#endif // RABIT_INTERNAL_TIMER_H_ diff --git a/rabit/include/rabit/internal/utils.h b/rabit/include/rabit/internal/utils.h index 4fbb7c369..fd0b040bc 100644 --- a/rabit/include/rabit/internal/utils.h +++ b/rabit/include/rabit/internal/utils.h @@ -69,35 +69,10 @@ inline bool StringToBool(const char* s) { return CompareStringsCaseInsensitive(s, "true") == 0 || atoi(s) != 0; } -/*! - * \brief handling of Assert error, caused by inappropriate input - * \param msg error message - */ -inline void HandleAssertError(const char *msg) { - LOG(FATAL) << msg; -} -/*! - * \brief handling of Check error, caused by inappropriate input - * \param msg error message - */ -inline void HandleCheckError(const char *msg) { - LOG(FATAL) << msg; -} - inline void HandlePrint(const char *msg) { printf("%s", msg); } -inline void HandleLogInfo(const char *fmt, ...) { - std::string msg(kPrintBuffer, '\0'); - va_list args; - va_start(args, fmt); - vsnprintf(&msg[0], kPrintBuffer, fmt, args); - va_end(args); - fprintf(stdout, "%s", msg.c_str()); - fflush(stdout); -} - /*! \brief printf, prints messages to the console */ inline void Printf(const char *fmt, ...) { std::string msg(kPrintBuffer, '\0'); @@ -108,15 +83,6 @@ inline void Printf(const char *fmt, ...) { HandlePrint(msg.c_str()); } -/*! \brief portable version of snprintf */ -inline int SPrintf(char *buf, size_t size, const char *fmt, ...) { - va_list args; - va_start(args, fmt); - int ret = vsnprintf(buf, size, fmt, args); - va_end(args); - return ret; -} - /*! \brief assert a condition is true, use this to handle debug information */ inline void Assert(bool exp, const char *fmt, ...) { if (!exp) { @@ -125,7 +91,7 @@ inline void Assert(bool exp, const char *fmt, ...) { va_start(args, fmt); vsnprintf(&msg[0], kPrintBuffer, fmt, args); va_end(args); - HandleAssertError(msg.c_str()); + LOG(FATAL) << msg; } } @@ -137,7 +103,7 @@ inline void Check(bool exp, const char *fmt, ...) { va_start(args, fmt); vsnprintf(&msg[0], kPrintBuffer, fmt, args); va_end(args); - HandleCheckError(msg.c_str()); + LOG(FATAL) << msg; } } @@ -149,7 +115,7 @@ inline void Error(const char *fmt, ...) { va_start(args, fmt); vsnprintf(&msg[0], kPrintBuffer, fmt, args); va_end(args); - HandleCheckError(msg.c_str()); + LOG(FATAL) << msg; } } } // namespace utils @@ -176,15 +142,6 @@ inline T *BeginPtr(std::vector &vec) { // NOLINT(*) return &vec[0]; } } -/*! \brief get the beginning address of a vector */ -template -inline const T *BeginPtr(const std::vector &vec) { // NOLINT(*) - if (vec.size() == 0) { - return nullptr; - } else { - return &vec[0]; - } -} inline char* BeginPtr(std::string &str) { // NOLINT(*) if (str.length() == 0) return nullptr; return &str[0]; diff --git a/rabit/include/rabit/rabit.h b/rabit/include/rabit/rabit.h index 23c96c47f..8f10cf3f3 100644 --- a/rabit/include/rabit/rabit.h +++ b/rabit/include/rabit/rabit.h @@ -12,36 +12,7 @@ #define RABIT_RABIT_H_ // NOLINT(*) #include #include - -// whether or not use c++11 support -#ifndef DMLC_USE_CXX11 -#if defined(__GXX_EXPERIMENTAL_CXX0X__) || defined(_MSC_VER) -#define DMLC_USE_CXX11 1 -#else -#define DMLC_USE_CXX11 (__cplusplus >= 201103L) -#endif // defined(__GXX_EXPERIMENTAL_CXX0X__) || defined(_MSC_VER) -#endif // DMLC_USE_CXX11 - -// keeps rabit api caller signature -#ifndef RABIT_API_CALLER_SIGNATURE -#define RABIT_API_CALLER_SIGNATURE - -#if (defined(__GNUC__) && !defined(__clang__)) -#define _FILE __builtin_FILE() -#define _LINE __builtin_LINE() -#define _CALLER __builtin_FUNCTION() -#else -#define _FILE "N/A" -#define _LINE -1 -#define _CALLER "N/A" -#endif // (defined(__GNUC__) && !defined(__clang__)) - -#endif // RABIT_API_CALLER_SIGNATURE - -// optionally support of lambda functions in C++11, if available -#if DMLC_USE_CXX11 #include -#endif // C++11 // engine definition of rabit, defines internal implementation // to use rabit interface, there is no need to read engine.h // rabit.h and serializable.h are enough to use the interface @@ -135,31 +106,19 @@ inline void TrackerPrintf(const char *fmt, ...); * \param sendrecv_data the pointer to the send/receive buffer, * \param size the data size * \param root the process root - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key */ -inline void Broadcast(void *sendrecv_data, size_t size, int root, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER); +inline void Broadcast(void *sendrecv_data, size_t size, int root); /*! * \brief broadcasts an std::vector to every node from root * \param sendrecv_data the pointer to send/receive vector, * for the receiver, the vector does not need to be pre-allocated * \param root the process root - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key * \tparam DType the data type stored in the vector, has to be a simple data type * that can be directly transmitted by sending the sizeof(DType) */ template -inline void Broadcast(std::vector *sendrecv_data, int root, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER); +inline void Broadcast(std::vector *sendrecv_data, int root); /*! * \brief broadcasts a std::string to every node from the root * \param sendrecv_data the pointer to the send/receive buffer, @@ -169,10 +128,7 @@ inline void Broadcast(std::vector *sendrecv_data, int root, * \param _caller caller function name used to generate unique cache key * \param root the process root */ -inline void Broadcast(std::string *sendrecv_data, int root, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER); +inline void Broadcast(std::string *sendrecv_data, int root); /*! * \brief performs in-place Allreduce on sendrecvbuf * this function is NOT thread-safe @@ -191,19 +147,13 @@ inline void Broadcast(std::string *sendrecv_data, int root, * will be called by the function before performing Allreduce in order to initialize the data in sendrecvbuf. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to pass into the lazy preprocessing function - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key * \tparam OP see namespace op, reduce operator * \tparam DType data type */ template inline void Allreduce(DType *sendrecvbuf, size_t count, void (*prepare_fun)(void *) = nullptr, - void *prepare_arg = nullptr, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER); + void *prepare_arg = nullptr); /*! * \brief Allgather function, each node have a segment of data in the ring of sendrecvbuf, @@ -217,19 +167,13 @@ inline void Allreduce(DType *sendrecvbuf, size_t count, * \param slice_begin beginning of the current slice * \param slice_end end of the current slice * \param size_prev_slice size of the previous slice i.e. slice of node (rank - 1) % world_size -* \param _file caller file name used to generate unique cache key -* \param _line caller line number used to generate unique cache key -* \param _caller caller function name used to generate unique cache key */ template inline void Allgather(DType *sendrecvbuf_, size_t total_size, size_t slice_begin, size_t slice_end, - size_t size_prev_slice, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER); + size_t size_prev_slice); // C++11 support for lambda prepare function #if DMLC_USE_CXX11 @@ -254,18 +198,12 @@ inline void Allgather(DType *sendrecvbuf_, * \param prepare_fun Lazy lambda preprocessing function, prepare_fun() will be invoked * by the function before performing Allreduce in order to initialize the data in sendrecvbuf. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key * \tparam OP see namespace op, reduce operator * \tparam DType data type */ template inline void Allreduce(DType *sendrecvbuf, size_t count, - std::function prepare_fun, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER); + std::function prepare_fun); #endif // C++11 /*! * \brief loads the latest check point @@ -361,31 +299,19 @@ class Reducer { * will be called by the function before performing Allreduce, to initialize the data in sendrecvbuf. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to pass into the lazy preprocessing function - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key */ inline void Allreduce(DType *sendrecvbuf, size_t count, void (*prepare_fun)(void *) = nullptr, - void *prepare_arg = nullptr, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER); + void *prepare_arg = nullptr); #if DMLC_USE_CXX11 /*! * \brief customized in-place all reduce operation, with lambda function as preprocessor * \param sendrecvbuf pointer to the array of objects to be reduced * \param count number of elements to be reduced * \param prepare_fun lambda function executed to prepare the data, if necessary - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key */ inline void Allreduce(DType *sendrecvbuf, size_t count, - std::function prepare_fun, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER); + std::function prepare_fun); #endif // DMLC_USE_CXX11 private: @@ -416,17 +342,11 @@ class SerializeReducer { * will be called by the function before performing Allreduce, to initialize the data in sendrecvbuf. * If the result of Allreduce can be recovered directly, then the prepare_func will NOT be called * \param prepare_arg argument used to pass into the lazy preprocessing function - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key */ inline void Allreduce(DType *sendrecvobj, size_t max_nbyte, size_t count, void (*prepare_fun)(void *) = nullptr, - void *prepare_arg = nullptr, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER); + void *prepare_arg = nullptr); // C++11 support for lambda prepare function #if DMLC_USE_CXX11 /*! @@ -436,16 +356,10 @@ class SerializeReducer { * this includes budget limit for intermediate and final result * \param count number of elements to be reduced * \param prepare_fun lambda function executed to prepare the data, if necessary - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key */ inline void Allreduce(DType *sendrecvobj, size_t max_nbyte, size_t count, - std::function prepare_fun, - const char* _file = _FILE, - const int _line = _LINE, - const char* _caller = _CALLER); + std::function prepare_fun); #endif // DMLC_USE_CXX11 private: diff --git a/rabit/lib/README.md b/rabit/lib/README.md deleted file mode 100644 index b6a5aa8b2..000000000 --- a/rabit/lib/README.md +++ /dev/null @@ -1,15 +0,0 @@ -Rabit Library -===== -This folder holds the library file generated by the compiler. To generate the library file, type ```make``` in the project root folder. If you want mpi compatible library, type ```make mpi``` - -***List of Files*** -* rabit.a The rabit package library - - Normally you need to link with this one -* rabit_mock.a The rabit package library with mock test - - This library allows additional mock-test -* rabit_mpi.a The MPI backed library - - Link against this library makes the program use MPI Allreduce - - This library is not fault-tolerant -* rabit_empty.a Dummy package implementation - - This is an empty library that does not provide anything - - Only introduced to minimize code dependency for projects that only need single machine code diff --git a/rabit/python/rabit.py b/rabit/python/rabit.py deleted file mode 100644 index a56cfccfb..000000000 --- a/rabit/python/rabit.py +++ /dev/null @@ -1,364 +0,0 @@ -""" -Reliable Allreduce and Broadcast Library. - -Author: Tianqi Chen -""" -# pylint: disable=unused-argument,invalid-name,global-statement,dangerous-default-value, -import pickle -import ctypes -import os -import platform -import sys -import warnings -import numpy as np - -# version information about the doc -__version__ = '1.0' - -_LIB = None - -def _find_lib_path(dll_name): - """Find the rabit dynamic library files. - - Returns - ------- - lib_path: list(string) - List of all found library path to rabit - """ - curr_path = os.path.dirname(os.path.abspath(os.path.expanduser(__file__))) - # make pythonpack hack: copy this directory one level upper for setup.py - dll_path = [curr_path, - os.path.join(curr_path, '../lib/'), - os.path.join(curr_path, './lib/')] - if os.name == 'nt': - dll_path = [os.path.join(p, dll_name) for p in dll_path] - else: - dll_path = [os.path.join(p, dll_name) for p in dll_path] - lib_path = [p for p in dll_path if os.path.exists(p) and os.path.isfile(p)] - #From github issues, most of installation errors come from machines w/o compilers - if len(lib_path) == 0 and not os.environ.get('XGBOOST_BUILD_DOC', False): - raise RuntimeError( - 'Cannot find Rabit Libarary in the candicate path, ' + - 'did you install compilers and run build.sh in root path?\n' - 'List of candidates:\n' + ('\n'.join(dll_path))) - return lib_path - -# load in xgboost library -def _loadlib(lib='standard', lib_dll=None): - """Load rabit library.""" - global _LIB - if _LIB is not None: - warnings.warn('rabit.int call was ignored because it has'\ - ' already been initialized', level=2) - return - - if lib_dll is not None: - _LIB = lib_dll - return - - if lib == 'standard': - dll_name = 'librabit' - else: - dll_name = 'librabit_' + lib - - if os.name == 'nt': - dll_name += '.dll' - elif platform.system() == 'Darwin': - dll_name += '.dylib' - else: - dll_name += '.so' - - _LIB = ctypes.cdll.LoadLibrary(_find_lib_path(dll_name)[0]) - _LIB.RabitGetRank.restype = ctypes.c_int - _LIB.RabitGetWorldSize.restype = ctypes.c_int - _LIB.RabitVersionNumber.restype = ctypes.c_int - -def _unloadlib(): - """Unload rabit library.""" - global _LIB - del _LIB - _LIB = None - -# reduction operators -MAX = 0 -MIN = 1 -SUM = 2 -BITOR = 3 - -def init(args=None, lib='standard', lib_dll=None): - """Intialize the rabit module, call this once before using anything. - - Parameters - ---------- - args: list of str, optional - The list of arguments used to initialized the rabit - usually you need to pass in sys.argv. - Defaults to sys.argv when it is None. - lib: {'standard', 'mock', 'mpi'}, optional - Type of library we want to load - When cdll is specified - lib_dll: ctypes.DLL, optional - The DLL object used as lib. - When this is presented argument lib will be ignored. - """ - if args is None: - args = [] - _loadlib(lib, lib_dll) - arr = (ctypes.c_char_p * len(args))() - - arr[:] = args - _LIB.RabitInit(len(args), arr) - -def finalize(): - """Finalize the rabit engine. - - Call this function after you finished all jobs. - """ - _LIB.RabitFinalize() - _unloadlib() - -def get_rank(): - """Get rank of current process. - - Returns - ------- - rank : int - Rank of current process. - """ - ret = _LIB.RabitGetRank() - return ret - -def get_world_size(): - """Get total number workers. - - Returns - ------- - n : int - Total number of process. - """ - ret = _LIB.RabitGetWorldSize() - return ret - -def tracker_print(msg): - """Print message to the tracker. - - This function can be used to communicate the information of - the progress to the tracker - - Parameters - ---------- - msg : str - The message to be printed to tracker. - """ - if not isinstance(msg, str): - msg = str(msg) - _LIB.RabitTrackerPrint(ctypes.c_char_p(msg).encode('utf-8')) - -def get_processor_name(): - """Get the processor name. - - Returns - ------- - name : str - the name of processor(host) - """ - mxlen = 256 - length = ctypes.c_ulong() - buf = ctypes.create_string_buffer(mxlen) - _LIB.RabitGetProcessorName(buf, ctypes.byref(length), mxlen) - return buf.value - -def broadcast(data, root): - """Broadcast object from one node to all other nodes. - - Parameters - ---------- - data : any type that can be pickled - Input data, if current rank does not equal root, this can be None - root : int - Rank of the node to broadcast data from. - - Returns - ------- - object : int - the result of broadcast. - """ - rank = get_rank() - length = ctypes.c_ulong() - if root == rank: - assert data is not None, 'need to pass in data when broadcasting' - s = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL) - length.value = len(s) - # run first broadcast - _LIB.RabitBroadcast(ctypes.byref(length), - ctypes.sizeof(ctypes.c_ulong), root) - if root != rank: - dptr = (ctypes.c_char * length.value)() - # run second - _LIB.RabitBroadcast(ctypes.cast(dptr, ctypes.c_void_p), - length.value, root) - data = pickle.loads(dptr.raw) - del dptr - else: - _LIB.RabitBroadcast(ctypes.cast(ctypes.c_char_p(s), ctypes.c_void_p), - length.value, root) - del s - return data - -# enumeration of dtypes -DTYPE_ENUM__ = { - np.dtype('int8') : 0, - np.dtype('uint8') : 1, - np.dtype('int32') : 2, - np.dtype('uint32') : 3, - np.dtype('int64') : 4, - np.dtype('uint64') : 5, - np.dtype('float32') : 6, - np.dtype('float64') : 7 -} - -def allreduce(data, op, prepare_fun=None): - """Perform allreduce, return the result. - - Parameters - ---------- - data: numpy array - Input data. - op: int - Reduction operators, can be MIN, MAX, SUM, BITOR - prepare_fun: function - Lazy preprocessing function, if it is not None, prepare_fun(data) - will be called by the function before performing allreduce, to intialize the data - If the result of Allreduce can be recovered directly, - then prepare_fun will NOT be called - - Returns - ------- - result : array_like - The result of allreduce, have same shape as data - - Notes - ----- - This function is not thread-safe. - """ - if not isinstance(data, np.ndarray): - raise Exception('allreduce only takes in numpy.ndarray') - buf = data.ravel() - if buf.base is data.base: - buf = buf.copy() - if buf.dtype not in DTYPE_ENUM__: - raise Exception('data type %s not supported' % str(buf.dtype)) - if prepare_fun is None: - _LIB.RabitAllreduce(buf.ctypes.data_as(ctypes.c_void_p), - buf.size, DTYPE_ENUM__[buf.dtype], - op, None, None) - else: - func_ptr = ctypes.CFUNCTYPE(None, ctypes.c_void_p) - def pfunc(args): - """prepare function.""" - prepare_fun(data) - _LIB.RabitAllreduce(buf.ctypes.data_as(ctypes.c_void_p), - buf.size, DTYPE_ENUM__[buf.dtype], - op, func_ptr(pfunc), None) - return buf - - -def _load_model(ptr, length): - """ - Internal function used by the module, - unpickle a model from a buffer specified by ptr, length - Arguments: - ptr: ctypes.POINTER(ctypes._char) - pointer to the memory region of buffer - length: int - the length of buffer - """ - data = (ctypes.c_char * length).from_address(ctypes.addressof(ptr.contents)) - return pickle.loads(data.raw) - -def load_checkpoint(with_local=False): - """Load latest check point. - - Parameters - ---------- - with_local: bool, optional - whether the checkpoint contains local model - - Returns - ------- - tuple : tuple - if with_local: return (version, gobal_model, local_model) - else return (version, gobal_model) - if returned version == 0, this means no model has been CheckPointed - and global_model, local_model returned will be None - """ - gptr = ctypes.POINTER(ctypes.c_char)() - global_len = ctypes.c_ulong() - if with_local: - lptr = ctypes.POINTER(ctypes.c_char)() - local_len = ctypes.c_ulong() - version = _LIB.RabitLoadCheckPoint( - ctypes.byref(gptr), - ctypes.byref(global_len), - ctypes.byref(lptr), - ctypes.byref(local_len)) - if version == 0: - return (version, None, None) - return (version, - _load_model(gptr, global_len.value), - _load_model(lptr, local_len.value)) - else: - version = _LIB.RabitLoadCheckPoint( - ctypes.byref(gptr), - ctypes.byref(global_len), - None, None) - if version == 0: - return (version, None) - return (version, - _load_model(gptr, global_len.value)) - -def checkpoint(global_model, local_model=None): - """Checkpoint the model. - - This means we finished a stage of execution. - Every time we call check point, there is a version number which will increase by one. - - Parameters - ---------- - global_model: anytype that can be pickled - globally shared model/state when calling this function, - the caller need to gauranttees that global_model is the same in all nodes - - local_model: anytype that can be pickled - Local model, that is specific to current node/rank. - This can be None when no local state is needed. - - Notes - ----- - local_model requires explicit replication of the model for fault-tolerance. - This will bring replication cost in checkpoint function. - while global_model do not need explicit replication. - It is recommended to use global_model if possible. - """ - sglobal = pickle.dumps(global_model) - if local_model is None: - _LIB.RabitCheckPoint(sglobal, len(sglobal), None, 0) - del sglobal - else: - slocal = pickle.dumps(local_model) - _LIB.RabitCheckPoint(sglobal, len(sglobal), slocal, len(slocal)) - del slocal - del sglobal - -def version_number(): - """Returns version number of current stored model. - - This means how many calls to CheckPoint we made so far. - - Returns - ------- - version : int - Version number of currently stored model - """ - ret = _LIB.RabitVersionNumber() - return ret diff --git a/rabit/scripts/mpi_build.sh b/rabit/scripts/mpi_build.sh deleted file mode 100755 index b1e70be97..000000000 --- a/rabit/scripts/mpi_build.sh +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env bash - -if [ -f mpich/lib/libmpich.so ]; then - echo "libmpich.so found -- nothing to build." -else - echo "Downloading mpich source." - wget http://www.mpich.org/static/downloads/3.2/mpich-3.2.tar.gz - tar xfz mpich-3.2.tar.gz - rm mpich-3.2.tar.gz* - echo "configuring and building mpich." - cd mpich-3.2 - #CC=gcc CXX=g++ CFLAGS=-m64 CXXFLAGS=-m64 FFLAGS=-m64 - ./configure \ - --prefix=`pwd`/../mpich \ - --enable-static=false \ - --enable-alloca=true \ - --disable-long-double \ - --enable-threads=single \ - --enable-fortran=no \ - --enable-fast=all \ - --enable-g=none \ - --enable-timing=none \ - --enable-cxx - make -j4 - make install - cd - -fi \ No newline at end of file diff --git a/rabit/scripts/travis_osx_install.sh b/rabit/scripts/travis_osx_install.sh deleted file mode 100755 index cae633d5c..000000000 --- a/rabit/scripts/travis_osx_install.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash -set -e -set -x - -if [ ${TRAVIS_OS_NAME} != "osx" ]; then - exit 0 -fi diff --git a/rabit/scripts/travis_runtest.sh b/rabit/scripts/travis_runtest.sh deleted file mode 100755 index f3d63859e..000000000 --- a/rabit/scripts/travis_runtest.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash - -conda activate python3 -conda --version -python --version - -make -f test.mk RABIT_BUILD_DMLC=1 model_recover_10_10k || exit -1 -make -f test.mk RABIT_BUILD_DMLC=1 model_recover_10_10k_die_same || exit -1 -make -f test.mk RABIT_BUILD_DMLC=1 model_recover_10_10k_die_hard || exit -1 -make -f test.mk RABIT_BUILD_DMLC=1 local_recover_10_10k || exit -1 -make -f test.mk RABIT_BUILD_DMLC=1 lazy_recover_10_10k_die_hard || exit -1 -make -f test.mk RABIT_BUILD_DMLC=1 lazy_recover_10_10k_die_same || exit -1 -make -f test.mk RABIT_BUILD_DMLC=1 ringallreduce_10_10k || exit -1 -make -f test.mk RABIT_BUILD_DMLC=1 pylocal_recover_10_10k || exit -1 diff --git a/rabit/scripts/travis_script.sh b/rabit/scripts/travis_script.sh deleted file mode 100755 index 19128224e..000000000 --- a/rabit/scripts/travis_script.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/bin/bash - -# main script of travis -if [ ${TASK} == "lint" ]; then - make lint RABIT_BUILD_DMLC=1 || exit -1 -fi - -if [ ${TASK} == "doc" ]; then - make doc 2>log.txt - (cat log.txt| grep -v ENABLE_PREPROCESSING |grep -v "unsupported tag" |grep warning) && exit -1 -fi - -# we should depreciate Makefile based build -if [ ${TASK} == "build" ]; then - make all RABIT_BUILD_DMLC=1 || exit -1 -fi - -if [ ${TASK} == "mpi-build" ]; then - ./scripts/mpi_build.sh - cd test - make mpi RABIT_BUILD_DMLC=1 && make speed_test.mpi RABIT_BUILD_DMLC=1 || exit -1 -fi -# -if [ ${TASK} == "cmake-test" ]; then - mkdir build - cd build - cmake -DRABIT_BUILD_TESTS=ON -DRABIT_BUILD_DMLC=ON -DGTEST_ROOT=${HOME}/.local .. - # known osx gtest 1.8 issue - cp ${HOME}/.local/lib/*.dylib . - make -j$(nproc) - make test - make install || exit -1 - cd ../test - ../scripts/travis_runtest.sh || exit -1 - rm -rf ../build -fi diff --git a/rabit/scripts/travis_setup.sh b/rabit/scripts/travis_setup.sh deleted file mode 100755 index f03f400c2..000000000 --- a/rabit/scripts/travis_setup.sh +++ /dev/null @@ -1,50 +0,0 @@ -#!/bin/bash - -echo "Testing on: ${TRAVIS_OS_NAME}, Home directory: ${HOME}" - -# Install Miniconda -if [ ${TRAVIS_OS_NAME} == "osx" ]; then - wget -O conda.sh https://repo.anaconda.com/miniconda/Miniconda3-latest-MacOSX-x86_64.sh -else - wget -O conda.sh https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -fi -bash conda.sh -b -p $HOME/miniconda -source $HOME/miniconda/bin/activate -conda config --set always_yes yes --set changeps1 no -conda update -q conda -conda info -a -conda create -n python3 python=3.7 -conda activate python3 -conda --version -python --version -# Install Python packages -conda install -c conda-forge numpy scipy urllib3 websocket-client -python -m pip install cpplint pylint kubernetes - -# Install googletest under home directory -GTEST_VERSION=1.8.1 -GTEST_RELEASE=release-${GTEST_VERSION}.tar.gz -GTEST_TAR_BALL=googletest_${GTEST_RELEASE} - -wget https://github.com/google/googletest/archive/${GTEST_RELEASE} -O ${GTEST_TAR_BALL} -echo "152b849610d91a9dfa1401293f43230c2e0c33f8 ${GTEST_TAR_BALL}" | sha1sum -c -tar -xf ${GTEST_TAR_BALL} -pushd . - -cd googletest-release-${GTEST_VERSION} -mkdir build -cd build -echo "Installing to ${HOME}/.local" -cmake .. -DBUILD_SHARED_LIBS=ON -DCMAKE_INSTALL_PREFIX=${HOME}/.local -make -j$(nproc) -make install - -popd - -if [ ${TRAVIS_OS_NAME} == "linux" ]; then - sudo apt-get install tree -fi - -if [ ${TRAVIS_OS_NAME} == "osx" ]; then - brew install python3 -fi diff --git a/rabit/scripts/travis_setup_env.sh b/rabit/scripts/travis_setup_env.sh deleted file mode 100755 index 7f4af313e..000000000 --- a/rabit/scripts/travis_setup_env.sh +++ /dev/null @@ -1,40 +0,0 @@ -# script to be sourced in travis yml -# setup all enviroment variables - -export CACHE_PREFIX=${HOME}/.cache/usr -export PATH=${HOME}/.local/bin:${PATH} -export PATH=${PATH}:${CACHE_PREFIX}/bin -export CPLUS_INCLUDE_PATH=${CPLUS_INCLUDE_PATH}:${CACHE_PREFIX}/include -export C_INCLUDE_PATH=${C_INCLUDE_PATH}:${CACHE_PREFIX}/include -export LIBRARY_PATH=${LIBRARY_PATH}:${CACHE_PREFIX}/lib -export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:${CACHE_PREFIX}/lib -export DYLD_LIBRARY_PATH=${DYLD_LIBRARY_PATH}:${CACHE_PREFIX}/lib - -alias make="make -j4" - -# setup the cache prefix folder -if [ ! -d ${HOME}/.cache ]; then - mkdir ${HOME}/.cache -fi - -if [ ! -d ${CACHE_PREFIX} ]; then - mkdir ${CACHE_PREFIX} -fi -if [ ! -d ${CACHE_PREFIX}/include ]; then - mkdir ${CACHE_PREFIX}/include -fi -if [ ! -d ${CACHE_PREFIX}/lib ]; then - mkdir ${CACHE_PREFIX}/lib -fi -if [ ! -d ${CACHE_PREFIX}/bin ]; then - mkdir ${CACHE_PREFIX}/bin -fi - -# setup CUDA path if NVCC_PREFIX exists -if [ ! -z "$NVCC_PREFIX" ]; then - export PATH=${PATH}:${NVCC_PREFIX}/usr/local/cuda-7.5/bin - export CPLUS_INCLUDE_PATH=${CPLUS_INCLUDE_PATH}:${NVCC_PREFIX}/usr/local/cuda-7.5/include - export C_INCLUDE_PATH=${C_INCLUDE_PATH}:${NVCC_PREFIX}/usr/local/cuda-7.5/include - export LIBRARY_PATH=${LIBRARY_PATH}:${NVCC_PREFIX}/usr/local/cuda-7.5/lib64:${NVCC_PREFIX}/usr/lib/x86_64-linux-gnu - export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:${NVCC_PREFIX}/usr/local/cuda-7.5/lib64:${NVCC_PREFIX}/usr/lib/x86_64-linux-gnu -fi diff --git a/rabit/src/allreduce_base.cc b/rabit/src/allreduce_base.cc index 021a7eac3..3cfd3008f 100644 --- a/rabit/src/allreduce_base.cc +++ b/rabit/src/allreduce_base.cc @@ -275,7 +275,7 @@ bool AllreduceBase::ReConnectLinks(const char *cmd) { } try { utils::TCPSocket tracker = this->ConnectTracker(); - fprintf(stdout, "task %s connected to the tracker\n", task_id.c_str()); + LOG(INFO) << "task " << task_id << " connected to the tracker"; tracker.SendStr(std::string(cmd)); // the rank of previous link, next link in ring diff --git a/rabit/src/allreduce_base.h b/rabit/src/allreduce_base.h index 90c55e3de..8f1b30490 100644 --- a/rabit/src/allreduce_base.h +++ b/rabit/src/allreduce_base.h @@ -98,14 +98,9 @@ class AllreduceBase : public IEngine { * \param slice_begin beginning of the current slice * \param slice_end end of the current slice * \param size_prev_slice size of the previous slice i.e. slice of node (rank - 1) % world_size - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key */ void Allgather(void *sendrecvbuf_, size_t total_size, size_t slice_begin, - size_t slice_end, size_t size_prev_slice, - const char *_file = _FILE, const int _line = _LINE, - const char *_caller = _CALLER) override { + size_t slice_end, size_t size_prev_slice) override { if (world_size == 1 || world_size == -1) { return; } @@ -124,15 +119,10 @@ class AllreduceBase : public IEngine { * will be called by the function before performing Allreduce, to intialize the data in sendrecvbuf_. * If the result of Allreduce can be recovered directly, then prepare_func will NOT be called * \param prepare_arg argument used to passed into the lazy preprocessing function - * \param _file caller file name used to generate unique cache key - * \param _line caller line number used to generate unique cache key - * \param _caller caller function name used to generate unique cache key */ void Allreduce(void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer, PreprocFunction prepare_fun = nullptr, - void *prepare_arg = nullptr, const char *_file = _FILE, - const int _line = _LINE, - const char *_caller = _CALLER) override { + void *prepare_arg = nullptr) override { if (prepare_fun != nullptr) prepare_fun(prepare_arg); if (world_size == 1 || world_size == -1) return; utils::Assert(TryAllreduce(sendrecvbuf_, type_nbytes, count, reducer) == @@ -148,9 +138,7 @@ class AllreduceBase : public IEngine { * \param _line caller line number used to generate unique cache key * \param _caller caller function name used to generate unique cache key */ - void Broadcast(void *sendrecvbuf_, size_t total_size, int root, - const char *_file = _FILE, const int _line = _LINE, - const char *_caller = _CALLER) override { + void Broadcast(void *sendrecvbuf_, size_t total_size, int root) override { if (world_size == 1 || world_size == -1) return; utils::Assert(TryBroadcast(sendrecvbuf_, total_size, root) == kSuccess, "Broadcast failed"); @@ -232,14 +220,6 @@ class AllreduceBase : public IEngine { int VersionNumber() const override { return version_number; } - /*! - * \brief explicitly re-init everything before calling LoadCheckPoint - * call this function when IEngine throw an exception out, - * this function is only used for test purpose - */ - void InitAfterException() override { - utils::Error("InitAfterException: not implemented"); - } /*! * \brief report current status to the job tracker * depending on the job tracker we are in diff --git a/rabit/src/allreduce_mock.h b/rabit/src/allreduce_mock.h index a0e725f8e..a1ef01513 100644 --- a/rabit/src/allreduce_mock.h +++ b/rabit/src/allreduce_mock.h @@ -11,8 +11,8 @@ #include #include #include +#include #include "rabit/internal/engine.h" -#include "rabit/internal/timer.h" #include "allreduce_base.h" namespace rabit { @@ -46,36 +46,30 @@ class AllreduceMock : public AllreduceBase { } void Allreduce(void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer, PreprocFunction prepare_fun, - void *prepare_arg, const char *_file = _FILE, - const int _line = _LINE, - const char *_caller = _CALLER) override { + void *prepare_arg) override { this->Verify(MockKey(rank, version_number, seq_counter, num_trial_), "AllReduce"); - double tstart = utils::GetTime(); + double tstart = dmlc::GetTime(); AllreduceBase::Allreduce(sendrecvbuf_, type_nbytes, count, reducer, - prepare_fun, prepare_arg, _file, _line, _caller); - tsum_allreduce_ += utils::GetTime() - tstart; + prepare_fun, prepare_arg); + tsum_allreduce_ += dmlc::GetTime() - tstart; } void Allgather(void *sendrecvbuf, size_t total_size, size_t slice_begin, - size_t slice_end, size_t size_prev_slice, - const char *_file = _FILE, const int _line = _LINE, - const char *_caller = _CALLER) override { + size_t slice_end, size_t size_prev_slice) override { this->Verify(MockKey(rank, version_number, seq_counter, num_trial_), "Allgather"); - double tstart = utils::GetTime(); + double tstart = dmlc::GetTime(); AllreduceBase::Allgather(sendrecvbuf, total_size, slice_begin, slice_end, - size_prev_slice, _file, _line, _caller); - tsum_allgather_ += utils::GetTime() - tstart; + size_prev_slice); + tsum_allgather_ += dmlc::GetTime() - tstart; } - void Broadcast(void *sendrecvbuf_, size_t total_size, int root, - const char *_file = _FILE, const int _line = _LINE, - const char *_caller = _CALLER) override { + void Broadcast(void *sendrecvbuf_, size_t total_size, int root) override { this->Verify(MockKey(rank, version_number, seq_counter, num_trial_), "Broadcast"); - AllreduceBase::Broadcast(sendrecvbuf_, total_size, root, _file, _line, _caller); + AllreduceBase::Broadcast(sendrecvbuf_, total_size, root); } int LoadCheckPoint(Serializable *global_model, Serializable *local_model) override { tsum_allreduce_ = 0.0; tsum_allgather_ = 0.0; - time_checkpoint_ = utils::GetTime(); + time_checkpoint_ = dmlc::GetTime(); if (force_local_ == 0) { return AllreduceBase::LoadCheckPoint(global_model, local_model); } else { @@ -87,7 +81,7 @@ class AllreduceMock : public AllreduceBase { void CheckPoint(const Serializable *global_model, const Serializable *local_model) override { this->Verify(MockKey(rank, version_number, seq_counter, num_trial_), "CheckPoint"); - double tstart = utils::GetTime(); + double tstart = dmlc::GetTime(); double tbet_chkpt = tstart - time_checkpoint_; if (force_local_ == 0) { AllreduceBase::CheckPoint(global_model, local_model); @@ -96,8 +90,8 @@ class AllreduceMock : public AllreduceBase { ComboSerializer com(global_model, local_model); AllreduceBase::CheckPoint(&dum, &com); } - time_checkpoint_ = utils::GetTime(); - double tcost = utils::GetTime() - tstart; + time_checkpoint_ = dmlc::GetTime(); + double tcost = dmlc::GetTime() - tstart; if (report_stats_ != 0 && rank == 0) { std::stringstream ss; ss << "[v" << version_number << "] global_size=" diff --git a/rabit/src/c_api.cc b/rabit/src/c_api.cc index f59722f2c..071da2944 100644 --- a/rabit/src/c_api.cc +++ b/rabit/src/c_api.cc @@ -228,12 +228,12 @@ RABIT_DLL bool RabitInit(int argc, char *argv[]) { return ret; } -RABIT_DLL bool RabitFinalize() { +RABIT_DLL int RabitFinalize() { auto ret = rabit::Finalize(); if (!ret) { XGBAPISetLastError("Failed to shutdown RABIT worker."); } - return ret; + return static_cast(ret); } RABIT_DLL int RabitGetRingPrevRank() { @@ -270,37 +270,39 @@ RABIT_DLL void RabitGetProcessorName(char *out_name, *out_len = static_cast(s.length()); } -RABIT_DLL void RabitBroadcast(void *sendrecv_data, +RABIT_DLL int RabitBroadcast(void *sendrecv_data, rbt_ulong size, int root) { + API_BEGIN() rabit::Broadcast(sendrecv_data, size, root); + API_END() } -RABIT_DLL void RabitAllgather(void *sendrecvbuf_, size_t total_size, +RABIT_DLL int RabitAllgather(void *sendrecvbuf_, size_t total_size, size_t beginIndex, size_t size_node_slice, size_t size_prev_slice, int enum_dtype) { - rabit::c_api::Allgather(sendrecvbuf_, - total_size, - beginIndex, - size_node_slice, - size_prev_slice, - static_cast(enum_dtype)); + API_BEGIN() + rabit::c_api::Allgather( + sendrecvbuf_, total_size, beginIndex, size_node_slice, size_prev_slice, + static_cast(enum_dtype)); + API_END() } -RABIT_DLL void RabitAllreduce(void *sendrecvbuf, size_t count, int enum_dtype, +RABIT_DLL int RabitAllreduce(void *sendrecvbuf, size_t count, int enum_dtype, int enum_op, void (*prepare_fun)(void *arg), void *prepare_arg) { - rabit::c_api::Allreduce - (sendrecvbuf, count, - static_cast(enum_dtype), - static_cast(enum_op), - prepare_fun, prepare_arg); + API_BEGIN() + rabit::c_api::Allreduce(sendrecvbuf, count, + static_cast(enum_dtype), + static_cast(enum_op), + prepare_fun, prepare_arg); + API_END() } RABIT_DLL int RabitLoadCheckPoint(char **out_global_model, rbt_ulong *out_global_len, char **out_local_model, rbt_ulong *out_local_len) { - // NOTE: this function is not thread-safe + // no-op as XGBoost 1.3 using rabit::BeginPtr; using namespace rabit::c_api; // NOLINT(*) static std::string global_buffer; diff --git a/rabit/src/engine.cc b/rabit/src/engine.cc index 616004765..66302530a 100644 --- a/rabit/src/engine.cc +++ b/rabit/src/engine.cc @@ -85,12 +85,9 @@ IEngine *GetEngine() { void Allgather(void *sendrecvbuf_, size_t total_size, size_t slice_begin, size_t slice_end, - size_t size_prev_slice, - const char* _file, - const int _line, - const char* _caller) { + size_t size_prev_slice) { GetEngine()->Allgather(sendrecvbuf_, total_size, slice_begin, - slice_end, size_prev_slice, _file, _line, _caller); + slice_end, size_prev_slice); } @@ -102,12 +99,9 @@ void Allreduce_(void *sendrecvbuf, // NOLINT mpi::DataType dtype, mpi::OpType op, IEngine::PreprocFunction prepare_fun, - void *prepare_arg, - const char* _file, - const int _line, - const char* _caller) { + void *prepare_arg) { GetEngine()->Allreduce(sendrecvbuf, type_nbytes, count, red, prepare_fun, - prepare_arg, _file, _line, _caller); + prepare_arg); } // code for reduce handle @@ -126,14 +120,10 @@ void ReduceHandle::Init(IEngine::ReduceFunction redfunc, size_t type_nbytes) { void ReduceHandle::Allreduce(void *sendrecvbuf, size_t type_nbytes, size_t count, IEngine::PreprocFunction prepare_fun, - void *prepare_arg, - const char* _file, - const int _line, - const char* _caller) { + void *prepare_arg) { utils::Assert(redfunc_ != nullptr, "must intialize handle to call AllReduce"); GetEngine()->Allreduce(sendrecvbuf, type_nbytes, count, - redfunc_, prepare_fun, prepare_arg, - _file, _line, _caller); + redfunc_, prepare_fun, prepare_arg); } } // namespace engine } // namespace rabit diff --git a/rabit/test/.gitignore b/rabit/test/.gitignore deleted file mode 100644 index 8e1ff376e..000000000 --- a/rabit/test/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -*.mpi -*_test -*_recover diff --git a/rabit/test/Makefile b/rabit/test/Makefile deleted file mode 100644 index 1a8e20717..000000000 --- a/rabit/test/Makefile +++ /dev/null @@ -1,77 +0,0 @@ -RABIT_BUILD_DMLC = 0 - -ifeq ($(RABIT_BUILD_DMLC),1) - DMLC=../dmlc-core -else - DMLC=../../dmlc-core -endif - -MPICXX=../mpich/bin/mpicxx -export LDFLAGS= -L../lib -pthread -lm -export CFLAGS = -Wall -O3 -Wno-unknown-pragmas - -export CC = gcc -export CXX = g++ - - -#---------------------------- -# Settings for power and arm arch -#---------------------------- -ARCH := $(shell uname -a) -ifneq (,$(filter $(ARCH), armv6l armv7l powerpc64le ppc64le aarch64)) - CFLAGS += -march=native -else - CFLAGS += -msse2 -endif - -ifndef WITH_FPIC - WITH_FPIC = 1 -endif -ifeq ($(WITH_FPIC), 1) - CFLAGS += -fPIC -endif - -CFLAGS += -I../include -I $(DMLC)/include -std=c++11 - -# specify tensor path -BIN = speed_test model_recover local_recover lazy_recover -OBJ = $(RABIT_OBJ) speed_test.o model_recover.o local_recover.o lazy_recover.o -MPIBIN = speed_test.mpi -.PHONY: clean all lib mpi - -.PHONY: lib all - -all: $(BIN) - -lib: - cd ..;make clean;make;cd - - -.PHONY: mpi -mpi: - cd ..;make mpi;cd - - -# programs -speed_test.o: speed_test.cc ../include/rabit/*.h lib mpi -model_recover.o: model_recover.cc ../include/rabit/*.h lib -local_recover.o: local_recover.cc ../include/rabit/*.h lib -lazy_recover.o: lazy_recover.cc ../include/rabit/*.h lib - -# we can link against MPI version to get use MPI -speed_test: speed_test.o $(RABIT_OBJ) -speed_test.mpi: speed_test.o $(MPIOBJ) -model_recover: model_recover.o $(RABIT_OBJ) -local_recover: local_recover.o $(RABIT_OBJ) -lazy_recover: lazy_recover.o $(RABIT_OBJ) - -$(BIN) : - $(CXX) $(CFLAGS) -o $@ $(filter %.cpp %.o %.c %.cc, $^) ../lib/librabit_mock.a $(LDFLAGS) - -$(OBJ) : - $(CXX) -c $(CFLAGS) -o $@ $(firstword $(filter %.cpp %.c %.cc, $^) ) - -$(MPIBIN) : - $(MPICXX) $(CFLAGS) -I../mpich/include -shared -o $@ $(filter %.cpp %.o %.c %.cc, $^) \ - ../lib/librabit_mpi.so $(LDFLAGS) -L../mpich/lib -Wl,-rpath,../mpich/lib -lmpi - -clean: - $(RM) $(OBJ) $(BIN) $(MPIBIN) $(MPIOBJ) *~ ../src/*~ diff --git a/rabit/test/README.md b/rabit/test/README.md deleted file mode 100644 index fb68112bf..000000000 --- a/rabit/test/README.md +++ /dev/null @@ -1,18 +0,0 @@ -Testcases of Rabit -==== -This folder contains internal testcases to test correctness and efficiency of rabit API - -The example running scripts for testcases are given by test.mk -* type ```make -f test.mk testcasename``` to run certain testcase - - -Helper Scripts -==== -* test.mk contains Makefile documentation of all testcases -* keepalive.sh helper bash to restart a program when it dies abnormally - -List of Programs -==== -* speed_test: test the running speed of rabit API -* test_local_recover: test recovery of local state when error happens -* test_model_recover: test recovery of global state when error happens diff --git a/rabit/test/lazy_recover.cc b/rabit/test/lazy_recover.cc deleted file mode 100644 index 180e2e4b5..000000000 --- a/rabit/test/lazy_recover.cc +++ /dev/null @@ -1,125 +0,0 @@ -// this is a test case to test whether rabit can recover model when -// facing an exception -#include -#include -#include -#include -using namespace rabit; - -// dummy model -class Model : public rabit::Serializable { - public: - // iterations - std::vector data; - // load from stream - virtual void Load(rabit::Stream *fi) { - fi->Read(&data); - } - /*! \brief save the model to the stream */ - virtual void Save(rabit::Stream *fo) const { - fo->Write(data); - } - virtual void InitModel(size_t n) { - data.clear(); - data.resize(n, 1.0f); - } -}; - -inline void TestMax(Model *model, int ntrial, int iter) { - int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - const int z = iter + 111; - - std::vector ndata(model->data.size()); - for (size_t i = 0; i < ndata.size(); ++i) { - ndata[i] = (i * (rank+1)) % z + model->data[i]; - } - rabit::Allreduce(&ndata[0], ndata.size()); - - for (size_t i = 0; i < ndata.size(); ++i) { - float rmax = (i * 1) % z + model->data[i]; - for (int r = 0; r < nproc; ++r) { - rmax = std::max(rmax, (float)((i * (r+1)) % z) + model->data[i]); - } - utils::Check(rmax == ndata[i], "[%d] TestMax check failurem i=%lu, rmax=%f, ndata=%f", rank, i, rmax, ndata[i]); - } -} - -inline void TestSum(Model *model, int ntrial, int iter) { - int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - const int z = 131 + iter; - - std::vector ndata(model->data.size()); - for (size_t i = 0; i < ndata.size(); ++i) { - ndata[i] = (i * (rank+1)) % z + model->data[i]; - } - Allreduce(&ndata[0], ndata.size()); - - for (size_t i = 0; i < ndata.size(); ++i) { - float rsum = model->data[i] * nproc; - for (int r = 0; r < nproc; ++r) { - rsum += (float)((i * (r+1)) % z); - } - utils::Check(fabsf(rsum - ndata[i]) < 1e-5 , - "[%d] TestSum check failure, local=%g, allreduce=%g", rank, rsum, ndata[i]); - } - model->data = ndata; -} - -inline void TestBcast(size_t n, int root, int ntrial, int iter) { - int rank = rabit::GetRank(); - std::string s; s.resize(n); - for (size_t i = 0; i < n; ++i) { - s[i] = char(i % 126 + 1); - } - std::string res; - if (root == rank) { - res = s; - rabit::Broadcast(&res, root); - } else { - rabit::Broadcast(&res, root); - } - utils::Check(res == s, "[%d] TestBcast fail", rank); -} - -int main(int argc, char *argv[]) { - if (argc < 3) { - printf("Usage: \n"); - return 0; - } - int n = atoi(argv[1]); - rabit::Init(argc, argv); - int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - std::string name = rabit::GetProcessorName(); - Model model; - srand(0); - int ntrial = 0; - for (int i = 1; i < argc; ++i) { - int n; - if (sscanf(argv[i], "rabit_num_trial=%d", &n) == 1) ntrial = n; - } - int iter = rabit::LoadCheckPoint(&model); - if (iter == 0) { - model.InitModel(n); - printf("[%d] reload-trail=%d, init iter=%d\n", rank, ntrial, iter); - } else { - printf("[%d] reload-trail=%d, init iter=%d\n", rank, ntrial, iter); - } - for (int r = iter; r < 3; ++r) { - TestMax(&model, ntrial, r); - printf("[%d] !!!TestMax pass, iter=%d\n", rank, r); - int step = std::max(nproc / 3, 1); - for (int i = 0; i < nproc; i += step) { - TestBcast(n, i, ntrial, r); - } - printf("[%d] !!!TestBcast pass, iter=%d\n", rank, r); - TestSum(&model, ntrial, r); - printf("[%d] !!!TestSum pass, iter=%d\n", rank, r); - rabit::LazyCheckPoint(&model); - printf("[%d] !!!CheckPoint pass, iter=%d\n", rank, r); - } - rabit::Finalize(); - return 0; -} diff --git a/rabit/test/local_recover.cc b/rabit/test/local_recover.cc deleted file mode 100644 index 1f0b28b3b..000000000 --- a/rabit/test/local_recover.cc +++ /dev/null @@ -1,137 +0,0 @@ -// this is a test case to test whether rabit can recover model when -// facing an exception -#include -#include -#include -#include - -using namespace rabit; - -// dummy model -class Model : public rabit::Serializable { - public: - // iterations - std::vector data; - // load from stream - virtual void Load(rabit::Stream *fi) { - fi->Read(&data); - } - /*! \brief save the model to the stream */ - virtual void Save(rabit::Stream *fo) const { - fo->Write(data); - } - virtual void InitModel(size_t n, float v) { - data.clear(); - data.resize(n, v); - } -}; - -inline void TestMax(Model *model, Model *local, int ntrial, int iter) { - int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - const int z = iter + 111; - std::vector ndata(model->data.size()); - rabit::Allreduce(&ndata[0], ndata.size(), - [&]() { - // use lambda expression to prepare the data - for (size_t i = 0; i < ndata.size(); ++i) { - ndata[i] = (i * (rank+1)) % z + local->data[i]; - } - }); - - for (size_t i = 0; i < ndata.size(); ++i) { - float rmax = (i * 1) % z + model->data[i]; - for (int r = 0; r < nproc; ++r) { - rmax = std::max(rmax, (float)((i * (r+1)) % z) + model->data[i] + r); - } - utils::Check(rmax == ndata[i], "[%d] TestMax check failure", rank); - } - model->data = ndata; - local->data = ndata; - for (size_t i = 0; i < ndata.size(); ++i) { - local->data[i] = ndata[i] + rank; - } -} - -inline void TestSum(Model *model, Model *local, int ntrial, int iter) { - int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - const int z = 131 + iter; - - std::vector ndata(model->data.size()); - for (size_t i = 0; i < ndata.size(); ++i) { - ndata[i] = (i * (rank+1)) % z + local->data[i]; - } - Allreduce(&ndata[0], ndata.size()); - - for (size_t i = 0; i < ndata.size(); ++i) { - float rsum = 0.0f; - for (int r = 0; r < nproc; ++r) { - rsum += (float)((i * (r+1)) % z) + model->data[i] + r; - } - utils::Check(fabsf(rsum - ndata[i]) < 1e-5 , - "[%d] TestSum check failure, local=%g, allreduce=%g", rank, rsum, ndata[i]); - } - model->data = ndata; - for (size_t i = 0; i < ndata.size(); ++i) { - local->data[i] = ndata[i] + rank; - } -} - -inline void TestBcast(size_t n, int root, int ntrial, int iter) { - int rank = rabit::GetRank(); - std::string s; s.resize(n); - for (size_t i = 0; i < n; ++i) { - s[i] = char(i % 126 + 1); - } - std::string res; - if (root == rank) { - res = s; - rabit::Broadcast(&res, root); - } else { - rabit::Broadcast(&res, root); - } - utils::Check(res == s, "[%d] TestBcast fail", rank); -} - -int main(int argc, char *argv[]) { - if (argc < 3) { - printf("Usage: \n"); - return 0; - } - int n = atoi(argv[1]); - rabit::Init(argc, argv); - int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - std::string name = rabit::GetProcessorName(); - Model model, local; - srand(0); - int ntrial = 0; - for (int i = 1; i < argc; ++i) { - int n; - if (sscanf(argv[i], "repeat=%d", &n) == 1) ntrial = n; - } - int iter = rabit::LoadCheckPoint(&model, &local); - if (iter == 0) { - model.InitModel(n, 1.0f); - local.InitModel(n, 1.0f + rank); - printf("[%d] reload-trail=%d, init iter=%d\n", rank, ntrial, iter); - } else { - printf("[%d] reload-trail=%d, init iter=%d\n", rank, ntrial, iter); - } - for (int r = iter; r < 3; ++r) { - TestMax(&model, &local, ntrial, r); - printf("[%d] !!!TestMax pass, iter=%d\n", rank, r); - int step = std::max(nproc / 3, 1); - for (int i = 0; i < nproc; i += step) { - TestBcast(n, i, ntrial, r); - } - printf("[%d] !!!TestBcast pass, iter=%d\n", rank, r); - TestSum(&model, &local, ntrial, r); - printf("[%d] !!!TestSum pass, iter=%d\n", rank, r); - rabit::CheckPoint(&model, &local); - printf("[%d] !!!CheckPoint pass, iter=%d\n", rank, r); - } - rabit::Finalize(); - return 0; -} diff --git a/rabit/test/local_recover.py b/rabit/test/local_recover.py deleted file mode 100755 index 6f7fae84c..000000000 --- a/rabit/test/local_recover.py +++ /dev/null @@ -1,32 +0,0 @@ -#!/usr/bin/env python3 - -from __future__ import print_function -from builtins import range - -import sys -sys.path.append('../python') - -import rabit -import numpy as np - -rabit.init(lib='mock') -rank = rabit.get_rank() -n = 10 -nround = 3 -data = np.ones(n) * rank - -version, model, local = rabit.load_checkpoint(True) -if version == 0: - model = np.zeros(n) - local = np.ones(n) -else: - print('[%d] restart from version %d' % (rank, version)) - -for i in range(version, nround): - res = rabit.allreduce(data + model+local, rabit.SUM) - print('[%d] iter=%d: %s' % (rank, i, str(res))) - model = res - local[:] = i - rabit.checkpoint(model, local) - -rabit.finalize() diff --git a/rabit/test/model_recover.cc b/rabit/test/model_recover.cc deleted file mode 100644 index 181638c07..000000000 --- a/rabit/test/model_recover.cc +++ /dev/null @@ -1,157 +0,0 @@ -// this is a test case to test whether rabit can recover model when -// facing an exception -#include -#include -#include -#include - -using namespace rabit; - -// dummy model -class Model : public rabit::Serializable { - public: - // iterations - std::vector data; - // load from stream - virtual void Load(rabit::Stream *fi) { - fi->Read(&data); - } - /*! \brief save the model to the stream */ - virtual void Save(rabit::Stream *fo) const { - fo->Write(data); - } - virtual void InitModel(size_t n) { - data.clear(); - data.resize(n, 1.0f); - } -}; - -inline void TestMax(Model *model, int iter) { - int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - const int z = iter + 111; - - std::vector ndata(model->data.size()); - for (size_t i = 0; i < ndata.size(); ++i) { - ndata[i] = (i * (rank+1)) % z + model->data[i]; - } - rabit::Allreduce(&ndata[0], ndata.size()); - - for (size_t i = 0; i < ndata.size(); ++i) { - float rmax = (i * 1) % z + model->data[i]; - for (int r = 0; r < nproc; ++r) { - rmax = std::max(rmax, (float)((i * (r+1)) % z) + model->data[i]); - } - utils::Check(rmax == ndata[i], "[%d] TestMax check failurem i=%lu, rmax=%f, ndata=%f", rank, i, rmax, ndata[i]); - } - model->data = ndata; -} - -inline void TestSum(Model *model, int iter) { - int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - const int z = 131 + iter; - - std::vector ndata(model->data.size()); - for (size_t i = 0; i < ndata.size(); ++i) { - ndata[i] = (i * (rank+1)) % z + model->data[i]; - } - Allreduce(&ndata[0], ndata.size()); - - for (size_t i = 0; i < ndata.size(); ++i) { - float rsum = model->data[i] * nproc; - for (int r = 0; r < nproc; ++r) { - rsum += (float)((i * (r+1)) % z); - } - utils::Check(fabsf(rsum - ndata[i]) < 1e-5 , - "[%d] TestSum check failure, local=%g, allreduce=%g", rank, rsum, ndata[i]); - } - model->data = ndata; -} - -inline void TestAllgather(Model *model, int iter) { - int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - const int z = 131 + iter; - - std::vector ndata(model->data.size() * nproc); - size_t beginSlice = rank * model->data.size(); - for (size_t i = 0; i < model->data.size(); ++i) { - ndata[beginSlice + i] = (i * (rank+1)) % z + model->data[i]; - } - Allgather(&ndata[0], ndata.size(), beginSlice, - model->data.size(), model->data.size()); - - for (size_t i = 0; i < ndata.size(); ++i) { - int curRank = i / model->data.size(); - int remainder = i % model->data.size(); - float data = (remainder * (curRank+1)) % z + model->data[remainder]; - utils::Check(fabsf(data - ndata[i]) < 1e-5 , - "[%d] TestAllgather check failure, local=%g, allgatherring=%g", rank, data, ndata[i]); - } - model->data = ndata; -} - -inline void TestBcast(size_t n, int root) { - int rank = rabit::GetRank(); - std::string s; s.resize(n); - for (size_t i = 0; i < n; ++i) { - s[i] = char(i % 126 + 1); - } - std::string res; - if (root == rank) { - res = s; - } - rabit::Broadcast(&res, root); - - utils::Check(res == s, "[%d] TestBcast fail", rank); -} - -int main(int argc, char *argv[]) { - if (argc < 3) { - printf("Usage: \n"); - return 0; - } - int n = atoi(argv[1]); - rabit::Init(argc, argv); - int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - std::string name = rabit::GetProcessorName(); - - int max_rank = rank; - rabit::Allreduce(&max_rank, 1); - utils::Check(max_rank == nproc - 1, "max rank is world size-1"); - - Model model; - srand(0); - int ntrial = 0; - for (int i = 1; i < argc; ++i) { - int n; - if (sscanf(argv[i], "rabit_num_trial=%d", &n) == 1) ntrial = n; - } - int iter = rabit::LoadCheckPoint(&model); - if (iter == 0) { - model.InitModel(n); - } - printf("[%d] reload-trail=%d, init iter=%d\n", rank, ntrial, iter); - - for (int r = iter; r < 3; ++r) { - TestMax(&model, r); - printf("[%d] !!!TestMax pass, iter=%d\n", rank, r); - int step = std::max(nproc / 3, 1); - for (int i = 0; i < nproc; i += step) { - TestBcast(n, i); - } - printf("[%d] !!!TestBcast pass, iter=%d\n", rank, r); - - TestSum(&model, r); - printf("[%d] !!!TestSum pass, iter=%d\n", rank, r); - TestAllgather(&model, r); - printf("[%d] !!!TestAllgather pass, iter=%d\n", rank, r); - rabit::CheckPoint(&model); - printf("[%d] !!!Checkpoint pass, iter=%d\n", rank, r); - } - rabit::Finalize(); - return 0; -} - diff --git a/rabit/test/speed_runner.py b/rabit/test/speed_runner.py deleted file mode 100644 index 1644bfe99..000000000 --- a/rabit/test/speed_runner.py +++ /dev/null @@ -1,34 +0,0 @@ -import os -import argparse -import sys - -def main(): - parser = argparse.ArgumentParser(description='TODO') - parser.add_argument('-ho', '--host_dir', required=True) - parser.add_argument('-s', '--submit_script', required=True) - parser.add_argument('-rex', '--rabit_exec', required=True) - parser.add_argument('-mpi', '--mpi_exec', required=True) - args = parser.parse_args() - - ndata = [10**4, 10**5, 10**6, 10**7] - nrepeat = [10**4, 10**3, 10**2, 10] - - machines = [2,4,8,16,31] - - executables = [args.rabit_exec, args.mpi_exec] - - for executable in executables: - sys.stderr.write('Executable %s' % executable) - sys.stderr.flush() - for i, data in enumerate(ndata): - for machine in machines: - host_file = os.path.join(args.host_dir, 'hosts%d' % machine) - cmd = 'python %s %d %s %s %d %d' % (args.submit_script, machine, host_file, executable, data, nrepeat[i]) - sys.stderr.write('data=%d, repeat=%d, machine=%d\n' % (data, nrepeat[i], machine)) - sys.stderr.flush() - os.system(cmd) - sys.stderr.write('\n') - sys.stderr.flush() - -if __name__ == "__main__": - main() diff --git a/rabit/test/speed_test.cc b/rabit/test/speed_test.cc deleted file mode 100644 index 8eb543def..000000000 --- a/rabit/test/speed_test.cc +++ /dev/null @@ -1,99 +0,0 @@ -// This program is used to test the speed of rabit API -#include -#include -#include -#include -#include -#include - -using namespace rabit; - -double max_tdiff, sum_tdiff, bcast_tdiff, tot_tdiff; - -inline void TestMax(size_t n) { - int rank = rabit::GetRank(); - std::vector ndata(n); - for (size_t i = 0; i < ndata.size(); ++i) { - ndata[i] = (i * (rank+1)) % 111; - } - double tstart = utils::GetTime(); - rabit::Allreduce(&ndata[0], ndata.size()); - max_tdiff += utils::GetTime() - tstart; -} - -inline void TestSum(size_t n) { - int rank = rabit::GetRank(); - const int z = 131; - std::vector ndata(n); - for (size_t i = 0; i < ndata.size(); ++i) { - ndata[i] = (i * (rank+1)) % z; - } - double tstart = utils::GetTime(); - rabit::Allreduce(&ndata[0], ndata.size()); - sum_tdiff += utils::GetTime() - tstart; -} - -inline void TestBcast(size_t n, int root) { - int rank = rabit::GetRank(); - std::string s; s.resize(n); - for (size_t i = 0; i < n; ++i) { - s[i] = char(i % 126 + 1); - } - std::string res; - res.resize(n); - if (root == rank) { - res = s; - } - double tstart = utils::GetTime(); - rabit::Broadcast(&res[0], res.length(), root); - bcast_tdiff += utils::GetTime() - tstart; -} - -inline void PrintStats(const char *name, double tdiff, int n, int nrep, size_t size) { - int nproc = rabit::GetWorldSize(); - double tsum = tdiff; - rabit::Allreduce(&tsum, 1); - double tavg = tsum / nproc; - double tsqr = tdiff - tavg; - tsqr *= tsqr; - rabit::Allreduce(&tsqr, 1); - double tstd = sqrt(tsqr / nproc); - if (rabit::GetRank() == 0) { - rabit::TrackerPrintf("%s: mean=%g, std=%g sec\n", name, tavg, tstd); - double ndata = n; - ndata *= nrep * size; - if (n != 0) { - rabit::TrackerPrintf("%s-speed: %g MB/sec\n", name, (ndata / tavg) / 1024 / 1024 ); - } - } -} - -int main(int argc, char *argv[]) { - if (argc < 3) { - printf("Usage: \n"); - return 0; - } - srand(0); - int n = atoi(argv[1]); - int nrep = atoi(argv[2]); - utils::Check(nrep >= 1, "need to at least repeat running once"); - rabit::Init(argc, argv); - //int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - std::string name = rabit::GetProcessorName(); - max_tdiff = sum_tdiff = bcast_tdiff = 0; - double tstart = utils::GetTime(); - for (int i = 0; i < nrep; ++i) { - TestMax(n); - TestSum(n); - TestBcast(n, rand() % nproc); - } - tot_tdiff = utils::GetTime() - tstart; - // use allreduce to get the sum and std of time - PrintStats("max_tdiff", max_tdiff, n, nrep, sizeof(float)); - PrintStats("sum_tdiff", sum_tdiff, n, nrep, sizeof(float)); - PrintStats("bcast_tdiff", bcast_tdiff, n, nrep, sizeof(char)); - PrintStats("tot_tdiff", tot_tdiff, 0, nrep, sizeof(float)); - rabit::Finalize(); - return 0; -} diff --git a/rabit/test/test.mk b/rabit/test/test.mk deleted file mode 100644 index 537212d17..000000000 --- a/rabit/test/test.mk +++ /dev/null @@ -1,37 +0,0 @@ -RABIT_BUILD_DMLC = 0 - -ifeq ($(RABIT_BUILD_DMLC),1) - DMLC=../dmlc-core -else - DMLC=../../dmlc-core -endif - -# this is a makefile used to show testcases of rabit -.PHONY: all - -all: model_recover_10_10k model_recover_10_10k_die_same model_recover_10_10k_die_hard local_recover_10_10k lazy_recover_10_10k_die_hard lazy_recover_10_10k_die_same ringallreduce_10_10k pylocal_recover_10_10k - -# this experiment test recovery with actually process exit, use keepalive to keep program alive -model_recover_10_10k: - python $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 rabit_bootstrap_cache=true rabit_debug=true rabit_reduce_ring_mincount=1 rabit_timeout=true rabit_timeout_sec=5 - -model_recover_10_10k_die_same: - python $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 rabit_bootstrap_cache=1 - -model_recover_10_10k_die_hard: - python $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0 rabit_bootstrap_cache=1 - -local_recover_10_10k: - python $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 local_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1 - -pylocal_recover_10_10k: - python $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 local_recover.py 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1 - -lazy_recover_10_10k_die_hard: - python $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0 - -lazy_recover_10_10k_die_same: - python $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 - -ringallreduce_10_10k: - python $(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 model_recover 10000 rabit_reduce_ring_mincount=10 diff --git a/src/common/random.h b/src/common/random.h index 7fd461d22..626800597 100644 --- a/src/common/random.h +++ b/src/common/random.h @@ -143,7 +143,7 @@ class ColumnSampler { */ ColumnSampler() { uint32_t seed = common::GlobalRandom()(); - rabit::Broadcast(&seed, sizeof(seed), 0, "seed"); + rabit::Broadcast(&seed, sizeof(seed), 0); rng_.seed(seed); } diff --git a/src/data/data.cc b/src/data/data.cc index 1733529c0..cd64f10d8 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -714,8 +714,7 @@ DMatrix* DMatrix::Load(const std::string& uri, /* sync up number of features after matrix loaded. * partitioned data will fail the train/val validation check * since partitioned data not knowing the real number of features. */ - rabit::Allreduce(&dmat->Info().num_col_, 1, nullptr, - nullptr, fname.c_str()); + rabit::Allreduce(&dmat->Info().num_col_, 1); // backward compatiblity code. if (!load_row_split) { MetaInfo& info = dmat->Info(); diff --git a/src/learner.cc b/src/learner.cc index dd0244e87..f6af88e9e 100644 --- a/src/learner.cc +++ b/src/learner.cc @@ -546,8 +546,7 @@ class LearnerConfiguration : public Learner { num_feature = std::max(num_feature, static_cast(num_col)); } - rabit::Allreduce(&num_feature, 1, nullptr, nullptr, - "num_feature"); + rabit::Allreduce(&num_feature, 1); if (num_feature > mparam_.num_feature) { mparam_.num_feature = num_feature; }