more tracker renaming

This commit is contained in:
tqchen 2014-12-06 09:24:12 -08:00
parent a569bf2698
commit 19631ecef6
4 changed files with 58 additions and 64 deletions

View File

@ -15,8 +15,8 @@ namespace rabit {
namespace engine {
// constructor
AllreduceBase::AllreduceBase(void) {
master_uri = "NULL";
master_port = 9000;
tracker_uri = "NULL";
tracker_port = 9000;
host_uri = "";
slave_port = 9010;
nport_trial = 1000;
@ -45,7 +45,7 @@ void AllreduceBase::Init(void) {
utils::Socket::Startup();
utils::Assert(links.size() == 0, "can only call Init once");
this->host_uri = utils::SockAddr::GetHostName();
// get information from master
// get information from tracker
this->ReConnectLinks();
}
@ -55,22 +55,22 @@ void AllreduceBase::Shutdown(void) {
}
links.clear();
if (master_uri == "NULL") return;
if (tracker_uri == "NULL") return;
int magic = kMagic;
// notify master rank i have shutdown
utils::TCPSocket master;
master.Create();
if (!master.Connect(utils::SockAddr(master_uri.c_str(), master_port))) {
utils::Socket::Error("Connect Master");
// notify tracker rank i have shutdown
utils::TCPSocket tracker;
tracker.Create();
if (!tracker.Connect(utils::SockAddr(tracker_uri.c_str(), tracker_port))) {
utils::Socket::Error("Connect Tracker");
}
utils::Assert(master.SendAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 1");
utils::Assert(master.RecvAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 2");
utils::Check(magic == kMagic, "sync::Invalid master message, init failure");
utils::Assert(tracker.SendAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 1");
utils::Assert(tracker.RecvAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 2");
utils::Check(magic == kMagic, "sync::Invalid tracker message, init failure");
utils::Assert(master.SendAll(&rank, sizeof(rank)) == sizeof(rank), "ReConnectLink failure 3");
master.SendStr(task_id);
master.SendStr(std::string("shutdown"));
master.Close();
utils::Assert(tracker.SendAll(&rank, sizeof(rank)) == sizeof(rank), "ReConnectLink failure 3");
tracker.SendStr(task_id);
tracker.SendStr(std::string("shutdown"));
tracker.Close();
utils::TCPSocket::Finalize();
}
/*!
@ -79,8 +79,8 @@ void AllreduceBase::Shutdown(void) {
* \param val parameter value
*/
void AllreduceBase::SetParam(const char *name, const char *val) {
if (!strcmp(name, "master_uri")) master_uri = val;
if (!strcmp(name, "master_port")) master_port = atoi(val);
if (!strcmp(name, "rabit_tracker_uri")) tracker_uri = val;
if (!strcmp(name, "rabit_tracker_port")) tracker_port = atoi(val);
if (!strcmp(name, "task_id")) task_id = val;
if (!strcmp(name, "hadoop_mode")) hadoop_mode = atoi(val);
if (!strcmp(name, "reduce_buffer")) {
@ -100,34 +100,34 @@ void AllreduceBase::SetParam(const char *name, const char *val) {
}
}
/*!
* \brief connect to the master to fix the the missing links
* \brief connect to the tracker to fix the the missing links
* this function is also used when the engine start up
*/
void AllreduceBase::ReConnectLinks(const char *cmd) {
// single node mode
if (master_uri == "NULL") {
if (tracker_uri == "NULL") {
rank = 0; return;
}
int magic = kMagic;
// get information from master
utils::TCPSocket master;
master.Create();
if (!master.Connect(utils::SockAddr(master_uri.c_str(), master_port))) {
// get information from tracker
utils::TCPSocket tracker;
tracker.Create();
if (!tracker.Connect(utils::SockAddr(tracker_uri.c_str(), tracker_port))) {
utils::Socket::Error("Connect");
}
utils::Assert(master.SendAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 1");
utils::Assert(master.RecvAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 2");
utils::Check(magic == kMagic, "sync::Invalid master message, init failure");
utils::Assert(master.SendAll(&rank, sizeof(rank)) == sizeof(rank), "ReConnectLink failure 3");
master.SendStr(task_id);
master.SendStr(std::string(cmd));
utils::Assert(tracker.SendAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 1");
utils::Assert(tracker.RecvAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 2");
utils::Check(magic == kMagic, "sync::Invalid tracker message, init failure");
utils::Assert(tracker.SendAll(&rank, sizeof(rank)) == sizeof(rank), "ReConnectLink failure 3");
tracker.SendStr(task_id);
tracker.SendStr(std::string(cmd));
{// get new ranks
int newrank;
utils::Assert(master.RecvAll(&newrank, sizeof(newrank)) == sizeof(newrank),
utils::Assert(tracker.RecvAll(&newrank, sizeof(newrank)) == sizeof(newrank),
"ReConnectLink failure 4");
utils::Assert(master.RecvAll(&parent_rank, sizeof(parent_rank)) == sizeof(parent_rank),
utils::Assert(tracker.RecvAll(&parent_rank, sizeof(parent_rank)) == sizeof(parent_rank),
"ReConnectLink failure 4");
utils::Assert(master.RecvAll(&world_size, sizeof(world_size)) == sizeof(world_size),
utils::Assert(tracker.RecvAll(&world_size, sizeof(world_size)) == sizeof(world_size),
"ReConnectLink failure 4");
utils::Assert(rank == -1 || newrank == rank, "must keep rank to same if the node already have one");
rank = newrank;
@ -139,7 +139,7 @@ void AllreduceBase::ReConnectLinks(const char *cmd) {
utils::Check(port != -1, "ReConnectLink fail to bind the ports specified");
sock_listen.Listen();
// get number of to connect and number of to accept nodes from master
// get number of to connect and number of to accept nodes from tracker
int num_conn, num_accept, num_error = 1;
do {
// send over good links
@ -152,24 +152,24 @@ void AllreduceBase::ReConnectLinks(const char *cmd) {
}
}
int ngood = static_cast<int>(good_link.size());
utils::Assert(master.SendAll(&ngood, sizeof(ngood)) == sizeof(ngood),
utils::Assert(tracker.SendAll(&ngood, sizeof(ngood)) == sizeof(ngood),
"ReConnectLink failure 5");
for (size_t i = 0; i < good_link.size(); ++i) {
utils::Assert(master.SendAll(&good_link[i], sizeof(good_link[i])) == sizeof(good_link[i]),
utils::Assert(tracker.SendAll(&good_link[i], sizeof(good_link[i])) == sizeof(good_link[i]),
"ReConnectLink failure 6");
}
utils::Assert(master.RecvAll(&num_conn, sizeof(num_conn)) == sizeof(num_conn),
utils::Assert(tracker.RecvAll(&num_conn, sizeof(num_conn)) == sizeof(num_conn),
"ReConnectLink failure 7");
utils::Assert(master.RecvAll(&num_accept, sizeof(num_accept)) == sizeof(num_accept),
utils::Assert(tracker.RecvAll(&num_accept, sizeof(num_accept)) == sizeof(num_accept),
"ReConnectLink failure 8");
num_error = 0;
for (int i = 0; i < num_conn; ++i) {
LinkRecord r;
int hport, hrank;
std::string hname;
master.RecvStr(&hname);
utils::Assert(master.RecvAll(&hport, sizeof(hport)) == sizeof(hport), "ReConnectLink failure 9");
utils::Assert(master.RecvAll(&hrank, sizeof(hrank)) == sizeof(hrank), "ReConnectLink failure 10");
tracker.RecvStr(&hname);
utils::Assert(tracker.RecvAll(&hport, sizeof(hport)) == sizeof(hport), "ReConnectLink failure 9");
utils::Assert(tracker.RecvAll(&hrank, sizeof(hrank)) == sizeof(hrank), "ReConnectLink failure 10");
r.sock.Create();
if (!r.sock.Connect(utils::SockAddr(hname.c_str(), hport))) {
num_error += 1; r.sock.Close(); continue;
@ -186,12 +186,12 @@ void AllreduceBase::ReConnectLinks(const char *cmd) {
}
if (!match) links.push_back(r);
}
utils::Assert(master.SendAll(&num_error, sizeof(num_error)) == sizeof(num_error), "ReConnectLink failure 14");
utils::Assert(tracker.SendAll(&num_error, sizeof(num_error)) == sizeof(num_error), "ReConnectLink failure 14");
} while (num_error != 0);
// send back socket listening port to master
utils::Assert(master.SendAll(&port, sizeof(port)) == sizeof(port), "ReConnectLink failure 14");
// close connection to master
master.Close();
// send back socket listening port to tracker
utils::Assert(tracker.SendAll(&port, sizeof(port)) == sizeof(port), "ReConnectLink failure 14");
// close connection to tracker
tracker.Close();
// listen to incoming links
for (int i = 0; i < num_accept; ++i) {
LinkRecord r;

View File

@ -260,9 +260,9 @@ class AllreduceBase : public IEngine {
std::vector<uint64_t> buffer_;
};
/*!
* \brief connect to the master to fix the the missing links
* \brief connect to the tracker to fix the the missing links
* this function is also used when the engine start up
* \param cmd possible command to sent to master
* \param cmd possible command to sent to tracker
*/
void ReConnectLinks(const char *cmd = "start");
/*!
@ -316,10 +316,10 @@ class AllreduceBase : public IEngine {
std::string task_id;
// uri of current host, to be set by Init
std::string host_uri;
// uri of master
std::string master_uri;
// port of master address
int master_port;
// uri of tracker
std::string tracker_uri;
// port of tracker address
int tracker_port;
// port of slave process
int slave_port, nport_trial;
// reduce buffer size

View File

@ -1,6 +1,8 @@
"""
Master script for rabit
Implements the master control protocol to start rabit jobs and assign necessary information
Tracker script for rabit
Implements the tracker control protocol
- start rabit jobs
- help nodes to establish links with each other
Tianqi Chen
"""
@ -128,8 +130,8 @@ class Tracker:
def __del__(self):
self.sock.close()
def slave_args(self):
return ['master_uri=%s' % socket.gethostname(),
'master_port=%s' % self.port]
return ['rabit_tracker_uri=%s' % socket.gethostname(),
'rabit_tracker_port=%s' % self.port]
def accept_slaves(self, nslave):
# set of nodes that finishs the job
shutdown = {}

View File

@ -1,8 +0,0 @@
#!/bin/bash
if [ "$#" -lt 4 ];
then
echo "Usage <nslave> <ndata> <config> <round_files_dir>"
exit -1
fi
../submit_job.py $1 test_recover "${@:2}"