From f7928c68a31c69f907d010057164630c716755d9 Mon Sep 17 00:00:00 2001 From: tqchen Date: Sun, 30 Nov 2014 21:07:34 -0800 Subject: [PATCH] next round try more careful select design --- src/engine_base.cc | 4 ++-- src/engine_robust-inl.h | 9 ++++----- src/engine_robust.cc | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/engine_base.cc b/src/engine_base.cc index fb6e683ae..9f0aaa405 100644 --- a/src/engine_base.cc +++ b/src/engine_base.cc @@ -151,7 +151,7 @@ AllReduceBase::TryAllReduce(void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer) { - if (links.size() == 0) return kSuccess; + if (links.size() == 0 || count == 0) return kSuccess; // total size of message const size_t total_size = type_nbytes * count; // number of links @@ -287,7 +287,7 @@ AllReduceBase::TryAllReduce(void *sendrecvbuf_, */ AllReduceBase::ReturnType AllReduceBase::TryBroadcast(void *sendrecvbuf_, size_t total_size, int root) { - if (links.size() == 0) return kSuccess; + if (links.size() == 0 || total_size == 0) return kSuccess; // number of links const int nlink = static_cast<int>(links.size()); // size of space already read from data diff --git a/src/engine_robust-inl.h b/src/engine_robust-inl.h index 2817d4c0a..42558a750 100644 --- a/src/engine_robust-inl.h +++ b/src/engine_robust-inl.h @@ -67,6 +67,7 @@ AllReduceRobust::MsgPassing(const NodeType &node_value, } // select helper utils::SelectHelper selecter; + bool done = (stage == 3); for (int i = 0; i < nlink; ++i) { selecter.WatchException(links[i].sock); switch (stage) { @@ -80,12 +81,14 @@ AllReduceRobust::MsgPassing(const NodeType &node_value, case 3: if (i != parent_index && links[i].size_write != sizeof(EdgeType)) { selecter.WatchWrite(links[i].sock); + done = false; } break; default: utils::Error("invalid stage"); } } - // select must return + // finish all the stages, and write out message + if (done) break; selecter.Select(); // exception handling for (int i = 0; i < nlink; ++i) { @@ -134,15 
+137,11 @@ AllReduceRobust::MsgPassing(const NodeType &node_value, } } if (stage == 3) { - bool finished = true; for (int i = 0; i < nlink; ++i) { if (i != parent_index && links[i].size_write != sizeof(EdgeType)) { if (!links[i].WriteFromArray(&edge_out[i], sizeof(EdgeType))) return kSockError; - if (links[i].size_write != sizeof(EdgeType)) finished = false; } } - // finish all the stages - if (finished) break; } } return kSuccess; diff --git a/src/engine_robust.cc b/src/engine_robust.cc index dbc48f406..7f510d2f3 100644 --- a/src/engine_robust.cc +++ b/src/engine_robust.cc @@ -352,7 +352,7 @@ AllReduceRobust::TryRecoverData(RecoverType role, int recv_link, const std::vector &req_in) { // no need to run recovery for zero size message - if (size == 0) return kSuccess; + if (links.size() == 0 || size == 0) return kSuccess; utils::Assert(req_in.size() == links.size(), "TryRecoverData"); const int nlink = static_cast<int>(links.size()); {