[Breaking] Remove rabit support for custom reductions and grow_local_histmaker updater (#7992)

This commit is contained in:
Rong Ou
2022-06-21 00:08:23 -07:00
committed by GitHub
parent 4a87ea49b8
commit e5ec546da5
17 changed files with 36 additions and 1100 deletions

View File

@@ -245,51 +245,6 @@ void Allreduce_(void *sendrecvbuf, // NOLINT
mpi::OpType op,
IEngine::PreprocFunction prepare_fun = nullptr,
void *prepare_arg = nullptr);
/*!
 * \brief handle for customized reducer, used to handle customized reduce
 *  this class is mainly created for compatibility issues with MPI's customized reduce
 */
class ReduceHandle {
 public:
  // constructor
  ReduceHandle();
  // destructor
  ~ReduceHandle();
  /*!
   * \brief initialize the reduce function,
   *   with the type the reduce function needs to deal with
   *   the reduce function MUST be commutative
   * \param redfunc the reduce function to register
   * \param type_nbytes size of the reduced type, in bytes
   */
  void Init(IEngine::ReduceFunction redfunc, size_t type_nbytes);
  /*!
   * \brief customized in-place all reduce operation
   * \param sendrecvbuf the in place send-recv buffer
   * \param type_nbytes size of the type, in bytes
   * \param count number of elements to send
   * \param prepare_fun Lazy preprocessing function, lazy prepare_fun(prepare_arg)
   *   will be called by the function before performing Allreduce in order to
   *   initialize the data in sendrecvbuf.
   *   If the result of Allreduce can be recovered directly, then prepare_fun will NOT be called
   * \param prepare_arg argument used to pass into the lazy preprocessing function
   */
  void Allreduce(void *sendrecvbuf,
                 size_t type_nbytes,
                 size_t count,
                 IEngine::PreprocFunction prepare_fun = nullptr,
                 void *prepare_arg = nullptr);
  /*! \return the number of bytes occupied by the type */
  static int TypeSize(const MPI::Datatype &dtype);

 protected:
  // handle function field
  void *handle_{nullptr};
  // reduce function of the reducer
  IEngine::ReduceFunction *redfunc_{nullptr};
  // handle to the type field
  void *htype_{nullptr};
  // size of the created type; default-initialized like the other members so
  // it never holds an indeterminate value before Init() is called.
  // NOTE(review): the name suggests bytes, the previous comment said "4 bytes" -
  // confirm the unit against the engine implementation.
  size_t created_type_nbytes_{0};
};
} // namespace engine
} // namespace rabit
#endif // RABIT_INTERNAL_ENGINE_H_

View File

