diff --git a/src/allreduce.h b/src/allreduce.h index 264541211..c6dccade6 100644 --- a/src/allreduce.h +++ b/src/allreduce.h @@ -2,7 +2,9 @@ #define ALLREDUCE_H /*! * \file allreduce.h - * \brief This file defines a template wrapper of engine to ensure + * \brief This file defines a template wrapper of engine to give more flexible + * AllReduce operations + * * \author Tianqi Chen, Nacho, Tianyi */ #include "./engine.h" diff --git a/src/engine.h b/src/engine.h index 42e19c139..d3493945f 100644 --- a/src/engine.h +++ b/src/engine.h @@ -1,6 +1,6 @@ /*! * \file engine.h - * \brief This file defines the interface of allreduce library + * \brief This file defines the core interface of allreduce library * \author Tianqi Chen, Nacho, Tianyi */ #ifndef ALLREDUCE_ENGINE_H diff --git a/src/engine_base.cc b/src/engine_base.cc index e2eca014f..dd9c8ac56 100644 --- a/src/engine_base.cc +++ b/src/engine_base.cc @@ -1,3 +1,8 @@ +/*! + * \file engine_base.cc + * \brief Basic implementation of AllReduce + * \author Tianqi, Nacho, Tianyi + */ #define _CRT_SECURE_NO_WARNINGS #define _CRT_SECURE_NO_DEPRECATE #define NOMINMAX @@ -137,10 +142,8 @@ void AllReduceBase::SetParam(const char *name, const char *val) { * \param type_nbytes the unit number of bytes the type have * \param count number of elements to be reduced * \param reducer reduce function - * \return this function can return - * - kSuccess: allreduce is success, - * - kSockError: a neighbor node go down, the connection is dropped - * - kGetExcept: another node which is not my neighbor go down, get Out-of-Band exception notification from my neighbor + * \return this function can return kSuccess, kSockError, kGetExcept, see ReturnType for details + * \sa ReturnType */ AllReduceBase::ReturnType AllReduceBase::TryAllReduce(void *sendrecvbuf_, @@ -278,7 +281,8 @@ AllReduceBase::TryAllReduce(void *sendrecvbuf_, * \param sendrecvbuf_ buffer for both sending and recving data * \param total_size the size of the data to be broadcasted * \param root the root worker id to broadcast the data - * \return this function can return three possible values, see detail in TryAllReduce + * \return this function can return kSuccess, kSockError, kGetExcept, see ReturnType for details + * \sa ReturnType */ AllReduceBase::ReturnType AllReduceBase::TryBroadcast(void *sendrecvbuf_, size_t total_size, int root) { diff --git a/src/engine_base.h b/src/engine_base.h index 6c138529a..61fdd6033 100644 --- a/src/engine_base.h +++ b/src/engine_base.h @@ -97,8 +97,14 @@ class AllReduceBase : public IEngine { protected: /*! \brief enumeration of possible returning results from Try functions */ enum ReturnType { + /*! \brief execution is successful */ kSuccess, + /*! \brief a neighbor node go down, the connection is dropped */ kSockError, + /*! + * \brief another node which is not my neighbor go down, + * get Out-of-Band exception notification from my neighbor + */ kGetExcept }; // link record to a neighbor @@ -202,10 +208,8 @@ class AllReduceBase : public IEngine { * \param type_nbytes the unit number of bytes the type have * \param count number of elements to be reduced * \param reducer reduce function - * \return this function can return - * - kSuccess: allreduce is success, - * - kSockError: a neighbor node go down, the connection is dropped - * - kGetExcept: another node which is not my neighbor go down, get Out-of-Band exception notification from my neighbor + * \return this function can return kSuccess, kSockError, kGetExcept, see ReturnType for details + * \sa ReturnType */ ReturnType TryAllReduce(void *sendrecvbuf_, size_t type_nbytes, @@ -216,7 +220,8 @@ class AllReduceBase : public IEngine { * \param sendrecvbuf_ buffer for both sending and recving data * \param size the size of the data to be broadcasted * \param root the root worker id to broadcast the data - * \return this function can return three possible values, see detail in TryAllReduce + * \return this function can return kSuccess, kSockError, kGetExcept, see ReturnType for details + * \sa ReturnType */ ReturnType TryBroadcast(void *sendrecvbuf_, size_t size, int root); //---- local data related to link ---- diff --git a/src/engine_robust.cc b/src/engine_robust.cc index 00efd7447..fcc3ebc20 100644 --- a/src/engine_robust.cc +++ b/src/engine_robust.cc @@ -1,3 +1,8 @@ +/*! + * \file engine_robust.cc + * \brief Robust implementation of AllReduce + * \author Tianqi, Nacho, Tianyi + */ #define _CRT_SECURE_NO_WARNINGS #define _CRT_SECURE_NO_DEPRECATE #define NOMINMAX @@ -71,8 +76,7 @@ AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) { for (int i = 0; i < nlink; ++i) { links[i].InitBuffer(sizeof(int), 1 << 10, reduce_buffer_size); links[i].ResetSize(); - } - + } // read and discard data from all channels until pass mark while (true) { for (int i = 0; i < nlink; ++i) { @@ -179,12 +183,150 @@ AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) { } return kSuccess; } - -bool AllReduceRobust::RecoverExec(void *sendrecvbuf_, size_t size, int flag, int seqno) { - if (flag != 0) { - utils::Assert(seqno == ActionSummary::kMaxSeq, "must only set seqno for normal operations"); +/*! + * \brief try to reconnect the broken links + * \return this function can kSuccess or kSockError + */ +AllReduceRobust::ReturnType AllReduceRobust::TryReConnectLinks(void) { + utils::Error("TryReConnectLinks: not implemented"); + return kSuccess; +} +/*! + * \brief if err_type indicates an error + * recover links according to the error type reported + * if there is no error, return true + * \param err_type the type of error happening in the system + * \return true if err_type is kSuccess, false otherwise + */ +bool AllReduceRobust::CheckAndRecover(ReturnType err_type) { + if (err_type == kSuccess) return true; + while(err_type != kSuccess) { + switch(err_type) { + case kGetExcept: err_type = TryResetLinks(); break; + case kSockError: { + TryResetLinks(); + err_type = TryReConnectLinks(); + break; + } + default: utils::Assert(false, "RecoverLinks: cannot reach here"); + } } - ActionSummary act(flag, seqno); + return false; +} +/*! + * \brief try to load check point + * + * This is a collaborative function called by all nodes + * only the nodes with requester set to true really needs to load the check point + * other nodes acts as collaborative roles to complete this request + * + * \param requester whether current node is the requester + * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details + * \sa ReturnType + */ +AllReduceRobust::ReturnType AllReduceRobust::TryLoadCheckPoint(bool requester) { + utils::Error("TryLoadCheckPoint: not implemented"); + return kSuccess; +} +/*! + * \brief try to get the result of operation specified by seqno + * + * This is a collaborative function called by all nodes + * only the nodes with requester set to true really needs to get the result + * other nodes acts as collaborative roles to complete this request + * + * \param buf the buffer to store the result, this parameter is only use when current node is requester + * \param size the total size of the buffer, this parameter is only use when current node is requester + * \param seqno sequence number of the operation, this is unique index of a operation in current iteration + * \param requester whether current node is the requester + * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details + * \sa ReturnType + */ +AllReduceRobust::ReturnType AllReduceRobust::TryGetResult(void *sendrecvbuf, size_t size, int seqno, bool requester) { + utils::Error("TryGetResult: not implemented"); + return kSuccess; +} +/*! + * \brief try to run recover execution for a request action described by flag and seqno, + * the function will keep blocking to run possible recovery operations before the specified action, + * until the requested result is received by a recovering procedure, + * or the function discovers that the requested action is not yet executed, and return false + * + * \param buf the buffer to store the result + * \param size the total size of the buffer + * \param flag flag information about the action \sa ActionSummary + * \param seqno sequence number of the action, if it is special action with flag set, seqno needs to be set to ActionSummary::kMaxSeq + * + * \return if this function can return true or false + * - true means buf already set to the + * result by recovering procedure, the action is complete, no further action is needed + * - false means this is the lastest action that has not yet been executed, need to execute the action + */ +bool AllReduceRobust::RecoverExec(void *buf, size_t size, int flag, int seqno) { + if (flag != 0) { + utils::Assert(seqno == ActionSummary::kMaxSeq, "must only set seqno for normal operations"); + } + // request + ActionSummary req(flag, seqno); + while (true) { + // action + ActionSummary act = req; + // get the reduced action + if (!CheckAndRecover(TryAllReduce(&act, sizeof(act), 1, ActionSummary::Reducer))) continue; + if (act.check_ack()) { + if (act.check_point()) { + // if we also have check_point, do check point first + utils::Assert(!act.diff_seq(), + "check ack & check pt cannot occur together with normal ops"); + // if we requested checkpoint, we are free to go + if (req.check_point()) return true; + } else if (act.load_check()) { + // if there is only check_ack and load_check, do load_check + if (!CheckAndRecover(TryLoadCheckPoint(req.load_check()))) continue; + // if requested load check, then misson complete + if (req.load_check()) return true; + } else { + // there is no check point and no load check, execute check ack + if (req.check_ack()) return true; + } + // if execute to this point + // this means the action requested has not been completed + // try next round + } else { + if (act.check_point()) { + if (act.diff_seq()) { + utils::Assert(act.min_seqno() != ActionSummary::kMaxSeq, "min seq bug"); + bool requester = req.min_seqno() == act.min_seqno(); + if (!CheckAndRecover(TryGetResult(buf, size, act.min_seqno(), requester))) continue; + if (requester) return true; + } else { + // no difference in seq no, means we are free to check point + if (req.check_point()) return true; + } + } else { + // no check point + if (act.load_check()) { + // load check have higher priority, do load_check + if (!CheckAndRecover(TryLoadCheckPoint(req.load_check()))) continue; + // if requested load check, then misson complete + if (req.load_check()) return true; + } else { + // no special flags, no checkpoint, check ack, load_check + utils::Assert(act.min_seqno() != ActionSummary::kMaxSeq, "min seq bug"); + if (act.diff_seq()) { + bool requester = req.min_seqno() == act.min_seqno(); + if (!CheckAndRecover(TryGetResult(buf, size, act.min_seqno(), requester))) continue; + if (requester) return true; + } else { + // all the request is same, this is most recent command that is yet to be executed + return false; + } + } + } + // something is still incomplete try next round + } + } + utils::Assert(false, "RecoverExec: should not reach here"); return true; } } // namespace engine diff --git a/src/engine_robust.h b/src/engine_robust.h index f1949e11a..fa18406db 100644 --- a/src/engine_robust.h +++ b/src/engine_robust.h @@ -89,15 +89,19 @@ class AllReduceRobust : public AllReduceBase { inline int min_seqno(void) const { return seqcode >> 4; } + // whether the operation set contains a load_check + inline bool load_check(void) const { + return (seqcode & kLoadCheck) != 0; + } // whether the operation set contains a check point inline bool check_point(void) const { return (seqcode & kCheckPoint) != 0; } - // whether the operation set contains a check point + // whether the operation set contains a check ack inline bool check_ack(void) const { return (seqcode & kCheckAck) != 0; } - // whether the operation set contains a check point + // whether the operation set contains different sequence number inline bool diff_seq(void) const { return (seqcode & kDiffSeq) != 0; } @@ -184,17 +188,64 @@ class AllReduceRobust : public AllReduceBase { * when kSockError is returned, it simply means there are bad sockets in the links, * and some link recovery proceduer is needed */ - ReturnType TryResetLinks(void); - /*! - * \brief Run recovery execution of a action specified by flag and seqno, - * there can be two outcome of the function - * - * \param sendrecvbuf_ - * - * \return if this function returns true, this means - * behind and we will be able to recover data from existing node + ReturnType TryResetLinks(void); + /*! + * \brief try to reconnect the broken links + * \return this function can kSuccess or kSockError */ - bool RecoverExec(void *sendrecvbuf_, size_t size, int flag, int seqno); + ReturnType TryReConnectLinks(void); + /*! + * \brief if err_type indicates an error + * recover links according to the error type reported + * if there is no error, return true + * \param err_type the type of error happening in the system + * \return true if err_type is kSuccess, false otherwise + */ + bool CheckAndRecover(ReturnType err_type); + /*! + * \brief try to run recover execution for a request action described by flag and seqno, + * the function will keep blocking to run possible recovery operations before the specified action, + * until the requested result is received by a recovering procedure, + * or the function discovers that the requested action is not yet executed, and return false + * + * \param buf the buffer to store the result + * \param size the total size of the buffer + * \param flag flag information about the action \sa ActionSummary + * \param seqno sequence number of the action, if it is special action with flag set, seqno needs to be set to ActionSummary::kMaxSeq + * + * \return if this function can return true or false + * - true means buf already set to the + * result by recovering procedure, the action is complete, no further action is needed + * - false means this is the lastest action that has not yet been executed, need to execute the action + */ + bool RecoverExec(void *buf, size_t size, int flag, int seqno = ActionSummary::kMaxSeq); + /*! + * \brief try to load check point + * + * This is a collaborative function called by all nodes + * only the nodes with requester set to true really needs to load the check point + * other nodes acts as collaborative roles to complete this request + * + * \param requester whether current node is the requester + * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details + * \sa ReturnType + */ + ReturnType TryLoadCheckPoint(bool requester); + /*! + * \brief try to get the result of operation specified by seqno + * + * This is a collaborative function called by all nodes + * only the nodes with requester set to true really needs to get the result + * other nodes acts as collaborative roles to complete this request + * + * \param buf the buffer to store the result, this parameter is only use when current node is requester + * \param size the total size of the buffer, this parameter is only use when current node is requester + * \param seqno sequence number of the operation, this is unique index of a operation in current iteration + * \param requester whether current node is the requester + * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details + * \sa ReturnType + */ + ReturnType TryGetResult(void *buf, size_t size, int seqno, bool requester); //---- recovery data structure ---- // call sequence counter, records how many calls we made so far // from last call to CheckPoint, LoadCheckPoint