From 19631ecef6bc3e431d4ef25c80f4d016959fdfdf Mon Sep 17 00:00:00 2001 From: tqchen Date: Sat, 6 Dec 2014 09:24:12 -0800 Subject: [PATCH] more tracker renaming --- src/allreduce_base.cc | 92 +++++++++++++++++++++---------------------- src/allreduce_base.h | 12 +++--- src/rabit_tracker.py | 10 +++-- test/test.sh | 8 ---- 4 files changed, 58 insertions(+), 64 deletions(-) delete mode 100755 test/test.sh diff --git a/src/allreduce_base.cc b/src/allreduce_base.cc index 6eb77b6dd..90a32dbee 100644 --- a/src/allreduce_base.cc +++ b/src/allreduce_base.cc @@ -15,8 +15,8 @@ namespace rabit { namespace engine { // constructor AllreduceBase::AllreduceBase(void) { - master_uri = "NULL"; - master_port = 9000; + tracker_uri = "NULL"; + tracker_port = 9000; host_uri = ""; slave_port = 9010; nport_trial = 1000; @@ -45,7 +45,7 @@ void AllreduceBase::Init(void) { utils::Socket::Startup(); utils::Assert(links.size() == 0, "can only call Init once"); this->host_uri = utils::SockAddr::GetHostName(); - // get information from master + // get information from tracker this->ReConnectLinks(); } @@ -55,22 +55,22 @@ void AllreduceBase::Shutdown(void) { } links.clear(); - if (master_uri == "NULL") return; + if (tracker_uri == "NULL") return; int magic = kMagic; - // notify master rank i have shutdown - utils::TCPSocket master; - master.Create(); - if (!master.Connect(utils::SockAddr(master_uri.c_str(), master_port))) { - utils::Socket::Error("Connect Master"); + // notify tracker rank i have shutdown + utils::TCPSocket tracker; + tracker.Create(); + if (!tracker.Connect(utils::SockAddr(tracker_uri.c_str(), tracker_port))) { + utils::Socket::Error("Connect Tracker"); } - utils::Assert(master.SendAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 1"); - utils::Assert(master.RecvAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 2"); - utils::Check(magic == kMagic, "sync::Invalid master message, init failure"); + utils::Assert(tracker.SendAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 1"); + utils::Assert(tracker.RecvAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 2"); + utils::Check(magic == kMagic, "sync::Invalid tracker message, init failure"); - utils::Assert(master.SendAll(&rank, sizeof(rank)) == sizeof(rank), "ReConnectLink failure 3"); - master.SendStr(task_id); - master.SendStr(std::string("shutdown")); - master.Close(); + utils::Assert(tracker.SendAll(&rank, sizeof(rank)) == sizeof(rank), "ReConnectLink failure 3"); + tracker.SendStr(task_id); + tracker.SendStr(std::string("shutdown")); + tracker.Close(); utils::TCPSocket::Finalize(); } /*! @@ -79,8 +79,8 @@ void AllreduceBase::Shutdown(void) { * \param val parameter value */ void AllreduceBase::SetParam(const char *name, const char *val) { - if (!strcmp(name, "master_uri")) master_uri = val; - if (!strcmp(name, "master_port")) master_port = atoi(val); + if (!strcmp(name, "rabit_tracker_uri")) tracker_uri = val; + if (!strcmp(name, "rabit_tracker_port")) tracker_port = atoi(val); if (!strcmp(name, "task_id")) task_id = val; if (!strcmp(name, "hadoop_mode")) hadoop_mode = atoi(val); if (!strcmp(name, "reduce_buffer")) { @@ -100,34 +100,34 @@ void AllreduceBase::SetParam(const char *name, const char *val) { } } /*! - * \brief connect to the master to fix the the missing links + * \brief connect to the tracker to fix the the missing links * this function is also used when the engine start up */ void AllreduceBase::ReConnectLinks(const char *cmd) { // single node mode - if (master_uri == "NULL") { + if (tracker_uri == "NULL") { rank = 0; return; } int magic = kMagic; - // get information from master - utils::TCPSocket master; - master.Create(); - if (!master.Connect(utils::SockAddr(master_uri.c_str(), master_port))) { + // get information from tracker + utils::TCPSocket tracker; + tracker.Create(); + if (!tracker.Connect(utils::SockAddr(tracker_uri.c_str(), tracker_port))) { utils::Socket::Error("Connect"); } - utils::Assert(master.SendAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 1"); - utils::Assert(master.RecvAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 2"); - utils::Check(magic == kMagic, "sync::Invalid master message, init failure"); - utils::Assert(master.SendAll(&rank, sizeof(rank)) == sizeof(rank), "ReConnectLink failure 3"); - master.SendStr(task_id); - master.SendStr(std::string(cmd)); + utils::Assert(tracker.SendAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 1"); + utils::Assert(tracker.RecvAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 2"); + utils::Check(magic == kMagic, "sync::Invalid tracker message, init failure"); + utils::Assert(tracker.SendAll(&rank, sizeof(rank)) == sizeof(rank), "ReConnectLink failure 3"); + tracker.SendStr(task_id); + tracker.SendStr(std::string(cmd)); {// get new ranks int newrank; - utils::Assert(master.RecvAll(&newrank, sizeof(newrank)) == sizeof(newrank), + utils::Assert(tracker.RecvAll(&newrank, sizeof(newrank)) == sizeof(newrank), "ReConnectLink failure 4"); - utils::Assert(master.RecvAll(&parent_rank, sizeof(parent_rank)) == sizeof(parent_rank), + utils::Assert(tracker.RecvAll(&parent_rank, sizeof(parent_rank)) == sizeof(parent_rank), "ReConnectLink failure 4"); - utils::Assert(master.RecvAll(&world_size, sizeof(world_size)) == sizeof(world_size), + utils::Assert(tracker.RecvAll(&world_size, sizeof(world_size)) == sizeof(world_size), "ReConnectLink failure 4"); utils::Assert(rank == -1 || newrank == rank, "must keep rank to same if the node already have one"); rank = newrank; @@ -139,7 +139,7 @@ void AllreduceBase::ReConnectLinks(const char *cmd) { utils::Check(port != -1, "ReConnectLink fail to bind the ports specified"); sock_listen.Listen(); - // get number of to connect and number of to accept nodes from master + // get number of to connect and number of to accept nodes from tracker int num_conn, num_accept, num_error = 1; do { // send over good links @@ -152,24 +152,24 @@ void AllreduceBase::ReConnectLinks(const char *cmd) { } } int ngood = static_cast(good_link.size()); - utils::Assert(master.SendAll(&ngood, sizeof(ngood)) == sizeof(ngood), + utils::Assert(tracker.SendAll(&ngood, sizeof(ngood)) == sizeof(ngood), "ReConnectLink failure 5"); for (size_t i = 0; i < good_link.size(); ++i) { - utils::Assert(master.SendAll(&good_link[i], sizeof(good_link[i])) == sizeof(good_link[i]), + utils::Assert(tracker.SendAll(&good_link[i], sizeof(good_link[i])) == sizeof(good_link[i]), "ReConnectLink failure 6"); } - utils::Assert(master.RecvAll(&num_conn, sizeof(num_conn)) == sizeof(num_conn), + utils::Assert(tracker.RecvAll(&num_conn, sizeof(num_conn)) == sizeof(num_conn), "ReConnectLink failure 7"); - utils::Assert(master.RecvAll(&num_accept, sizeof(num_accept)) == sizeof(num_accept), + utils::Assert(tracker.RecvAll(&num_accept, sizeof(num_accept)) == sizeof(num_accept), "ReConnectLink failure 8"); num_error = 0; for (int i = 0; i < num_conn; ++i) { LinkRecord r; int hport, hrank; std::string hname; - master.RecvStr(&hname); - utils::Assert(master.RecvAll(&hport, sizeof(hport)) == sizeof(hport), "ReConnectLink failure 9"); - utils::Assert(master.RecvAll(&hrank, sizeof(hrank)) == sizeof(hrank), "ReConnectLink failure 10"); + tracker.RecvStr(&hname); + utils::Assert(tracker.RecvAll(&hport, sizeof(hport)) == sizeof(hport), "ReConnectLink failure 9"); + utils::Assert(tracker.RecvAll(&hrank, sizeof(hrank)) == sizeof(hrank), "ReConnectLink failure 10"); r.sock.Create(); if (!r.sock.Connect(utils::SockAddr(hname.c_str(), hport))) { num_error += 1; r.sock.Close(); continue; @@ -186,12 +186,12 @@ void AllreduceBase::ReConnectLinks(const char *cmd) { } if (!match) links.push_back(r); } - utils::Assert(master.SendAll(&num_error, sizeof(num_error)) == sizeof(num_error), "ReConnectLink failure 14"); + utils::Assert(tracker.SendAll(&num_error, sizeof(num_error)) == sizeof(num_error), "ReConnectLink failure 14"); } while (num_error != 0); - // send back socket listening port to master - utils::Assert(master.SendAll(&port, sizeof(port)) == sizeof(port), "ReConnectLink failure 14"); - // close connection to master - master.Close(); + // send back socket listening port to tracker + utils::Assert(tracker.SendAll(&port, sizeof(port)) == sizeof(port), "ReConnectLink failure 14"); + // close connection to tracker + tracker.Close(); // listen to incoming links for (int i = 0; i < num_accept; ++i) { LinkRecord r; diff --git a/src/allreduce_base.h b/src/allreduce_base.h index e972551f8..6eea948ce 100644 --- a/src/allreduce_base.h +++ b/src/allreduce_base.h @@ -260,9 +260,9 @@ class AllreduceBase : public IEngine { std::vector buffer_; }; /*! - * \brief connect to the master to fix the the missing links + * \brief connect to the tracker to fix the the missing links * this function is also used when the engine start up - * \param cmd possible command to sent to master + * \param cmd possible command to sent to tracker */ void ReConnectLinks(const char *cmd = "start"); /*! @@ -316,10 +316,10 @@ class AllreduceBase : public IEngine { std::string task_id; // uri of current host, to be set by Init std::string host_uri; - // uri of master - std::string master_uri; - // port of master address - int master_port; + // uri of tracker + std::string tracker_uri; + // port of tracker address + int tracker_port; // port of slave process int slave_port, nport_trial; // reduce buffer size diff --git a/src/rabit_tracker.py b/src/rabit_tracker.py index ea77506be..7ae53fd26 100644 --- a/src/rabit_tracker.py +++ b/src/rabit_tracker.py @@ -1,6 +1,8 @@ """ -Master script for rabit -Implements the master control protocol to start rabit jobs and assign necessary information +Tracker script for rabit +Implements the tracker control protocol + - start rabit jobs + - help nodes to establish links with each other Tianqi Chen """ @@ -128,8 +130,8 @@ class Tracker: def __del__(self): self.sock.close() def slave_args(self): - return ['master_uri=%s' % socket.gethostname(), - 'master_port=%s' % self.port] + return ['rabit_tracker_uri=%s' % socket.gethostname(), + 'rabit_tracker_port=%s' % self.port] def accept_slaves(self, nslave): # set of nodes that finishs the job shutdown = {} diff --git a/test/test.sh b/test/test.sh deleted file mode 100755 index c323785dd..000000000 --- a/test/test.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/bash -if [ "$#" -lt 4 ]; -then - echo "Usage " - exit -1 -fi - -../submit_job.py $1 test_recover "${@:2}"