@@ -225,122 +225,5 @@ inline void LazyCheckPoint(const Serializable *global_model) {
// Forwards to VersionNumber() of the engine returned by engine::GetEngine().
inline int VersionNumber() {
  const int version = engine::GetEngine()->VersionNumber();
  return version;
}
// ---------------------------------
// Code to handle customized Reduce
// ---------------------------------
// Reduction kernel for Reducer that does not rely on the buffers being
// aligned for DType: each element is staged through a local copy.
template<typename DType, void (*freduce)(DType &dst, const DType &src)>
inline void ReducerSafeImpl(const void *src_, void *dst_, int len_, const MPI::Datatype &dtype) {
  const char *src_cursor = static_cast<const char*>(src_);
  char *dst_cursor = static_cast<char*>(dst_);
  for (int remaining = len_; remaining > 0; --remaining) {
    DType accum, incoming;
    // memcpy in and out of locals so no misaligned DType access happens
    std::memcpy(&accum, dst_cursor, sizeof(DType));
    std::memcpy(&incoming, src_cursor, sizeof(DType));
    freduce(accum, incoming);
    std::memcpy(dst_cursor, &accum, sizeof(DType));
    src_cursor += sizeof(DType);
    dst_cursor += sizeof(DType);
  }
}
// Reduction kernel for Reducer that treats both buffers as arrays of DType
// and applies freduce element by element, writing into the destination.
template<typename DType, void (*freduce)(DType &dst, const DType &src)>  // NOLINT(*)
inline void ReducerAlignImpl(const void *src_, void *dst_,
                             int len_, const MPI::Datatype &dtype) {
  const DType *in = static_cast<const DType*>(src_);
  DType *out = static_cast<DType*>(dst_);
  for (int remaining = len_; remaining > 0; --remaining, ++in, ++out) {
    freduce(*out, *in);
  }
}
// Registers the reduction kernel with the handle: element sizes of 1, 4 or 8
// bytes use the direct (aligned) kernel, every other size the memcpy-based one.
template<typename DType, void (*freduce)(DType &dst, const DType &src)>  // NOLINT(*)
inline Reducer<DType, freduce>::Reducer() {
  switch (sizeof(DType)) {
    case 1:
    case 4:
    case 8:
      // it is safe to directly use handle for aligned data types
      this->handle_.Init(ReducerAlignImpl<DType, freduce>, sizeof(DType));
      break;
    default:
      this->handle_.Init(ReducerSafeImpl<DType, freduce>, sizeof(DType));
      break;
  }
}
// In-place all reduce over `count` elements of DType; forwards to the reduce
// handle with the element size and the optional lazy prepare callback.
template<typename DType, void (*freduce)(DType &dst, const DType &src)>  // NOLINT(*)
inline void Reducer<DType, freduce>::Allreduce(DType *sendrecvbuf, size_t count,
                                               void (*prepare_fun)(void *arg),
                                               void *prepare_arg) {
  const size_t elem_nbytes = sizeof(DType);
  handle_.Allreduce(sendrecvbuf, elem_nbytes, count, prepare_fun, prepare_arg);
}
// Reduction kernel for SerializeReducer.
// Both buffers are treated as `len_` fixed-size slots whose size is obtained
// from ReduceHandle::TypeSize(dtype). For every slot the source and destination
// objects are deserialized, dst.Reduce(src, nbytes) is applied, and the result
// is serialized back into the destination slot.
template<typename DType>
inline void SerializeReducerFuncImpl(const void *src_, void *dst_,
                                     int len_, const MPI::Datatype &dtype) {
  const int nbytes = engine::ReduceHandle::TypeSize(dtype);
  // The original cast dropped const from src_ before handing it to
  // MemoryFixSizeBuffer; keep that, but spell it out with named casts.
  char *src_base = const_cast<char*>(static_cast<const char*>(src_));
  char *dst_base = static_cast<char*>(dst_);
  for (int i = 0; i < len_; ++i) {
    // compute the byte offset in size_t: `i * nbytes` in int can overflow
    // (undefined behaviour) once the buffer grows past INT_MAX bytes
    const size_t offset = static_cast<size_t>(i) * static_cast<size_t>(nbytes);
    DType tsrc, tdst;
    utils::MemoryFixSizeBuffer fsrc(src_base + offset, nbytes);
    utils::MemoryFixSizeBuffer fdst(dst_base + offset, nbytes);
    tsrc.Load(fsrc);
    tdst.Load(fdst);
    // govern const check
    tdst.Reduce(static_cast<const DType &>(tsrc), nbytes);
    // rewind so the reduced object overwrites the slot it was loaded from
    fdst.Seek(0);
    tdst.Save(fdst);
  }
}
// Registers the serialization-based reduction kernel with the handle.
template<typename DType>
inline SerializeReducer<DType>::SerializeReducer() {
  const size_t type_nbytes = sizeof(DType);
  this->handle_.Init(SerializeReducerFuncImpl<DType>, type_nbytes);
}
// closure to call Allreduce
template<typename DType>
struct SerializeReduceClosure {
DType *sendrecvobj;
size_t max_nbyte, count;
void (*prepare_fun)(void *arg);
void *prepare_arg;
std::string *p_buffer;
// invoke the closure
inline void Run() {
if (prepare_fun != nullptr) prepare_fun(prepare_arg);
for (size_t i = 0; i < count; ++i) {
utils::MemoryFixSizeBuffer fs(BeginPtr(*p_buffer) + i * max_nbyte, max_nbyte);
sendrecvobj[i].Save(fs);
}
}
inline static void Invoke(void *c) {
static_cast<SerializeReduceClosure<DType>*>(c)->Run();
}
};
// In-place all reduce of `count` serializable objects: sizes the staging
// buffer to count slots of max_nbyte, lets the handle reduce it (serialization
// happens lazily through SerializeReduceClosure), then loads every object back
// from its slot.
template<typename DType>
inline void SerializeReducer<DType>::Allreduce(DType *sendrecvobj,
                                               size_t max_nbyte, size_t count,
                                               void (*prepare_fun)(void *arg),
                                               void *prepare_arg) {
  buffer_.resize(max_nbyte * count);
  // setup closure
  SerializeReduceClosure<DType> closure;
  closure.sendrecvobj = sendrecvobj;
  closure.max_nbyte = max_nbyte;
  closure.count = count;
  closure.prepare_fun = prepare_fun;
  closure.prepare_arg = prepare_arg;
  closure.p_buffer = &buffer_;
  // invoke here
  handle_.Allreduce(BeginPtr(buffer_), max_nbyte, count,
                    SerializeReduceClosure<DType>::Invoke, &closure);
  // deserialize the reduced slots back into the caller's objects
  size_t offset = 0;
  for (size_t idx = 0; idx != count; ++idx, offset += max_nbyte) {
    utils::MemoryFixSizeBuffer slot(BeginPtr(buffer_) + offset, max_nbyte);
    sendrecvobj[idx].Load(slot);
  }
}
// Lambda flavour of Allreduce: passes the address of the std::function as the
// prepare argument and InvokeLambda as the prepare function of the
// function-pointer overload.
template<typename DType, void (*freduce)(DType &dst, const DType &src)>  // NOLINT(*)
inline void Reducer<DType, freduce>::Allreduce(DType *sendrecvbuf, size_t count,
                                               std::function<void()> prepare_fun) {
  void *lambda_arg = &prepare_fun;
  this->Allreduce(sendrecvbuf, count, InvokeLambda, lambda_arg);
}
// Lambda flavour of Allreduce: passes the address of the std::function as the
// prepare argument and InvokeLambda as the prepare function of the
// function-pointer overload.
template<typename DType>
inline void SerializeReducer<DType>::Allreduce(DType *sendrecvobj,
                                               size_t max_nbytes, size_t count,
                                               std::function<void()> prepare_fun) {
  void *lambda_arg = &prepare_fun;
  this->Allreduce(sendrecvobj, max_nbytes, count, InvokeLambda, lambda_arg);
}
} // namespace rabit
#endif // RABIT_INTERNAL_RABIT_INL_H_

