Add more error reporting when things go wrong; needs review
This commit is contained in:
parent
968b33ec79
commit
a57c5c5425
@ -213,7 +213,7 @@ void AllreduceBase::ReConnectLinks(const char *cmd) {
|
|||||||
} else {
|
} else {
|
||||||
if (!all_links[i].sock.IsClosed()) all_links[i].sock.Close();
|
if (!all_links[i].sock.IsClosed()) all_links[i].sock.Close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int ngood = static_cast<int>(good_link.size());
|
int ngood = static_cast<int>(good_link.size());
|
||||||
Assert(tracker.SendAll(&ngood, sizeof(ngood)) == sizeof(ngood),
|
Assert(tracker.SendAll(&ngood, sizeof(ngood)) == sizeof(ngood),
|
||||||
"ReConnectLink failure 5");
|
"ReConnectLink failure 5");
|
||||||
@ -388,12 +388,17 @@ AllreduceBase::TryAllreduce(void *sendrecvbuf_,
|
|||||||
// exception handling
|
// exception handling
|
||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
// recive OOB message from some link
|
// recive OOB message from some link
|
||||||
if (selecter.CheckExcept(links[i].sock)) return kGetExcept;
|
if (selecter.CheckExcept(links[i].sock)) {
|
||||||
|
return ReportError(&links[i], kGetExcept);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// read data from childs
|
// read data from childs
|
||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
if (i != parent_index && selecter.CheckRead(links[i].sock)) {
|
if (i != parent_index && selecter.CheckRead(links[i].sock)) {
|
||||||
if (!links[i].ReadToRingBuffer(size_up_out)) return kSockError;
|
ReturnType ret = links[i].ReadToRingBuffer(size_up_out);
|
||||||
|
if (ret != kSuccess) {
|
||||||
|
return ReportError(&links[i], ret);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// this node have childs, peform reduce
|
// this node have childs, peform reduce
|
||||||
@ -439,7 +444,10 @@ AllreduceBase::TryAllreduce(void *sendrecvbuf_,
|
|||||||
if (len != -1) {
|
if (len != -1) {
|
||||||
size_up_out += static_cast<size_t>(len);
|
size_up_out += static_cast<size_t>(len);
|
||||||
} else {
|
} else {
|
||||||
if (errno != EAGAIN && errno != EWOULDBLOCK) return kSockError;
|
ReturnType ret = Errno2Return(errno);
|
||||||
|
if (ret != kSuccess) {
|
||||||
|
return ReportError(&links[parent_index], ret);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// read data from parent
|
// read data from parent
|
||||||
@ -448,14 +456,18 @@ AllreduceBase::TryAllreduce(void *sendrecvbuf_,
|
|||||||
ssize_t len = links[parent_index].sock.
|
ssize_t len = links[parent_index].sock.
|
||||||
Recv(sendrecvbuf + size_down_in, total_size - size_down_in);
|
Recv(sendrecvbuf + size_down_in, total_size - size_down_in);
|
||||||
if (len == 0) {
|
if (len == 0) {
|
||||||
links[parent_index].sock.Close(); return kSockError;
|
links[parent_index].sock.Close();
|
||||||
|
return ReportError(&links[parent_index], kRecvZeroLen);
|
||||||
}
|
}
|
||||||
if (len != -1) {
|
if (len != -1) {
|
||||||
size_down_in += static_cast<size_t>(len);
|
size_down_in += static_cast<size_t>(len);
|
||||||
utils::Assert(size_down_in <= size_up_out,
|
utils::Assert(size_down_in <= size_up_out,
|
||||||
"Allreduce: boundary error");
|
"Allreduce: boundary error");
|
||||||
} else {
|
} else {
|
||||||
if (errno != EAGAIN && errno != EWOULDBLOCK) return kSockError;
|
ReturnType ret = Errno2Return(errno);
|
||||||
|
if (ret != kSuccess) {
|
||||||
|
return ReportError(&links[parent_index], ret);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -465,8 +477,9 @@ AllreduceBase::TryAllreduce(void *sendrecvbuf_,
|
|||||||
// can pass message down to childs
|
// can pass message down to childs
|
||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
if (i != parent_index && selecter.CheckWrite(links[i].sock)) {
|
if (i != parent_index && selecter.CheckWrite(links[i].sock)) {
|
||||||
if (!links[i].WriteFromArray(sendrecvbuf, size_down_in)) {
|
ReturnType ret = links[i].WriteFromArray(sendrecvbuf, size_down_in);
|
||||||
return kSockError;
|
if (ret != kSuccess) {
|
||||||
|
return ReportError(&links[i], ret);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -527,14 +540,17 @@ AllreduceBase::TryBroadcast(void *sendrecvbuf_, size_t total_size, int root) {
|
|||||||
// exception handling
|
// exception handling
|
||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
// recive OOB message from some link
|
// recive OOB message from some link
|
||||||
if (selecter.CheckExcept(links[i].sock)) return kGetExcept;
|
if (selecter.CheckExcept(links[i].sock)) {
|
||||||
|
return ReportError(&links[i], kGetExcept);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (in_link == -2) {
|
if (in_link == -2) {
|
||||||
// probe in-link
|
// probe in-link
|
||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
if (selecter.CheckRead(links[i].sock)) {
|
if (selecter.CheckRead(links[i].sock)) {
|
||||||
if (!links[i].ReadToArray(sendrecvbuf_, total_size)) {
|
ReturnType ret = links[i].ReadToArray(sendrecvbuf_, total_size);
|
||||||
return kSockError;
|
if (ret != kSuccess) {
|
||||||
|
return ReportError(&links[i], ret);
|
||||||
}
|
}
|
||||||
size_in = links[i].size_read;
|
size_in = links[i].size_read;
|
||||||
if (size_in != 0) {
|
if (size_in != 0) {
|
||||||
@ -554,7 +570,10 @@ AllreduceBase::TryBroadcast(void *sendrecvbuf_, size_t total_size, int root) {
|
|||||||
// send data to all out-link
|
// send data to all out-link
|
||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
if (i != in_link && selecter.CheckWrite(links[i].sock)) {
|
if (i != in_link && selecter.CheckWrite(links[i].sock)) {
|
||||||
if (!links[i].WriteFromArray(sendrecvbuf_, size_in)) return kSockError;
|
ReturnType ret = links[i].WriteFromArray(sendrecvbuf_, size_in);
|
||||||
|
if (ret != kSuccess) {
|
||||||
|
return ReportError(&links[i], ret);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -201,6 +201,10 @@ class AllreduceBase : public IEngine {
|
|||||||
enum ReturnType {
|
enum ReturnType {
|
||||||
/*! \brief execution is successful */
|
/*! \brief execution is successful */
|
||||||
kSuccess,
|
kSuccess,
|
||||||
|
/*! \brief a link was reset by peer */
|
||||||
|
kConnReset,
|
||||||
|
/*! \brief received a zero length message */
|
||||||
|
kRecvZeroLen,
|
||||||
/*! \brief a neighbor node go down, the connection is dropped */
|
/*! \brief a neighbor node go down, the connection is dropped */
|
||||||
kSockError,
|
kSockError,
|
||||||
/*!
|
/*!
|
||||||
@ -209,6 +213,12 @@ class AllreduceBase : public IEngine {
|
|||||||
*/
|
*/
|
||||||
kGetExcept
|
kGetExcept
|
||||||
};
|
};
|
||||||
|
/*!
 * \brief translate errno to return type
 * \param errsv the errno value to translate (call sites pass errno after a Recv/Send returned -1)
 * \return kSuccess for EAGAIN/EWOULDBLOCK, kConnReset for ECONNRESET,
 *         kSockError for every other value
 */
|
||||||
|
inline static ReturnType Errno2Return(int errsv) {
|
||||||
|
// EAGAIN / EWOULDBLOCK map to kSuccess, so call sites that test `ret != kSuccess` do not report them
if (errsv == EAGAIN || errsv == EWOULDBLOCK) return kSuccess;
|
||||||
|
// a reset connection gets its own code instead of the generic kSockError
if (errsv == ECONNRESET) return kConnReset;
|
||||||
|
// any other errno value is reported as a generic socket error
return kSockError;
|
||||||
|
}
|
||||||
// link record to a neighbor
|
// link record to a neighbor
|
||||||
struct LinkRecord {
|
struct LinkRecord {
|
||||||
public:
|
public:
|
||||||
@ -249,22 +259,22 @@ class AllreduceBase : public IEngine {
|
|||||||
* position after protect_start
|
* position after protect_start
|
||||||
* \param protect_start all data start from protect_start is still needed in buffer
|
* \param protect_start all data start from protect_start is still needed in buffer
|
||||||
* read shall not override this
|
* read shall not override this
|
||||||
* \return true if it is an successful read, false if there is some error happens, check errno
|
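* \return kSuccess on a successful (or would-block) read, kRecvZeroLen if the remote disconnected, otherwise the ReturnType describing the error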
|
||||||
*/
|
*/
|
||||||
inline bool ReadToRingBuffer(size_t protect_start) {
|
inline ReturnType ReadToRingBuffer(size_t protect_start) {
|
||||||
size_t ngap = size_read - protect_start;
|
size_t ngap = size_read - protect_start;
|
||||||
utils::Assert(ngap <= buffer_size, "Allreduce: boundary check");
|
utils::Assert(ngap <= buffer_size, "Allreduce: boundary check");
|
||||||
size_t offset = size_read % buffer_size;
|
size_t offset = size_read % buffer_size;
|
||||||
size_t nmax = std::min(buffer_size - ngap, buffer_size - offset);
|
size_t nmax = std::min(buffer_size - ngap, buffer_size - offset);
|
||||||
if (nmax == 0) return true;
|
if (nmax == 0) return kSuccess;
|
||||||
ssize_t len = sock.Recv(buffer_head + offset, nmax);
|
ssize_t len = sock.Recv(buffer_head + offset, nmax);
|
||||||
// length equals 0, remote disconnected
|
// length equals 0, remote disconnected
|
||||||
if (len == 0) {
|
if (len == 0) {
|
||||||
sock.Close(); return false;
|
sock.Close(); return kRecvZeroLen;
|
||||||
}
|
}
|
||||||
if (len == -1) return errno == EAGAIN || errno == EWOULDBLOCK;
|
if (len == -1) return Errno2Return(errno);
|
||||||
size_read += static_cast<size_t>(len);
|
size_read += static_cast<size_t>(len);
|
||||||
return true;
|
return kSuccess;
|
||||||
}
|
}
|
||||||
/*!
|
/*!
|
||||||
* \brief read data into array,
|
* \brief read data into array,
|
||||||
@ -273,17 +283,17 @@ class AllreduceBase : public IEngine {
|
|||||||
* \param max_size maximum size of array
|
* \param max_size maximum size of array
|
||||||
* \return true if it is an successful read, false if there is some error happens, check errno
|
* \return kSuccess on a successful (or would-block) read, kRecvZeroLen if the remote disconnected, otherwise the ReturnType describing the error
|
||||||
*/
|
*/
|
||||||
inline bool ReadToArray(void *recvbuf_, size_t max_size) {
|
inline ReturnType ReadToArray(void *recvbuf_, size_t max_size) {
|
||||||
if (max_size == size_read) return true;
|
if (max_size == size_read) return kSuccess;
|
||||||
char *p = static_cast<char*>(recvbuf_);
|
char *p = static_cast<char*>(recvbuf_);
|
||||||
ssize_t len = sock.Recv(p + size_read, max_size - size_read);
|
ssize_t len = sock.Recv(p + size_read, max_size - size_read);
|
||||||
// length equals 0, remote disconnected
|
// length equals 0, remote disconnected
|
||||||
if (len == 0) {
|
if (len == 0) {
|
||||||
sock.Close(); return false;
|
sock.Close(); return kRecvZeroLen;
|
||||||
}
|
}
|
||||||
if (len == -1) return errno == EAGAIN || errno == EWOULDBLOCK;
|
if (len == -1) return Errno2Return(errno);
|
||||||
size_read += static_cast<size_t>(len);
|
size_read += static_cast<size_t>(len);
|
||||||
return true;
|
return kSuccess;
|
||||||
}
|
}
|
||||||
/*!
|
/*!
|
||||||
* \brief write data in array to sock
|
* \brief write data in array to sock
|
||||||
@ -291,12 +301,12 @@ class AllreduceBase : public IEngine {
|
|||||||
* \param max_size maximum size of array
|
* \param max_size maximum size of array
|
||||||
* \return true if it is an successful write, false if there is some error happens, check errno
|
* \return kSuccess on a successful (or would-block) write, otherwise the ReturnType describing the error
|
||||||
*/
|
*/
|
||||||
inline bool WriteFromArray(const void *sendbuf_, size_t max_size) {
|
inline ReturnType WriteFromArray(const void *sendbuf_, size_t max_size) {
|
||||||
const char *p = static_cast<const char*>(sendbuf_);
|
const char *p = static_cast<const char*>(sendbuf_);
|
||||||
ssize_t len = sock.Send(p + size_write, max_size - size_write);
|
ssize_t len = sock.Send(p + size_write, max_size - size_write);
|
||||||
if (len == -1) return errno == EAGAIN || errno == EWOULDBLOCK;
|
if (len == -1) return Errno2Return(errno);
|
||||||
size_write += static_cast<size_t>(len);
|
size_write += static_cast<size_t>(len);
|
||||||
return true;
|
return kSuccess;
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -356,6 +366,14 @@ class AllreduceBase : public IEngine {
|
|||||||
* \sa ReturnType
|
* \sa ReturnType
|
||||||
*/
|
*/
|
||||||
ReturnType TryBroadcast(void *sendrecvbuf_, size_t size, int root);
|
ReturnType TryBroadcast(void *sendrecvbuf_, size_t size, int root);
|
||||||
|
/*!
|
||||||
|
* \brief function used to report error when a link goes wrong
|
||||||
|
* \param link the pointer to the link who causes the error
|
||||||
|
* \param err the error type
|
||||||
|
*/
|
||||||
|
inline ReturnType ReportError(LinkRecord *link, ReturnType err) {
|
||||||
|
// store the failing link in err_link, then hand err back unchanged so that
// call sites can write `return ReportError(&link, err);` in a single statement
err_link = link; return err;
|
||||||
|
}
|
||||||
//---- data structure related to model ----
|
//---- data structure related to model ----
|
||||||
// call sequence counter, records how many calls we made so far
|
// call sequence counter, records how many calls we made so far
|
||||||
// from last call to CheckPoint, LoadCheckPoint
|
// from last call to CheckPoint, LoadCheckPoint
|
||||||
@ -371,6 +389,8 @@ class AllreduceBase : public IEngine {
|
|||||||
int parent_rank;
|
int parent_rank;
|
||||||
// sockets of all links this connects to
|
// sockets of all links this connects to
|
||||||
std::vector<LinkRecord> all_links;
|
std::vector<LinkRecord> all_links;
|
||||||
|
// used to record the link where things goes wrong
|
||||||
|
LinkRecord *err_link;
|
||||||
// all the links in the reduction tree connection
|
// all the links in the reduction tree connection
|
||||||
RefLinkVector tree_links;
|
RefLinkVector tree_links;
|
||||||
// pointer to links in the ring
|
// pointer to links in the ring
|
||||||
|
|||||||
@ -97,7 +97,9 @@ AllreduceRobust::MsgPassing(const NodeType &node_value,
|
|||||||
// exception handling
|
// exception handling
|
||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
// recive OOB message from some link
|
// recive OOB message from some link
|
||||||
if (selecter.CheckExcept(links[i].sock)) return kGetExcept;
|
if (selecter.CheckExcept(links[i].sock)) {
|
||||||
|
return ReportError(&links[i], kGetExcept);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (stage == 0) {
|
if (stage == 0) {
|
||||||
bool finished = true;
|
bool finished = true;
|
||||||
@ -105,9 +107,8 @@ AllreduceRobust::MsgPassing(const NodeType &node_value,
|
|||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
if (i != parent_index) {
|
if (i != parent_index) {
|
||||||
if (selecter.CheckRead(links[i].sock)) {
|
if (selecter.CheckRead(links[i].sock)) {
|
||||||
if (!links[i].ReadToArray(&edge_in[i], sizeof(EdgeType))) {
|
ReturnType ret = links[i].ReadToArray(&edge_in[i], sizeof(EdgeType));
|
||||||
return kSockError;
|
if (ret != kSuccess) return ReportError(&links[i], ret);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (links[i].size_read != sizeof(EdgeType)) finished = false;
|
if (links[i].size_read != sizeof(EdgeType)) finished = false;
|
||||||
}
|
}
|
||||||
@ -128,17 +129,15 @@ AllreduceRobust::MsgPassing(const NodeType &node_value,
|
|||||||
if (stage == 1) {
|
if (stage == 1) {
|
||||||
const int pid = this->parent_index;
|
const int pid = this->parent_index;
|
||||||
utils::Assert(pid != -1, "MsgPassing invalid stage");
|
utils::Assert(pid != -1, "MsgPassing invalid stage");
|
||||||
if (!links[pid].WriteFromArray(&edge_out[pid], sizeof(EdgeType))) {
|
ReturnType ret = links[pid].WriteFromArray(&edge_out[pid], sizeof(EdgeType));
|
||||||
return kSockError;
|
if (ret != kSuccess) return ReportError(&links[pid], ret);
|
||||||
}
|
|
||||||
if (links[pid].size_write == sizeof(EdgeType)) stage = 2;
|
if (links[pid].size_write == sizeof(EdgeType)) stage = 2;
|
||||||
}
|
}
|
||||||
if (stage == 2) {
|
if (stage == 2) {
|
||||||
const int pid = this->parent_index;
|
const int pid = this->parent_index;
|
||||||
utils::Assert(pid != -1, "MsgPassing invalid stage");
|
utils::Assert(pid != -1, "MsgPassing invalid stage");
|
||||||
if (!links[pid].ReadToArray(&edge_in[pid], sizeof(EdgeType))) {
|
ReturnType ret = links[pid].ReadToArray(&edge_in[pid], sizeof(EdgeType));
|
||||||
return kSockError;
|
if (ret != kSuccess) return ReportError(&links[pid], ret);
|
||||||
}
|
|
||||||
if (links[pid].size_read == sizeof(EdgeType)) {
|
if (links[pid].size_read == sizeof(EdgeType)) {
|
||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
if (i != pid) edge_out[i] = func(node_value, edge_in, i);
|
if (i != pid) edge_out[i] = func(node_value, edge_in, i);
|
||||||
@ -149,9 +148,8 @@ AllreduceRobust::MsgPassing(const NodeType &node_value,
|
|||||||
if (stage == 3) {
|
if (stage == 3) {
|
||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
if (i != parent_index && links[i].size_write != sizeof(EdgeType)) {
|
if (i != parent_index && links[i].size_write != sizeof(EdgeType)) {
|
||||||
if (!links[i].WriteFromArray(&edge_out[i], sizeof(EdgeType))) {
|
ReturnType ret = links[i].WriteFromArray(&edge_out[i], sizeof(EdgeType));
|
||||||
return kSockError;
|
if (ret != kSuccess) return ReportError(&links[i], ret);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -626,18 +626,24 @@ AllreduceRobust::TryRecoverData(RecoverType role,
|
|||||||
selecter.Select();
|
selecter.Select();
|
||||||
// exception handling
|
// exception handling
|
||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
if (selecter.CheckExcept(links[i].sock)) return kGetExcept;
|
if (selecter.CheckExcept(links[i].sock)) {
|
||||||
|
return ReportError(&links[i], kGetExcept);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (role == kRequestData) {
|
if (role == kRequestData) {
|
||||||
const int pid = recv_link;
|
const int pid = recv_link;
|
||||||
if (selecter.CheckRead(links[pid].sock)) {
|
if (selecter.CheckRead(links[pid].sock)) {
|
||||||
if (!links[pid].ReadToArray(sendrecvbuf_, size)) return kSockError;
|
ReturnType ret = links[pid].ReadToArray(sendrecvbuf_, size);
|
||||||
|
if (ret != kSuccess) {
|
||||||
|
return ReportError(&links[pid], ret);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
if (req_in[i] && links[i].size_write != links[pid].size_read &&
|
if (req_in[i] && links[i].size_write != links[pid].size_read &&
|
||||||
selecter.CheckWrite(links[i].sock)) {
|
selecter.CheckWrite(links[i].sock)) {
|
||||||
if (!links[i].WriteFromArray(sendrecvbuf_, links[pid].size_read)) {
|
ReturnType ret = links[i].WriteFromArray(sendrecvbuf_, links[pid].size_read);
|
||||||
return kSockError;
|
if (ret != kSuccess) {
|
||||||
|
return ReportError(&links[i], ret);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -645,8 +651,9 @@ AllreduceRobust::TryRecoverData(RecoverType role,
|
|||||||
if (role == kHaveData) {
|
if (role == kHaveData) {
|
||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
if (req_in[i] && selecter.CheckWrite(links[i].sock)) {
|
if (req_in[i] && selecter.CheckWrite(links[i].sock)) {
|
||||||
if (!links[i].WriteFromArray(sendrecvbuf_, size)) {
|
ReturnType ret = links[i].WriteFromArray(sendrecvbuf_, size);
|
||||||
return kSockError;
|
if (ret != kSuccess) {
|
||||||
|
return ReportError(&links[i], ret);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -660,7 +667,10 @@ AllreduceRobust::TryRecoverData(RecoverType role,
|
|||||||
if (req_in[i]) min_write = std::min(links[i].size_write, min_write);
|
if (req_in[i]) min_write = std::min(links[i].size_write, min_write);
|
||||||
}
|
}
|
||||||
utils::Assert(min_write <= links[pid].size_read, "boundary check");
|
utils::Assert(min_write <= links[pid].size_read, "boundary check");
|
||||||
if (!links[pid].ReadToRingBuffer(min_write)) return kSockError;
|
ReturnType ret = links[pid].ReadToRingBuffer(min_write);
|
||||||
|
if (ret != kSuccess) {
|
||||||
|
return ReportError(&links[pid], ret);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for (int i = 0; i < nlink; ++i) {
|
for (int i = 0; i < nlink; ++i) {
|
||||||
if (req_in[i] && selecter.CheckWrite(links[i].sock) &&
|
if (req_in[i] && selecter.CheckWrite(links[i].sock) &&
|
||||||
@ -672,7 +682,8 @@ AllreduceRobust::TryRecoverData(RecoverType role,
|
|||||||
if (len != -1) {
|
if (len != -1) {
|
||||||
links[i].size_write += len;
|
links[i].size_write += len;
|
||||||
} else {
|
} else {
|
||||||
if (errno != EAGAIN && errno != EWOULDBLOCK) return kSockError;
|
ReturnType ret = Errno2Return(errno);
|
||||||
|
if (ret != kSuccess) return ReportError(&links[i], ret);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1131,17 +1142,18 @@ AllreduceRobust::RingPassing(void *sendrecvbuf_,
|
|||||||
selecter.WatchException(next.sock);
|
selecter.WatchException(next.sock);
|
||||||
if (finished) break;
|
if (finished) break;
|
||||||
selecter.Select();
|
selecter.Select();
|
||||||
if (selecter.CheckExcept(prev.sock)) return kGetExcept;
|
if (selecter.CheckExcept(prev.sock)) return ReportError(&prev, kGetExcept);
|
||||||
if (selecter.CheckExcept(next.sock)) return kGetExcept;
|
if (selecter.CheckExcept(next.sock)) return ReportError(&next, kGetExcept);
|
||||||
if (read_ptr != read_end && selecter.CheckRead(prev.sock)) {
|
if (read_ptr != read_end && selecter.CheckRead(prev.sock)) {
|
||||||
ssize_t len = prev.sock.Recv(buf + read_ptr, read_end - read_ptr);
|
ssize_t len = prev.sock.Recv(buf + read_ptr, read_end - read_ptr);
|
||||||
if (len == 0) {
|
if (len == 0) {
|
||||||
prev.sock.Close(); return kSockError;
|
prev.sock.Close(); return ReportError(&prev, kRecvZeroLen);
|
||||||
}
|
}
|
||||||
if (len != -1) {
|
if (len != -1) {
|
||||||
read_ptr += static_cast<size_t>(len);
|
read_ptr += static_cast<size_t>(len);
|
||||||
} else {
|
} else {
|
||||||
if (errno != EAGAIN && errno != EWOULDBLOCK) return kSockError;
|
ReturnType ret = Errno2Return(errno);
|
||||||
|
if (ret != kSuccess) return ReportError(&prev, ret);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (write_ptr != write_end && write_ptr < read_ptr &&
|
if (write_ptr != write_end && write_ptr < read_ptr &&
|
||||||
@ -1151,7 +1163,8 @@ AllreduceRobust::RingPassing(void *sendrecvbuf_,
|
|||||||
if (len != -1) {
|
if (len != -1) {
|
||||||
write_ptr += static_cast<size_t>(len);
|
write_ptr += static_cast<size_t>(len);
|
||||||
} else {
|
} else {
|
||||||
if (errno != EAGAIN && errno != EWOULDBLOCK) return kSockError;
|
ReturnType ret = Errno2Return(errno);
|
||||||
|
if (ret != kSuccess) return ReportError(&prev, ret);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user