[rabit harden] Enable all tests (#90)

* include osx in tests
* address `time_wait` on port assignment
* increase submit attempts.
* cleanup tests
This commit is contained in:
Chen Qin
2019-04-24 04:12:11 -07:00
committed by Jiaming Yuan
parent ecd4bf7aae
commit e3d51d3e62
17 changed files with 168 additions and 66 deletions

View File

@@ -131,6 +131,8 @@ void AllreduceBase::Shutdown(void) {
utils::TCPSocket tracker = this->ConnectTracker();
tracker.SendStr(std::string("shutdown"));
tracker.Close();
// close listening sockets
sock_listen.Close();
utils::TCPSocket::Finalize();
}
void AllreduceBase::TrackerPrint(const std::string &msg) {
@@ -271,12 +273,26 @@ void AllreduceBase::ReConnectLinks(const char *cmd) {
"ReConnectLink failure 4");
Assert(tracker.RecvAll(&next_rank, sizeof(next_rank)) == sizeof(next_rank),
"ReConnectLink failure 4");
// create listening socket
utils::TCPSocket sock_listen;
sock_listen.Create();
int port = sock_listen.TryBindHost(slave_port, slave_port + nport_trial);
utils::Check(port != -1, "ReConnectLink fail to bind the ports specified");
sock_listen.Listen();
if (sock_listen == INVALID_SOCKET || sock_listen.AtMark()) {
if (!sock_listen.IsClosed()) {
sock_listen.Close();
}
// create listening socket
sock_listen.Create();
sock_listen.SetKeepAlive(true);
// http://deepix.github.io/2016/10/21/tcprst.html
sock_listen.SetLinger(0);
// [slave_port, slave_port+1 .... slave_port + newrank ...slave_port + nport_trial)
// work around processes bind to same port without set reuse option,
// start explore from slave_port + newrank towards end
port = sock_listen.TryBindHost(slave_port+ newrank%nport_trial, slave_port + nport_trial);
// if no port bindable, explore first half of range
if (port == -1) sock_listen.TryBindHost(slave_port, newrank% nport_trial + slave_port);
utils::Check(port != -1, "ReConnectLink fail to bind the ports specified");
sock_listen.Listen();
}
// get number of to connect and number of to accept nodes from tracker
int num_conn, num_accept, num_error = 1;
@@ -311,6 +327,7 @@ void AllreduceBase::ReConnectLinks(const char *cmd) {
"ReConnectLink failure 9");
Assert(tracker.RecvAll(&hrank, sizeof(hrank)) == sizeof(hrank),
"ReConnectLink failure 10");
r.sock.Create();
if (!r.sock.Connect(utils::SockAddr(hname.c_str(), hport))) {
num_error += 1; r.sock.Close(); continue;
@@ -357,8 +374,7 @@ void AllreduceBase::ReConnectLinks(const char *cmd) {
}
if (!match) all_links.push_back(r);
}
// close listening sockets
sock_listen.Close();
this->parent_index = -1;
// setup tree links and ring structure
tree_links.plinks.clear();

View File

@@ -521,6 +521,10 @@ class AllreduceBase : public IEngine {
int world_size;
// connect retry time
int connect_retry;
// backdoor listening peer connection
utils::TCPSocket sock_listen;
// backdoor port
int port = 0;
};
} // namespace engine
} // namespace rabit

View File

@@ -51,12 +51,10 @@ void AllreduceRobust::Shutdown(void) {
utils::Assert(RecoverExec(NULL, 0, ActionSummary::kCheckAck, ActionSummary::kSpecialOp),
"Shutdown: check ack must return true");
// one worker shutdowns and closes sockets while rest still run kCheckAck,
// seems has something to do with time-wait state in tcp connection,
// this cause rest workers checkandrecover and hang inf,
// https://github.com/dmlc/xgboost/pull/3818
// TODO(Chen Qin): a fundamental fix for this
sleep(1);
#if defined (__APPLE__)
sleep(1);
#endif
AllreduceBase::Shutdown();
}
/*!

View File

@@ -276,13 +276,21 @@ class TCPSocket : public Socket{
* \brief enable/disable TCP keepalive
* \param keepalive whether to set the keep alive option on
*/
inline void SetKeepAlive(bool keepalive) {
void SetKeepAlive(bool keepalive) {
int opt = static_cast<int>(keepalive);
if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE,
reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) {
Socket::Error("SetKeepAlive");
}
}
inline void SetLinger(int timeout = 0) {
struct linger sl;
sl.l_onoff = 1; /* non-zero value enables linger option in kernel */
sl.l_linger = timeout; /* timeout interval in seconds */
if (setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)) == -1) {
Socket::Error("SO_LINGER");
}
}
/*!
* \brief create the socket, call this before using socket
* \param af domain