diff --git a/src/allreduce.h b/src/allreduce.h index c6dccade6..3f389a591 100644 --- a/src/allreduce.h +++ b/src/allreduce.h @@ -5,7 +5,7 @@ * \brief This file defines a template wrapper of engine to give more flexible * AllReduce operations * - * \author Tianqi Chen, Nacho, Tianyi + * \author Tianqi Chen, Ignacio Cano, Tianyi Zhou */ #include "./engine.h" diff --git a/src/engine.cc b/src/engine.cc index 17aacd5cf..375f8e770 100644 --- a/src/engine.cc +++ b/src/engine.cc @@ -3,7 +3,7 @@ * \brief this file governs which implementation of engine we are actually using * provides an singleton of engine interface * - * \author Tianqi, Nacho, Tianyi + * \author Tianqi Chen, Ignacio Cano, Tianyi Zhou */ #define _CRT_SECURE_NO_WARNINGS #define _CRT_SECURE_NO_DEPRECATE diff --git a/src/engine_base.cc b/src/engine_base.cc index dd9c8ac56..fb6e683ae 100644 --- a/src/engine_base.cc +++ b/src/engine_base.cc @@ -1,7 +1,8 @@ /*! * \file engine_base.cc * \brief Basic implementation of AllReduce - * \author Tianqi, Nacho, Tianyi + * + * \author Tianqi Chen, Ignacio Cano, Tianyi Zhou */ #define _CRT_SECURE_NO_WARNINGS #define _CRT_SECURE_NO_DEPRECATE diff --git a/src/engine_base.h b/src/engine_base.h index 61fdd6033..582cf5e17 100644 --- a/src/engine_base.h +++ b/src/engine_base.h @@ -5,8 +5,8 @@ * * This implementation provides basic utility of AllReduce and Broadcast * without considering node failure - * - * \author Tianqi, Nacho, Tianyi + * + * \author Tianqi Chen, Ignacio Cano, Tianyi Zhou */ #ifndef ALLREDUCE_ENGINE_BASE_H #define ALLREDUCE_ENGINE_BASE_H @@ -136,7 +136,7 @@ class AllReduceBase : public IEngine { inline void ResetSize(void) { size_write = size_read = 0; } - /*! + /*! * \brief read data into ring-buffer, with care not to existing useful override data * position after protect_start * \param protect_start all data start from protect_start is still needed in buffer @@ -157,7 +157,7 @@ class AllReduceBase : public IEngine { if (len == -1) return errno == EAGAIN || errno == EWOULDBLOCK; size_read += static_cast(len); return true; - } + } /*! * \brief read data into array, * this function can not be used together with ReadToRingBuffer diff --git a/src/engine_robust-inl.h b/src/engine_robust-inl.h index 5eb30625d..2817d4c0a 100644 --- a/src/engine_robust-inl.h +++ b/src/engine_robust-inl.h @@ -2,7 +2,7 @@ * \file engine_robust-inl.h * \brief implementation of inline template function in AllReduceRobust * - * \author Tianqi, Nacho, Tianyi + * \author Tianqi Chen */ #ifndef ALLREDUCE_ENGINE_ROBUST_INL_H #define ALLREDUCE_ENGINE_ROBUST_INL_H diff --git a/src/engine_robust.cc b/src/engine_robust.cc index de7744b23..dbc48f406 100644 --- a/src/engine_robust.cc +++ b/src/engine_robust.cc @@ -1,7 +1,8 @@ /*! * \file engine_robust.cc * \brief Robust implementation of AllReduce - * \author Tianqi, Nacho, Tianyi + * + * \author Tianqi Chen, Ignacio Cano, Tianyi Zhou */ #define _CRT_SECURE_NO_WARNINGS #define _CRT_SECURE_NO_DEPRECATE @@ -272,24 +273,22 @@ inline char DataRequest(const std::pair &node_value, /*! * \brief try to decide the recovery message passing request * \param role the current role of the node - * \param p_req_outlink used to store the output link the - * current node should recv data from, - * this can be -1 or -2, - * -1 means current node have the data - * -2 means current node do not have data, but also do not need to send/recv data - * \param p_req_in used to store the resulting vector, indicating which link we should send the data to * \param p_size used to store the size of the message, for node in state kHaveData, * this size must be set correctly before calling the function * for others, this surves as output parameter * + * \param p_recvlink used to store the link current node should recv data from, if necessary + * this can be -1, which means current node have the data + * \param p_req_in used to store the resulting vector, indicating which link we should send the data to + * * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details * \sa ReturnType */ AllReduceRobust::ReturnType -AllReduceRobust::TryDecideRequest(AllReduceRobust::RecoverType role, - int *p_req_outlink, - std::vector *p_req_in, - size_t *p_size) { +AllReduceRobust::TryDecideRouting(AllReduceRobust::RecoverType role, + size_t *p_size, + int *p_recvlink, + std::vector *p_req_in) { int best_link = -2; {// get the shortest distance to the request point std::vector< std::pair > dist_in, dist_out; @@ -317,7 +316,6 @@ AllReduceRobust::TryDecideRequest(AllReduceRobust::RecoverType role, ReturnType succ = MsgPassing(std::make_pair(role == kRequestData, best_link), &req_in, &req_out, DataRequest); if (succ != kSuccess) return succ; - bool need_recv = false; // set p_req_in p_req_in->resize(req_in.size()); for (size_t i = 0; i < req_in.size(); ++i) { @@ -326,16 +324,115 @@ AllReduceRobust::TryDecideRequest(AllReduceRobust::RecoverType role, if (req_out[i] != 0) { utils::Assert(req_in[i] == 0, "cannot get and receive request"); utils::Assert(static_cast(i) == best_link, "request result inconsistent"); - need_recv = true; } } - if (role == kPassData && !need_recv) { - for (size_t i = 0; i < req_in.size(); ++i) { - utils::Assert(req_in[i] == 0, "Bug in TryDecideRequest"); + *p_recvlink = best_link; + return kSuccess; +} + +/*! + * \brief try to finish the data recovery request, + * this function is used together with TryDecideRouting + * \param role the current role of the node + * \param sendrecvbuf_ the buffer to store the data to be sent/recived + * - if the role is kHaveData, this stores the data to be sent + * - if the role is kRequestData, this is the buffer to store the result + * - if the role is kPassData, this will not be used, and can be NULL + * \param size the size of the data, obtained from TryDecideRouting + * \param recv_link the link index to receive data, if necessary, obtained from TryDecideRouting + * \param req_in the request of each link to send data, obtained from TryDecideRouting + * + * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details + * \sa ReturnType, TryDecideRouting + */ +AllReduceRobust::ReturnType +AllReduceRobust::TryRecoverData(RecoverType role, + void *sendrecvbuf_, + size_t size, + int recv_link, + const std::vector &req_in) { + // no need to run recovery for zero size message + if (size == 0) return kSuccess; + utils::Assert(req_in.size() == links.size(), "TryRecoverData"); + const int nlink = static_cast(links.size()); + { + bool req_data = role == kRequestData; + for (int i = 0; i < nlink; ++i) { + if (req_in[i]) { + utils::Assert(i != recv_link, "TryDecideRouting"); + req_data = true; + } + } + // do not need to provide data or receive data, directly exit + if (!req_data) return kSuccess; + } + for (int i = 0; i < nlink; ++i) { + links[i].ResetSize(); + } + utils::Assert(recv_link >= 0 || role == kHaveData, "recv_link must be active"); + if (role == kPassData) { + links[recv_link].InitBuffer(1, size, reduce_buffer_size); + } + while (true) { + bool finished = true; + utils::SelectHelper selecter; + for (int i = 0; i < nlink; ++i) { + if (i == recv_link && links[i].size_read != size) { + selecter.WatchRead(links[i].sock); + finished = false; + } + if (req_in[i] && links[i].size_write != size) { + selecter.WatchWrite(links[i].sock); + finished = false; + } + selecter.WatchException(links[i].sock); + } + if (finished) break; + selecter.Select(); + if (role == kRequestData) { + const int pid = recv_link; + if (selecter.CheckRead(links[pid].sock)) { + if(!links[pid].ReadToArray(sendrecvbuf_, size)) return kSockError; + } + for (int i = 0; i < nlink; ++i) { + if (req_in[i] && links[i].size_write != links[pid].size_read && + selecter.CheckWrite(links[i].sock)) { + if(!links[i].WriteFromArray(sendrecvbuf_, links[pid].size_read)) return kSockError; + } + } + } + if (role == kHaveData) { + for (int i = 0; i < nlink; ++i) { + if (req_in[i] && selecter.CheckWrite(links[i].sock)) { + if(!links[i].WriteFromArray(sendrecvbuf_, size)) return kSockError; + } + } + } + if (role == kPassData) { + const int pid = recv_link; + const size_t buffer_size = links[pid].buffer_size; + if (selecter.CheckRead(links[pid].sock)) { + size_t min_write = size; + for (int i = 0; i < nlink; ++i) { + if (req_in[i]) min_write = std::min(links[i].size_write, min_write); + } + utils::Assert(min_write <= links[pid].size_read, "boundary check"); + if (!links[pid].ReadToRingBuffer(min_write)) return kSockError; + } + for (int i = 0; i < nlink; ++i) { + if (req_in[i] && selecter.CheckWrite(links[i].sock)) { + size_t start = links[i].size_write % buffer_size; + // send out data from ring buffer + size_t nwrite = std::min(buffer_size - start, links[pid].size_read - links[i].size_write); + ssize_t len = links[pid].sock.Send(links[pid].buffer_head + start, nwrite); + if (len != -1) { + links[i].size_write += len; + } else { + if (errno != EAGAIN && errno != EWOULDBLOCK) return kSockError; + } + } + } } - *p_req_outlink = -2; - } else { - *p_req_outlink = best_link; } return kSuccess; } diff --git a/src/engine_robust.h b/src/engine_robust.h index e6312d7ce..be9cf0998 100644 --- a/src/engine_robust.h +++ b/src/engine_robust.h @@ -5,7 +5,7 @@ * * This implementation considers the failure of nodes * - * \author Tianqi, Nacho, Tianyi + * \author Tianqi Chen, Ignacio Cano, Tianyi Zhou */ #ifndef ALLREDUCE_ENGINE_ROBUST_H #define ALLREDUCE_ENGINE_ROBUST_H @@ -257,25 +257,43 @@ class AllReduceRobust : public AllReduceBase { */ ReturnType TryGetResult(void *buf, size_t size, int seqno, bool requester); /*! - * \brief try to decide the recovery message passing request + * \brief try to decide the routing strategy for recovery * \param role the current role of the node - * \param p_req_outlink used to store the output link the - * current node should recv data from, - * this can be nonnegative value, -1 or -2, - * -1 means current node have the data - * -2 means current node do not have data, but also do not need to send/recv data - * \param p_req_in used to store the resulting vector, indicating which link we should send the data to * \param p_size used to store the size of the message, for node in state kHaveData, * this size must be set correctly before calling the function * for others, this surves as output parameter + + * \param p_recvlink used to store the link current node should recv data from, if necessary + * this can be -1, which means current node have the data + * \param p_req_in used to store the resulting vector, indicating which link we should send the data to * * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details - * \sa ReturnType + * \sa ReturnType, TryRecoverData + */ + ReturnType TryDecideRouting(RecoverType role, + size_t *p_size, + int *p_recvlink, + std::vector *p_req_in); + /*! + * \brief try to finish the data recovery request, + * this function is used together with TryDecideRouting + * \param role the current role of the node + * \param sendrecvbuf_ the buffer to store the data to be sent/recived + * - if the role is kHaveData, this stores the data to be sent + * - if the role is kRequestData, this is the buffer to store the result + * - if the role is kPassData, this will not be used, and can be NULL + * \param size the size of the data, obtained from TryDecideRouting + * \param recv_link the link index to receive data, if necessary, obtained from TryDecideRouting + * \param req_in the request of each link to send data, obtained from TryDecideRouting + * + * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details + * \sa ReturnType, TryDecideRouting */ - ReturnType TryDecideRequest(RecoverType role, - int *p_req_outlink, - std::vector *p_req_in, - size_t *p_size); + ReturnType TryRecoverData(RecoverType role, + void *sendrecvbuf_, + size_t size, + int recv_link, + const std::vector &req_in); /*! * \brief run message passing algorithm on the allreduce tree * the result is edge message stored in p_edge_in and p_edge_out