check in the recover strategy

This commit is contained in:
tqchen 2014-11-30 11:42:59 -08:00
parent 155ed3a814
commit 2e536eda29
6 changed files with 235 additions and 31 deletions

View File

@ -2,7 +2,9 @@
#define ALLREDUCE_H
/*!
* \file allreduce.h
* \brief This file defines a template wrapper of engine to ensure
* \brief This file defines a template wrapper of engine to give more flexible
* AllReduce operations
*
* \author Tianqi Chen, Nacho, Tianyi
*/
#include "./engine.h"

View File

@ -1,6 +1,6 @@
/*!
* \file engine.h
* \brief This file defines the interface of allreduce library
* \brief This file defines the core interface of allreduce library
* \author Tianqi Chen, Nacho, Tianyi
*/
#ifndef ALLREDUCE_ENGINE_H

View File

@ -1,3 +1,8 @@
/*!
* \file engine_base.cc
* \brief Basic implementation of AllReduce
* \author Tianqi, Nacho, Tianyi
*/
#define _CRT_SECURE_NO_WARNINGS
#define _CRT_SECURE_NO_DEPRECATE
#define NOMINMAX
@ -137,10 +142,8 @@ void AllReduceBase::SetParam(const char *name, const char *val) {
* \param type_nbytes the unit number of bytes the type have
* \param count number of elements to be reduced
* \param reducer reduce function
* \return this function can return
* - kSuccess: allreduce is success,
* - kSockError: a neighbor node go down, the connection is dropped
* - kGetExcept: another node which is not my neighbor go down, get Out-of-Band exception notification from my neighbor
* \return this function can return kSuccess, kSockError, kGetExcept, see ReturnType for details
* \sa ReturnType
*/
AllReduceBase::ReturnType
AllReduceBase::TryAllReduce(void *sendrecvbuf_,
@ -278,7 +281,8 @@ AllReduceBase::TryAllReduce(void *sendrecvbuf_,
* \param sendrecvbuf_ buffer for both sending and recving data
* \param total_size the size of the data to be broadcasted
* \param root the root worker id to broadcast the data
* \return this function can return three possible values, see detail in TryAllReduce
* \return this function can return kSuccess, kSockError, kGetExcept, see ReturnType for details
* \sa ReturnType
*/
AllReduceBase::ReturnType
AllReduceBase::TryBroadcast(void *sendrecvbuf_, size_t total_size, int root) {

View File

@ -97,8 +97,14 @@ class AllReduceBase : public IEngine {
protected:
/*! \brief enumeration of the possible results returned by the Try functions */
enum ReturnType {
/*! \brief execution was successful */
kSuccess,
/*! \brief a neighbor node went down, the connection to it is dropped */
kSockError,
/*!
* \brief another node which is not my neighbor went down,
* an Out-of-Band exception notification was received from my neighbor
*/
kGetExcept
};
// link record to a neighbor
@ -202,10 +208,8 @@ class AllReduceBase : public IEngine {
* \param type_nbytes the unit number of bytes the type have
* \param count number of elements to be reduced
* \param reducer reduce function
* \return this function can return
* - kSuccess: allreduce is success,
* - kSockError: a neighbor node go down, the connection is dropped
* - kGetExcept: another node which is not my neighbor go down, get Out-of-Band exception notification from my neighbor
* \return this function can return kSuccess, kSockError, kGetExcept, see ReturnType for details
* \sa ReturnType
*/
ReturnType TryAllReduce(void *sendrecvbuf_,
size_t type_nbytes,
@ -216,7 +220,8 @@ class AllReduceBase : public IEngine {
* \param sendrecvbuf_ buffer for both sending and recving data
* \param size the size of the data to be broadcasted
* \param root the root worker id to broadcast the data
* \return this function can return three possible values, see detail in TryAllReduce
* \return this function can return kSuccess, kSockError, kGetExcept, see ReturnType for details
* \sa ReturnType
*/
ReturnType TryBroadcast(void *sendrecvbuf_, size_t size, int root);
//---- local data related to link ----

View File

@ -1,3 +1,8 @@
/*!
* \file engine_robust.cc
* \brief Robust implementation of AllReduce
* \author Tianqi, Nacho, Tianyi
*/
#define _CRT_SECURE_NO_WARNINGS
#define _CRT_SECURE_NO_DEPRECATE
#define NOMINMAX
@ -71,8 +76,7 @@ AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) {
for (int i = 0; i < nlink; ++i) {
links[i].InitBuffer(sizeof(int), 1 << 10, reduce_buffer_size);
links[i].ResetSize();
}
}
// read and discard data from all channels until pass mark
while (true) {
for (int i = 0; i < nlink; ++i) {
@ -179,12 +183,150 @@ AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) {
}
return kSuccess;
}
bool AllReduceRobust::RecoverExec(void *sendrecvbuf_, size_t size, int flag, int seqno) {
if (flag != 0) {
utils::Assert(seqno == ActionSummary::kMaxSeq, "must only set seqno for normal operations");
/*!
* \brief try to reconnect the broken links
*
* Currently a placeholder: the body only reports "not implemented" through utils::Error
*
* \return this function can return kSuccess or kSockError
* (the current placeholder body only ever returns kSuccess)
*/
AllReduceRobust::ReturnType AllReduceRobust::TryReConnectLinks(void) {
utils::Error("TryReConnectLinks: not implemented");
// NOTE(review): presumably utils::Error does not return normally; if it does,
// kSuccess is reported although no link was reconnected -- confirm
return kSuccess;
}
/*!
* \brief if err_type indicates an error,
* recover links according to the error type reported;
* if there is no error, return true
*
* Recovery is repeated until a recovery step itself reports kSuccess.
* A false return therefore means "the links were recovered, but the operation
* that produced err_type failed"; RecoverExec uses it to retry the round.
*
* \param err_type the type of error happening in the system
* \return true if err_type is kSuccess, false otherwise
*/
bool AllReduceRobust::CheckAndRecover(ReturnType err_type) {
// fast path: nothing went wrong, no recovery needed
if (err_type == kSuccess) return true;
// keep recovering until the last recovery step reports success
while(err_type != kSuccess) {
switch(err_type) {
// exception notification only: resetting the links is the whole recovery step
case kGetExcept: err_type = TryResetLinks(); break;
case kSockError: {
// the result of this reset is discarded; only the reconnect result
// decides whether another recovery round is needed
TryResetLinks();
err_type = TryReConnectLinks();
break;
}
default: utils::Assert(false, "RecoverLinks: cannot reach here");
}
}
// NOTE(review): flag and seqno are not parameters of this function and act is never used;
// this line looks like a leftover of the previous RecoverExec body -- confirm and remove
ActionSummary act(flag, seqno);
// an error did happen (and has been recovered from), so report false
return false;
}
/*!
* \brief try to load check point
*
* This is a collaborative function called by all nodes;
* only the nodes with requester set to true really need to load the check point,
* other nodes act in a collaborative role to complete this request
*
* Currently a placeholder: the body only reports "not implemented" through utils::Error
*
* \param requester whether current node is the requester
* \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details
* (the current placeholder body only ever returns kSuccess)
* \sa ReturnType
*/
AllReduceRobust::ReturnType AllReduceRobust::TryLoadCheckPoint(bool requester) {
utils::Error("TryLoadCheckPoint: not implemented");
// NOTE(review): presumably utils::Error does not return normally -- confirm
return kSuccess;
}
/*!
* \brief try to get the result of operation specified by seqno
*
* This is a collaborative function called by all nodes;
* only the nodes with requester set to true really need to get the result,
* other nodes act in a collaborative role to complete this request
*
* Currently a placeholder: the body only reports "not implemented" through utils::Error
*
* \param sendrecvbuf the buffer to store the result, this parameter is only used when current node is requester
* (named buf in the class declaration)
* \param size the total size of the buffer, this parameter is only used when current node is requester
* \param seqno sequence number of the operation, this is the unique index of an operation in current iteration
* \param requester whether current node is the requester
* \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details
* (the current placeholder body only ever returns kSuccess)
* \sa ReturnType
*/
AllReduceRobust::ReturnType AllReduceRobust::TryGetResult(void *sendrecvbuf, size_t size, int seqno, bool requester) {
utils::Error("TryGetResult: not implemented");
// NOTE(review): presumably utils::Error does not return normally -- confirm
return kSuccess;
}
/*!
* \brief try to run recover execution for a request action described by flag and seqno,
* the function will keep blocking to run possible recovery operations before the specified action,
* until the requested result is received by a recovering procedure,
* or the function discovers that the requested action is not yet executed, and returns false
*
* Each round all-reduces an ActionSummary built from (flag, seqno) with ActionSummary::Reducer
* and dispatches on the flags of the reduced summary. Whenever a collective call fails,
* CheckAndRecover repairs the links and the round is restarted.
*
* \param buf the buffer to store the result
* \param size the total size of the buffer
* \param flag flag information about the action \sa ActionSummary
* \param seqno sequence number of the action, if it is a special action with flag set, seqno needs to be set to ActionSummary::kMaxSeq
*
* \return this function can return true or false
* - true means buf has already been set to the
* result by the recovering procedure, the action is complete, no further action is needed
* - false means this is the latest action that has not yet been executed, need to execute the action
*/
bool AllReduceRobust::RecoverExec(void *buf, size_t size, int flag, int seqno) {
// special actions (any flag set) must not carry a real sequence number
if (flag != 0) {
utils::Assert(seqno == ActionSummary::kMaxSeq, "must only set seqno for normal operations");
}
// request: what this node itself wants to do
ActionSummary req(flag, seqno);
while (true) {
// action: starts as a copy of the local request and is overwritten by the reduction
ActionSummary act = req;
// get the reduced action; on failure links are recovered and the round is restarted
if (!CheckAndRecover(TryAllReduce(&act, sizeof(act), 1, ActionSummary::Reducer))) continue;
if (act.check_ack()) {
if (act.check_point()) {
// if we also have check_point, do check point first
utils::Assert(!act.diff_seq(),
"check ack & check pt cannot occur together with normal ops");
// if we requested checkpoint, we are free to go
if (req.check_point()) return true;
} else if (act.load_check()) {
// if there is only check_ack and load_check, do load_check
if (!CheckAndRecover(TryLoadCheckPoint(req.load_check()))) continue;
// if we requested load check, then mission complete
if (req.load_check()) return true;
} else {
// there is no check point and no load check, execute check ack
if (req.check_ack()) return true;
}
// if execution gets to this point,
// the action requested has not been completed,
// try next round
} else {
if (act.check_point()) {
if (act.diff_seq()) {
utils::Assert(act.min_seqno() != ActionSummary::kMaxSeq, "min seq bug");
// this node is a requester when its own seqno equals the reduced min_seqno
bool requester = req.min_seqno() == act.min_seqno();
if (!CheckAndRecover(TryGetResult(buf, size, act.min_seqno(), requester))) continue;
if (requester) return true;
} else {
// no difference in seq no, means we are free to check point
if (req.check_point()) return true;
}
} else {
// no check point
if (act.load_check()) {
// load check has higher priority, do load_check
if (!CheckAndRecover(TryLoadCheckPoint(req.load_check()))) continue;
// if we requested load check, then mission complete
if (req.load_check()) return true;
} else {
// no special flags: no check point, no check ack, no load_check
utils::Assert(act.min_seqno() != ActionSummary::kMaxSeq, "min seq bug");
if (act.diff_seq()) {
// this node is a requester when its own seqno equals the reduced min_seqno
bool requester = req.min_seqno() == act.min_seqno();
if (!CheckAndRecover(TryGetResult(buf, size, act.min_seqno(), requester))) continue;
if (requester) return true;
} else {
// all the requests are the same, this is the most recent command that is yet to be executed
return false;
}
}
}
// something is still incomplete, try next round
}
}
// the loop above only exits through return statements; this is a defensive guard
utils::Assert(false, "RecoverExec: should not reach here");
return true;
}
} // namespace engine

View File

@ -89,15 +89,19 @@ class AllReduceRobust : public AllReduceBase {
// sequence number part of the summary: seqcode with its low 4 bits shifted out
// NOTE(review): presumably the low 4 bits hold the flag bits (kLoadCheck, kCheckPoint,
// kCheckAck, kDiffSeq) -- confirm against the constant definitions
inline int min_seqno(void) const {
return seqcode >> 4;
}
// whether the operation set contains a load_check,
// i.e. the kLoadCheck bit is set in seqcode
inline bool load_check(void) const {
return (seqcode & kLoadCheck) != 0;
}
// whether the operation set contains a check point,
// i.e. the kCheckPoint bit is set in seqcode
inline bool check_point(void) const {
return (seqcode & kCheckPoint) != 0;
}
// whether the operation set contains a check ack,
// i.e. the kCheckAck bit is set in seqcode
inline bool check_ack(void) const {
return (seqcode & kCheckAck) != 0;
}
// whether the operation set contains different sequence numbers,
// i.e. the kDiffSeq bit is set in seqcode
inline bool diff_seq(void) const {
return (seqcode & kDiffSeq) != 0;
}
@ -184,17 +188,64 @@ class AllReduceRobust : public AllReduceBase {
* when kSockError is returned, it simply means there are bad sockets in the links,
* and some link recovery procedure is needed
*/
ReturnType TryResetLinks(void);
/*!
* \brief Run recovery execution of a action specified by flag and seqno,
* there can be two outcome of the function
*
* \param sendrecvbuf_
*
* \return if this function returns true, this means
* behind and we will be able to recover data from existing node
ReturnType TryResetLinks(void);
/*!
* \brief try to reconnect the broken links
* \return this function can return kSuccess or kSockError
*/
bool RecoverExec(void *sendrecvbuf_, size_t size, int flag, int seqno);
ReturnType TryReConnectLinks(void);
/*!
* \brief if err_type indicates an error
* recover links according to the error type reported
* if there is no error, return true
* \param err_type the type of error happening in the system
* \return true if err_type is kSuccess, false otherwise
*/
bool CheckAndRecover(ReturnType err_type);
/*!
* \brief try to run recover execution for a request action described by flag and seqno,
* the function will keep blocking to run possible recovery operations before the specified action,
* until the requested result is received by a recovering procedure,
* or the function discovers that the requested action is not yet executed, and return false
*
* \param buf the buffer to store the result
* \param size the total size of the buffer
* \param flag flag information about the action \sa ActionSummary
* \param seqno sequence number of the action, if it is special action with flag set, seqno needs to be set to ActionSummary::kMaxSeq
*
* \return this function can return true or false
* - true means buf has already been set to the
* result by the recovering procedure, the action is complete, no further action is needed
* - false means this is the latest action that has not yet been executed, need to execute the action
*/
bool RecoverExec(void *buf, size_t size, int flag, int seqno = ActionSummary::kMaxSeq);
/*!
* \brief try to load check point
*
* This is a collaborative function called by all nodes;
* only the nodes with requester set to true really need to load the check point,
* other nodes act in a collaborative role to complete this request
*
* \param requester whether current node is the requester
* \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details
* \sa ReturnType
*/
ReturnType TryLoadCheckPoint(bool requester);
/*!
* \brief try to get the result of operation specified by seqno
*
* This is a collaborative function called by all nodes;
* only the nodes with requester set to true really need to get the result,
* other nodes act in a collaborative role to complete this request
*
* \param buf the buffer to store the result, this parameter is only used when current node is requester
* \param size the total size of the buffer, this parameter is only used when current node is requester
* \param seqno sequence number of the operation, this is the unique index of an operation in current iteration
* \param requester whether current node is the requester
* \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details
* \sa ReturnType
*/
ReturnType TryGetResult(void *buf, size_t size, int seqno, bool requester);
//---- recovery data structure ----
// call sequence counter, records how many calls we made so far
// from last call to CheckPoint, LoadCheckPoint