bring it back alive again

This commit is contained in:
tqchen 2014-11-23 21:27:16 -08:00
parent 7f3dc967cf
commit d2f151ef5a
2 changed files with 17 additions and 13 deletions

View File

@ -43,7 +43,7 @@ class SyncManager {
links[i].sock.Close(); links[i].sock.Close();
} }
links.clear(); links.clear();
TCPSocket::Finalize(); utils::TCPSocket::Finalize();
} }
/*! \brief set parameters to the sync manager */ /*! \brief set parameters to the sync manager */
inline void SetParam(const char *name, const char *val) { inline void SetParam(const char *name, const char *val) {
@ -83,7 +83,7 @@ class SyncManager {
} }
// initialize the manager // initialize the manager
inline void Init(void) { inline void Init(void) {
TCPSocket::Startup(); utils::TCPSocket::Startup();
// single node mode // single node mode
if (master_uri == "NULL") return; if (master_uri == "NULL") return;
utils::Assert(links.size() == 0, "can only call Init once"); utils::Assert(links.size() == 0, "can only call Init once");

View File

@ -25,6 +25,9 @@ namespace xgboost {
namespace utils { namespace utils {
#if defined(_WIN32) #if defined(_WIN32)
typedef int ssize_t; typedef int ssize_t;
#else
typedef int SOCKET;
const int INVALID_SOCKET = -1;
#endif #endif
/*! \brief data structure for network address */ /*! \brief data structure for network address */
@ -92,7 +95,7 @@ class TCPSocket {
inline void Create(int af = PF_INET) { inline void Create(int af = PF_INET) {
sockfd = socket(PF_INET, SOCK_STREAM, 0); sockfd = socket(PF_INET, SOCK_STREAM, 0);
if (sockfd == INVALID_SOCKET) { if (sockfd == INVALID_SOCKET) {
SockError("Create", errno); SockError("Create");
} }
} }
/*! /*!
@ -120,7 +123,7 @@ class TCPSocket {
#else #else
int flag = fcntl(sockfd, F_GETFL, 0); int flag = fcntl(sockfd, F_GETFL, 0);
if (flag == -1) { if (flag == -1) {
SockError("SetNonBlock-1", errno); SockError("SetNonBlock-1");
} }
if (non_block) { if (non_block) {
flag |= O_NONBLOCK; flag |= O_NONBLOCK;
@ -128,7 +131,7 @@ class TCPSocket {
flag &= ~O_NONBLOCK; flag &= ~O_NONBLOCK;
} }
if (fcntl(sockfd, F_SETFL, flag) == -1) { if (fcntl(sockfd, F_SETFL, flag) == -1) {
SockError("SetNonBlock-2", errno); SockError("SetNonBlock-2");
} }
#endif #endif
} }
@ -153,7 +156,7 @@ class TCPSocket {
*/ */
inline void Bind(const SockAddr &addr) { inline void Bind(const SockAddr &addr) {
if (bind(sockfd, (sockaddr*)&addr.addr, sizeof(addr.addr)) == -1) { if (bind(sockfd, (sockaddr*)&addr.addr, sizeof(addr.addr)) == -1) {
SockError("Bind", errno); SockError("Bind");
} }
} }
/*! /*!
@ -170,7 +173,7 @@ class TCPSocket {
return port; return port;
} }
if (errno != EADDRINUSE) { if (errno != EADDRINUSE) {
SockError("TryBindHost", errno); SockError("TryBindHost");
} }
} }
return -1; return -1;
@ -181,7 +184,7 @@ class TCPSocket {
*/ */
inline void Connect(const SockAddr &addr) { inline void Connect(const SockAddr &addr) {
if (connect(sockfd, (sockaddr*)&addr.addr, sizeof(addr.addr)) == -1) { if (connect(sockfd, (sockaddr*)&addr.addr, sizeof(addr.addr)) == -1) {
SockError("Connect", errno); SockError("Connect");
} }
} }
/*! \brief close the connection */ /*! \brief close the connection */
@ -209,7 +212,7 @@ class TCPSocket {
ssize_t ret = send(sockfd, buf, len, flag); ssize_t ret = send(sockfd, buf, len, flag);
if (ret == -1) { if (ret == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) return 0; if (errno == EAGAIN || errno == EWOULDBLOCK) return 0;
SockError("Send", errno); SockError("Send");
} }
return ret; return ret;
} }
@ -225,7 +228,7 @@ class TCPSocket {
ssize_t ret = recv(sockfd, buf, len, flags); ssize_t ret = recv(sockfd, buf, len, flags);
if (ret == -1) { if (ret == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) return 0; if (errno == EAGAIN || errno == EWOULDBLOCK) return 0;
SockError("Recv", errno); SockError("Recv");
} }
return ret; return ret;
} }
@ -243,7 +246,7 @@ class TCPSocket {
ssize_t ret = send(sockfd, buf, static_cast<ssize_t>(len - ndone), 0); ssize_t ret = send(sockfd, buf, static_cast<ssize_t>(len - ndone), 0);
if (ret == -1) { if (ret == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) return ndone; if (errno == EAGAIN || errno == EWOULDBLOCK) return ndone;
SockError("Recv", errno); SockError("Recv");
} }
buf += ret; buf += ret;
ndone += ret; ndone += ret;
@ -264,7 +267,7 @@ class TCPSocket {
ssize_t ret = recv(sockfd, buf, len - ndone, MSG_WAITALL); ssize_t ret = recv(sockfd, buf, len - ndone, MSG_WAITALL);
if (ret == -1) { if (ret == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) return ndone; if (errno == EAGAIN || errno == EWOULDBLOCK) return ndone;
SockError("Recv", errno); SockError("Recv");
} }
if (ret == 0) return ndone; if (ret == 0) return ndone;
buf += ret; buf += ret;
@ -275,7 +278,8 @@ class TCPSocket {
private: private:
// report an socket error // report an socket error
inline static void SockError(const char *msg, int errsv) { inline static void SockError(const char *msg) {
int errsv = errno;
char buf[256]; char buf[256];
Error("Socket %s Error:%s", msg, strerror_r(errsv, buf, sizeof(buf))); Error("Socket %s Error:%s", msg, strerror_r(errsv, buf, sizeof(buf)));
} }