diff --git a/src/allreduce_base.cc b/src/allreduce_base.cc index 671c53877..4f43eb97b 100644 --- a/src/allreduce_base.cc +++ b/src/allreduce_base.cc @@ -213,7 +213,7 @@ void AllreduceBase::ReConnectLinks(const char *cmd) { } else { if (!all_links[i].sock.IsClosed()) all_links[i].sock.Close(); } - } + } int ngood = static_cast(good_link.size()); Assert(tracker.SendAll(&ngood, sizeof(ngood)) == sizeof(ngood), "ReConnectLink failure 5"); @@ -388,12 +388,17 @@ AllreduceBase::TryAllreduce(void *sendrecvbuf_, // exception handling for (int i = 0; i < nlink; ++i) { // recive OOB message from some link - if (selecter.CheckExcept(links[i].sock)) return kGetExcept; + if (selecter.CheckExcept(links[i].sock)) { + return ReportError(&links[i], kGetExcept); + } } // read data from childs for (int i = 0; i < nlink; ++i) { if (i != parent_index && selecter.CheckRead(links[i].sock)) { - if (!links[i].ReadToRingBuffer(size_up_out)) return kSockError; + ReturnType ret = links[i].ReadToRingBuffer(size_up_out); + if (ret != kSuccess) { + return ReportError(&links[i], ret); + } } } // this node have childs, peform reduce @@ -439,7 +444,10 @@ AllreduceBase::TryAllreduce(void *sendrecvbuf_, if (len != -1) { size_up_out += static_cast(len); } else { - if (errno != EAGAIN && errno != EWOULDBLOCK) return kSockError; + ReturnType ret = Errno2Return(errno); + if (ret != kSuccess) { + return ReportError(&links[parent_index], ret); + } } } // read data from parent @@ -448,14 +456,18 @@ AllreduceBase::TryAllreduce(void *sendrecvbuf_, ssize_t len = links[parent_index].sock. Recv(sendrecvbuf + size_down_in, total_size - size_down_in); if (len == 0) { - links[parent_index].sock.Close(); return kSockError; + links[parent_index].sock.Close(); + return ReportError(&links[parent_index], kRecvZeroLen); } if (len != -1) { size_down_in += static_cast(len); utils::Assert(size_down_in <= size_up_out, "Allreduce: boundary error"); } else { - if (errno != EAGAIN && errno != EWOULDBLOCK) return kSockError; + ReturnType ret = Errno2Return(errno); + if (ret != kSuccess) { + return ReportError(&links[parent_index], ret); + } } } } else { @@ -465,8 +477,9 @@ AllreduceBase::TryAllreduce(void *sendrecvbuf_, // can pass message down to childs for (int i = 0; i < nlink; ++i) { if (i != parent_index && selecter.CheckWrite(links[i].sock)) { - if (!links[i].WriteFromArray(sendrecvbuf, size_down_in)) { - return kSockError; + ReturnType ret = links[i].WriteFromArray(sendrecvbuf, size_down_in); + if (ret != kSuccess) { + return ReportError(&links[i], ret); } } } @@ -527,14 +540,17 @@ AllreduceBase::TryBroadcast(void *sendrecvbuf_, size_t total_size, int root) { // exception handling for (int i = 0; i < nlink; ++i) { // recive OOB message from some link - if (selecter.CheckExcept(links[i].sock)) return kGetExcept; + if (selecter.CheckExcept(links[i].sock)) { + return ReportError(&links[i], kGetExcept); + } } if (in_link == -2) { // probe in-link for (int i = 0; i < nlink; ++i) { if (selecter.CheckRead(links[i].sock)) { - if (!links[i].ReadToArray(sendrecvbuf_, total_size)) { - return kSockError; + ReturnType ret = links[i].ReadToArray(sendrecvbuf_, total_size); + if (ret != kSuccess) { + return ReportError(&links[i], ret); } size_in = links[i].size_read; if (size_in != 0) { @@ -554,7 +570,10 @@ AllreduceBase::TryBroadcast(void *sendrecvbuf_, size_t total_size, int root) { // send data to all out-link for (int i = 0; i < nlink; ++i) { if (i != in_link && selecter.CheckWrite(links[i].sock)) { - if (!links[i].WriteFromArray(sendrecvbuf_, size_in)) return kSockError; + ReturnType ret = links[i].WriteFromArray(sendrecvbuf_, size_in); + if (ret != kSuccess) { + return ReportError(&links[i], ret); + } } } } diff --git a/src/allreduce_base.h b/src/allreduce_base.h index aaee59312..003671ab4 100644 --- a/src/allreduce_base.h +++ b/src/allreduce_base.h @@ -201,6 +201,10 @@ class AllreduceBase : public IEngine { enum ReturnType { /*! \brief execution is successful */ kSuccess, + /*! \brief a link was reset by peer */ + kConnReset, + /*! \brief received a zero length message */ + kRecvZeroLen, /*! \brief a neighbor node go down, the connection is dropped */ kSockError, /*! @@ -209,6 +213,12 @@ class AllreduceBase : public IEngine { */ kGetExcept }; + /*! \brief translate errno to return type */ + inline static ReturnType Errno2Return(int errsv) { + if (errsv == EAGAIN || errsv == EWOULDBLOCK) return kSuccess; + if (errsv == ECONNRESET) return kConnReset; + return kSockError; + } // link record to a neighbor struct LinkRecord { public: @@ -249,22 +259,22 @@ class AllreduceBase : public IEngine { * position after protect_start * \param protect_start all data start from protect_start is still needed in buffer * read shall not override this - * \return true if it is an successful read, false if there is some error happens, check errno + * \return the type of reading */ - inline bool ReadToRingBuffer(size_t protect_start) { + inline ReturnType ReadToRingBuffer(size_t protect_start) { size_t ngap = size_read - protect_start; utils::Assert(ngap <= buffer_size, "Allreduce: boundary check"); size_t offset = size_read % buffer_size; size_t nmax = std::min(buffer_size - ngap, buffer_size - offset); - if (nmax == 0) return true; + if (nmax == 0) return kSuccess; ssize_t len = sock.Recv(buffer_head + offset, nmax); // length equals 0, remote disconnected if (len == 0) { - sock.Close(); return false; + sock.Close(); return kRecvZeroLen; } - if (len == -1) return errno == EAGAIN || errno == EWOULDBLOCK; + if (len == -1) return Errno2Return(errno); size_read += static_cast(len); - return true; + return kSuccess; } /*! * \brief read data into array, @@ -273,17 +283,17 @@ class AllreduceBase : public IEngine { * \param max_size maximum size of array * \return true if it is an successful read, false if there is some error happens, check errno */ - inline bool ReadToArray(void *recvbuf_, size_t max_size) { - if (max_size == size_read) return true; + inline ReturnType ReadToArray(void *recvbuf_, size_t max_size) { + if (max_size == size_read) return kSuccess; char *p = static_cast(recvbuf_); ssize_t len = sock.Recv(p + size_read, max_size - size_read); // length equals 0, remote disconnected if (len == 0) { - sock.Close(); return false; + sock.Close(); return kRecvZeroLen; } - if (len == -1) return errno == EAGAIN || errno == EWOULDBLOCK; + if (len == -1) return Errno2Return(errno); size_read += static_cast(len); - return true; + return kSuccess; } /*! * \brief write data in array to sock @@ -291,12 +301,12 @@ class AllreduceBase : public IEngine { * \param max_size maximum size of array * \return true if it is an successful write, false if there is some error happens, check errno */ - inline bool WriteFromArray(const void *sendbuf_, size_t max_size) { + inline ReturnType WriteFromArray(const void *sendbuf_, size_t max_size) { const char *p = static_cast(sendbuf_); ssize_t len = sock.Send(p + size_write, max_size - size_write); - if (len == -1) return errno == EAGAIN || errno == EWOULDBLOCK; + if (len == -1) return Errno2Return(errno); size_write += static_cast(len); - return true; + return kSuccess; } private: @@ -356,6 +366,14 @@ class AllreduceBase : public IEngine { * \sa ReturnType */ ReturnType TryBroadcast(void *sendrecvbuf_, size_t size, int root); + /*! + * \brief function used to report error when a link goes wrong + * \param link the pointer to the link who causes the error + * \param err the error type + */ + inline ReturnType ReportError(LinkRecord *link, ReturnType err) { + err_link = link; return err; + } //---- data structure related to model ---- // call sequence counter, records how many calls we made so far // from last call to CheckPoint, LoadCheckPoint @@ -371,6 +389,8 @@ class AllreduceBase : public IEngine { int parent_rank; // sockets of all links this connects to std::vector all_links; + // used to record the link where things goes wrong + LinkRecord *err_link; // all the links in the reduction tree connection RefLinkVector tree_links; // pointer to links in the ring diff --git a/src/allreduce_robust-inl.h b/src/allreduce_robust-inl.h index e0250e426..d8cc8dcdd 100644 --- a/src/allreduce_robust-inl.h +++ b/src/allreduce_robust-inl.h @@ -97,7 +97,9 @@ AllreduceRobust::MsgPassing(const NodeType &node_value, // exception handling for (int i = 0; i < nlink; ++i) { // recive OOB message from some link - if (selecter.CheckExcept(links[i].sock)) return kGetExcept; + if (selecter.CheckExcept(links[i].sock)) { + return ReportError(&links[i], kGetExcept); + } } if (stage == 0) { bool finished = true; @@ -105,9 +107,8 @@ AllreduceRobust::MsgPassing(const NodeType &node_value, for (int i = 0; i < nlink; ++i) { if (i != parent_index) { if (selecter.CheckRead(links[i].sock)) { - if (!links[i].ReadToArray(&edge_in[i], sizeof(EdgeType))) { - return kSockError; - } + ReturnType ret = links[i].ReadToArray(&edge_in[i], sizeof(EdgeType)); + if (ret != kSuccess) return ReportError(&links[i], ret); } if (links[i].size_read != sizeof(EdgeType)) finished = false; } @@ -128,17 +129,15 @@ AllreduceRobust::MsgPassing(const NodeType &node_value, if (stage == 1) { const int pid = this->parent_index; utils::Assert(pid != -1, "MsgPassing invalid stage"); - if (!links[pid].WriteFromArray(&edge_out[pid], sizeof(EdgeType))) { - return kSockError; - } + ReturnType ret = links[pid].WriteFromArray(&edge_out[pid], sizeof(EdgeType)); + if (ret != kSuccess) return ReportError(&links[pid], ret); if (links[pid].size_write == sizeof(EdgeType)) stage = 2; } if (stage == 2) { const int pid = this->parent_index; utils::Assert(pid != -1, "MsgPassing invalid stage"); - if (!links[pid].ReadToArray(&edge_in[pid], sizeof(EdgeType))) { - return kSockError; - } + ReturnType ret = links[pid].ReadToArray(&edge_in[pid], sizeof(EdgeType)); + if (ret != kSuccess) return ReportError(&links[pid], ret); if (links[pid].size_read == sizeof(EdgeType)) { for (int i = 0; i < nlink; ++i) { if (i != pid) edge_out[i] = func(node_value, edge_in, i); @@ -149,9 +148,8 @@ AllreduceRobust::MsgPassing(const NodeType &node_value, if (stage == 3) { for (int i = 0; i < nlink; ++i) { if (i != parent_index && links[i].size_write != sizeof(EdgeType)) { - if (!links[i].WriteFromArray(&edge_out[i], sizeof(EdgeType))) { - return kSockError; - } + ReturnType ret = links[i].WriteFromArray(&edge_out[i], sizeof(EdgeType)); + if (ret != kSuccess) return ReportError(&links[i], ret); } } } diff --git a/src/allreduce_robust.cc b/src/allreduce_robust.cc index 90a0f4fac..25c221f18 100644 --- a/src/allreduce_robust.cc +++ b/src/allreduce_robust.cc @@ -626,18 +626,24 @@ AllreduceRobust::TryRecoverData(RecoverType role, selecter.Select(); // exception handling for (int i = 0; i < nlink; ++i) { - if (selecter.CheckExcept(links[i].sock)) return kGetExcept; + if (selecter.CheckExcept(links[i].sock)) { + return ReportError(&links[i], kGetExcept); + } } if (role == kRequestData) { const int pid = recv_link; if (selecter.CheckRead(links[pid].sock)) { - if (!links[pid].ReadToArray(sendrecvbuf_, size)) return kSockError; + ReturnType ret = links[pid].ReadToArray(sendrecvbuf_, size); + if (ret != kSuccess) { + return ReportError(&links[pid], ret); + } } for (int i = 0; i < nlink; ++i) { if (req_in[i] && links[i].size_write != links[pid].size_read && selecter.CheckWrite(links[i].sock)) { - if (!links[i].WriteFromArray(sendrecvbuf_, links[pid].size_read)) { - return kSockError; + ReturnType ret = links[i].WriteFromArray(sendrecvbuf_, links[pid].size_read); + if (ret != kSuccess) { + return ReportError(&links[i], ret); } } } @@ -645,8 +651,9 @@ AllreduceRobust::TryRecoverData(RecoverType role, if (role == kHaveData) { for (int i = 0; i < nlink; ++i) { if (req_in[i] && selecter.CheckWrite(links[i].sock)) { - if (!links[i].WriteFromArray(sendrecvbuf_, size)) { - return kSockError; + ReturnType ret = links[i].WriteFromArray(sendrecvbuf_, size); + if (ret != kSuccess) { + return ReportError(&links[i], ret); } } } @@ -660,7 +667,10 @@ AllreduceRobust::TryRecoverData(RecoverType role, if (req_in[i]) min_write = std::min(links[i].size_write, min_write); } utils::Assert(min_write <= links[pid].size_read, "boundary check"); - if (!links[pid].ReadToRingBuffer(min_write)) return kSockError; + ReturnType ret = links[pid].ReadToRingBuffer(min_write); + if (ret != kSuccess) { + return ReportError(&links[pid], ret); + } } for (int i = 0; i < nlink; ++i) { if (req_in[i] && selecter.CheckWrite(links[i].sock) && @@ -672,7 +682,8 @@ AllreduceRobust::TryRecoverData(RecoverType role, if (len != -1) { links[i].size_write += len; } else { - if (errno != EAGAIN && errno != EWOULDBLOCK) return kSockError; + ReturnType ret = Errno2Return(errno); + if (ret != kSuccess) return ReportError(&links[i], ret); } } } @@ -1131,17 +1142,18 @@ AllreduceRobust::RingPassing(void *sendrecvbuf_, selecter.WatchException(next.sock); if (finished) break; selecter.Select(); - if (selecter.CheckExcept(prev.sock)) return kGetExcept; - if (selecter.CheckExcept(next.sock)) return kGetExcept; + if (selecter.CheckExcept(prev.sock)) return ReportError(&prev, kGetExcept); + if (selecter.CheckExcept(next.sock)) return ReportError(&next, kGetExcept); if (read_ptr != read_end && selecter.CheckRead(prev.sock)) { ssize_t len = prev.sock.Recv(buf + read_ptr, read_end - read_ptr); if (len == 0) { - prev.sock.Close(); return kSockError; + prev.sock.Close(); return ReportError(&prev, kRecvZeroLen); } if (len != -1) { read_ptr += static_cast(len); } else { - if (errno != EAGAIN && errno != EWOULDBLOCK) return kSockError; + ReturnType ret = Errno2Return(errno); + if (ret != kSuccess) return ReportError(&prev, ret); } } if (write_ptr != write_end && write_ptr < read_ptr && @@ -1151,7 +1163,8 @@ AllreduceRobust::RingPassing(void *sendrecvbuf_, if (len != -1) { write_ptr += static_cast(len); } else { - if (errno != EAGAIN && errno != EWOULDBLOCK) return kSockError; + ReturnType ret = Errno2Return(errno); + if (ret != kSuccess) return ReportError(&prev, ret); } } }