View File

@@ -274,100 +274,6 @@ inline void LazyCheckPoint(const Serializable *global_model);
* \sa LoadCheckPoint, CheckPoint
*/
inline int VersionNumber();
// ----- extensions that allow customized reducers ------
// Forward declaration of the helper class that performs the customized reduce;
// users do not need to know its definition.
namespace engine {
class ReduceHandle;
} // namespace engine
/*!
* \brief template class to make customized reduce and all reduce easy
* Do not use a reducer directly in the function in which you call Finalize,
* because the destructor can execute after Finalize
* \tparam DType data type to be reduced;
* DType must be a struct, with no pointer
* \tparam freduce the customized reduction function, called as freduce(dst, src)
*/
template<typename DType, void (*freduce)(DType &dst, const DType &src)> // NOLINT(*)
class Reducer {
public:
/*! \brief constructor, registers the reduction function with the reduce handle */
Reducer();
/*!
* \brief customized in-place all reduce operation
* \param sendrecvbuf the in place send-recv buffer
* \param count number of elements to be reduced
* \param prepare_fun Lazy preprocessing function, if it is not NULL, prepare_fun(prepare_arg)
* will be called by the function before performing Allreduce, to initialize the data in sendrecvbuf.
* If the result of Allreduce can be recovered directly, then prepare_fun will NOT be called
* \param prepare_arg argument used to pass into the lazy preprocessing function
*/
inline void Allreduce(DType *sendrecvbuf, size_t count,
void (*prepare_fun)(void *) = nullptr,
void *prepare_arg = nullptr);
#if DMLC_USE_CXX11
/*!
* \brief customized in-place all reduce operation, with lambda function as preprocessor
* \param sendrecvbuf pointer to the array of objects to be reduced
* \param count number of elements to be reduced
* \param prepare_fun lambda function executed to prepare the data, if necessary
*/
inline void Allreduce(DType *sendrecvbuf, size_t count,
std::function<void()> prepare_fun);
#endif // DMLC_USE_CXX11
private:
/*! \brief function handle to do reduce */
engine::ReduceHandle handle_;
};
/*!
* \brief template class to make customized reduce,
* this class defines a complex reducer that handles all data structures that can be
* serialized/deserialized into a fixed size buffer
* Do not use a reducer directly in the function in which you call Finalize,
* because the destructor can execute after Finalize
*
* \tparam DType data type to be reduced, DType must contain the following functions:
* (1) Save(IStream &fs) (2) Load(IStream &fs) (3) Reduce(const DType &src, size_t max_nbyte)
*/
template<typename DType>
class SerializeReducer {
public:
/*! \brief constructor, registers the serialization-based reduction function with the reduce handle */
SerializeReducer();
/*!
* \brief customized in-place all reduce operation
* \param sendrecvobj pointer to the array of objects to be reduced
* \param max_nbyte maximum amount of memory needed to serialize each object
* this includes budget limit for intermediate and final result
* \param count number of elements to be reduced
* \param prepare_fun Lazy preprocessing function, if it is not NULL, prepare_fun(prepare_arg)
* will be called by the function before performing Allreduce, to initialize the data in sendrecvobj.
* If the result of Allreduce can be recovered directly, then prepare_fun will NOT be called
* \param prepare_arg argument used to pass into the lazy preprocessing function
*/
inline void Allreduce(DType *sendrecvobj,
size_t max_nbyte, size_t count,
void (*prepare_fun)(void *) = nullptr,
void *prepare_arg = nullptr);
// C++11 support for lambda prepare function
#if DMLC_USE_CXX11
/*!
* \brief customized in-place all reduce operation, with lambda function as preprocessor
* \param sendrecvobj pointer to the array of objects to be reduced
* \param max_nbyte maximum amount of memory needed to serialize each object
* this includes budget limit for intermediate and final result
* \param count number of elements to be reduced
* \param prepare_fun lambda function executed to prepare the data, if necessary
*/
inline void Allreduce(DType *sendrecvobj,
size_t max_nbyte, size_t count,
std::function<void()> prepare_fun);
#endif // DMLC_USE_CXX11
private:
/*! \brief function handle to do reduce */
engine::ReduceHandle handle_;
/*! \brief temporary buffer used to hold the serialized objects during the reduce */
std::string buffer_;
};
} // namespace rabit
// implementation of template functions
#include "./internal/rabit-inl.h"