seems a OK version of reset, start to work on decide exec

This commit is contained in:
tqchen 2014-11-29 22:22:51 -08:00
parent 5b0bb53184
commit 155ed3a814
3 changed files with 40 additions and 29 deletions

View File

@ -17,18 +17,14 @@ void AllReduceRobust::AllReduce(void *sendrecvbuf_,
size_t type_nbytes, size_t type_nbytes,
size_t count, size_t count,
ReduceFunction reducer) { ReduceFunction reducer) {
utils::LogPrintf("[%d] call AllReduce", rank);
TryResetLinks();
utils::LogPrintf("[%d] start work", rank);
while (true) { while (true) {
ReturnType ret = TryAllReduce(sendrecvbuf_, type_nbytes, count, reducer); ReturnType ret = TryAllReduce(sendrecvbuf_, type_nbytes, count, reducer);
if (ret == kSuccess) return; if (ret == kSuccess) return;
if (ret == kSockError) { if (ret == kSockError) {
utils::Error("error occur during all reduce\n"); utils::Error("error occur during all reduce\n");
} }
utils::LogPrintf("[%d] receive except signal, start reset link", rank); utils::LogPrintf("[%d] receive except signal, start reset link\n", rank);
TryResetLinks(); TryResetLinks();
//utils::Check(TryResetLinks() == kSuccess, "error when reset links");
} }
// TODO // TODO
} }
@ -70,13 +66,13 @@ void AllReduceRobust::CheckPoint(const utils::ISerializable &model) {
* and some link recovery proceduer is needed * and some link recovery proceduer is needed
*/ */
AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) { AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) {
utils::LogPrintf("[%d] TryResetLinks, start\n", rank);
// number of links // number of links
const int nlink = static_cast<int>(links.size()); const int nlink = static_cast<int>(links.size());
for (int i = 0; i < nlink; ++i) { for (int i = 0; i < nlink; ++i) {
links[i].InitBuffer(sizeof(int), 1 << 10, reduce_buffer_size); links[i].InitBuffer(sizeof(int), 1 << 10, reduce_buffer_size);
links[i].ResetSize(); links[i].ResetSize();
} }
// read and discard data from all channels until pass mark // read and discard data from all channels until pass mark
while (true) { while (true) {
for (int i = 0; i < nlink; ++i) { for (int i = 0; i < nlink; ++i) {
@ -92,6 +88,25 @@ AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) {
ssize_t len = links[i].sock.Send(&sig, sizeof(sig)); ssize_t len = links[i].sock.Send(&sig, sizeof(sig));
if (len == sizeof(sig)) links[i].size_write = 2; if (len == sizeof(sig)) links[i].size_write = 2;
} }
}
utils::SelectHelper rsel;
bool finished = true;
for (int i = 0; i < nlink; ++i) {
if (links[i].size_write != 2 && !links[i].sock.BadSocket()) {
rsel.WatchWrite(links[i].sock); finished = false;
}
}
if (finished) break;
// wait to read from the channels to discard data
rsel.Select();
}
for (int i = 0; i < nlink; ++i) {
if (!links[i].sock.BadSocket()) {
utils::SelectHelper::WaitExcept(links[i].sock);
}
}
while (true) {
for (int i = 0; i < nlink; ++i) {
if (links[i].size_read == 0) { if (links[i].size_read == 0) {
int atmark = links[i].sock.AtMark(); int atmark = links[i].sock.AtMark();
if (atmark < 0) { if (atmark < 0) {
@ -99,9 +114,9 @@ AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) {
} else if (atmark > 0) { } else if (atmark > 0) {
links[i].size_read = 1; links[i].size_read = 1;
} else { } else {
printf("buffer_size=%lu\n", links[i].buffer_size);
// no at mark, read and discard data // no at mark, read and discard data
ssize_t len = links[i].sock.Recv(links[i].buffer_head, links[i].buffer_size); ssize_t len = links[i].sock.Recv(links[i].buffer_head, links[i].buffer_size);
if (links[i].sock.AtMark()) links[i].size_read = 1;
// zero length, remote closed the connection, close socket // zero length, remote closed the connection, close socket
if (len == 0) links[i].sock.Close(); if (len == 0) links[i].sock.Close();
} }
@ -110,18 +125,14 @@ AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) {
utils::SelectHelper rsel; utils::SelectHelper rsel;
bool finished = true; bool finished = true;
for (int i = 0; i < nlink; ++i) { for (int i = 0; i < nlink; ++i) {
if (links[i].size_write != 2 && !links[i].sock.BadSocket()) {
rsel.WatchWrite(links[i].sock); finished = false;
}
if (links[i].size_read == 0 && !links[i].sock.BadSocket()) { if (links[i].size_read == 0 && !links[i].sock.BadSocket()) {
rsel.WatchRead(links[i].sock); finished = false; rsel.WatchRead(links[i].sock); finished = false;
} }
} }
if (finished) break; if (finished) break;
// wait to read from the channels to discard data
rsel.Select(); rsel.Select();
} }
utils::LogPrintf("[%d] Finish discard data\n", rank);
// start synchronization, use blocking I/O to avoid select // start synchronization, use blocking I/O to avoid select
for (int i = 0; i < nlink; ++i) { for (int i = 0; i < nlink; ++i) {
if (!links[i].sock.BadSocket()) { if (!links[i].sock.BadSocket()) {
@ -132,7 +143,7 @@ AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) {
links[i].sock.Close(); continue; links[i].sock.Close(); continue;
} else if (len > 0) { } else if (len > 0) {
utils::Assert(oob_mark == kResetMark, "wrong oob msg"); utils::Assert(oob_mark == kResetMark, "wrong oob msg");
utils::Assert(!links[i].sock.AtMark(), "should already read past mark"); utils::Assert(links[i].sock.AtMark() != 1, "should already read past mark");
} else { } else {
utils::Assert(errno != EAGAIN|| errno != EWOULDBLOCK, "BUG"); utils::Assert(errno != EAGAIN|| errno != EWOULDBLOCK, "BUG");
} }
@ -147,7 +158,6 @@ AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) {
} }
} }
} }
utils::LogPrintf("[%d] GGet all Acks\n", rank);
// wait all ack // wait all ack
for (int i = 0; i < nlink; ++i) { for (int i = 0; i < nlink; ++i) {
if (!links[i].sock.BadSocket()) { if (!links[i].sock.BadSocket()) {
@ -167,7 +177,6 @@ AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) {
for (int i = 0; i < nlink; ++i) { for (int i = 0; i < nlink; ++i) {
if (links[i].sock.BadSocket()) return kSockError; if (links[i].sock.BadSocket()) return kSockError;
} }
utils::LogPrintf("[%d] TryResetLinks,!! return\n", rank);
return kSuccess; return kSuccess;
} }

View File

@ -418,6 +418,7 @@ struct SelectHelper {
private: private:
inline static int Select_(int maxfd, fd_set *rfds, fd_set *wfds, fd_set *efds, long timeout) { inline static int Select_(int maxfd, fd_set *rfds, fd_set *wfds, fd_set *efds, long timeout) {
utils::Assert(maxfd < FD_SETSIZE, "maxdf must be smaller than FDSETSIZE");
if (timeout == 0) { if (timeout == 0) {
return select(maxfd, rfds, wfds, efds, NULL); return select(maxfd, rfds, wfds, efds, NULL);
} else { } else {

View File

@ -78,6 +78,7 @@ inline void HandlePrint(const char *msg) {
} }
inline void HandleLogPrint(const char *msg) { inline void HandleLogPrint(const char *msg) {
fprintf(stderr, "%s", msg); fprintf(stderr, "%s", msg);
fflush(stderr);
} }
#else #else
#ifndef ALLREDUCE_STRICT_CXX98_ #ifndef ALLREDUCE_STRICT_CXX98_