diff --git a/src/allreduce_base.cc b/src/allreduce_base.cc
index 99e12561c..d2ab14daa 100644
--- a/src/allreduce_base.cc
+++ b/src/allreduce_base.cc
@@ -22,7 +22,7 @@ AllreduceBase::AllreduceBase(void) {
   slave_port = 9010;
   nport_trial = 1000;
   rank = -1;
-  world_size = 1;
+  world_size = -1;
   hadoop_mode = 0;
   version_number = 0;
   task_id = "NULL";
@@ -31,8 +31,8 @@ AllreduceBase::AllreduceBase(void) {
 // initialization function
 void AllreduceBase::Init(void) {
-  {
-    // handling for hadoop
+  // setup from environment variables
+  { // handling for hadoop
     const char *task_id = getenv("mapred_task_id");
     if (hadoop_mode != 0) {
       utils::Check(task_id != NULL,
                    "hadoop_mode is set but cannot find mapred_task_id");
@@ -41,7 +41,16 @@ void AllreduceBase::Init(void) {
       this->SetParam("rabit_task_id", task_id);
       this->SetParam("rabit_hadoop_mode", "1");
     }
-  }
+    // handling for hadoop
+    const char *num_task = getenv("mapred_map_tasks");
+    if (hadoop_mode != 0) {
+      utils::Check(num_task != NULL, "hadoop_mode is set but cannot find mapred_map_tasks");
+    }
+    if (num_task != NULL) {
+      this->SetParam("rabit_world_size", num_task);
+    }
+  }
+  //---------------------
   // start socket
   utils::Socket::Startup();
   utils::Assert(all_links.size() == 0, "can only call Init once");
@@ -70,6 +79,7 @@ void AllreduceBase::Shutdown(void) {
   utils::Check(magic == kMagic, "sync::Invalid tracker message, init failure");
   utils::Assert(tracker.SendAll(&rank, sizeof(rank)) == sizeof(rank),
                 "ReConnectLink failure 3");
+  utils::Assert(tracker.SendAll(&world_size, sizeof(world_size)) == sizeof(world_size), "ReConnectLink failure 3");
   tracker.SendStr(task_id);
   tracker.SendStr(std::string("shutdown"));
   tracker.Close();
@@ -84,6 +94,7 @@ void AllreduceBase::SetParam(const char *name, const char *val) {
   if (!strcmp(name, "rabit_tracker_uri")) tracker_uri = val;
   if (!strcmp(name, "rabit_tracker_port")) tracker_port = atoi(val);
   if (!strcmp(name, "rabit_task_id")) task_id = val;
+  if (!strcmp(name, "rabit_world_size")) world_size = atoi(val);
   if (!strcmp(name, "rabit_hadoop_mode")) hadoop_mode = atoi(val);
   if (!strcmp(name, "rabit_reduce_buffer")) {
     char unit;
@@ -108,7 +119,7 @@ void AllreduceBase::SetParam(const char *name, const char *val) {
 void AllreduceBase::ReConnectLinks(const char *cmd) {
   // single node mode
   if (tracker_uri == "NULL") {
-    rank = 0; return;
+    rank = 0; world_size = 1; return;
   }
   int magic = kMagic;
   // get information from tracker
@@ -121,6 +132,7 @@ void AllreduceBase::ReConnectLinks(const char *cmd) {
   utils::Assert(tracker.RecvAll(&magic, sizeof(magic)) == sizeof(magic), "ReConnectLink failure 2");
   utils::Check(magic == kMagic, "sync::Invalid tracker message, init failure");
   utils::Assert(tracker.SendAll(&rank, sizeof(rank)) == sizeof(rank), "ReConnectLink failure 3");
+  utils::Assert(tracker.SendAll(&world_size, sizeof(world_size)) == sizeof(world_size), "ReConnectLink failure 3");
   tracker.SendStr(task_id);
   tracker.SendStr(std::string(cmd));
   // the rank of previous link, next link in ring
diff --git a/submit_hadoop.py b/submit_hadoop.py
deleted file mode 100755
index 3852b9f1d..000000000
--- a/submit_hadoop.py
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/usr/bin/python
-"""
-This is an example job submit script for hadoop streaming
-"""
-import argparse
-import sys
-import os
-import time
-import subprocess
-sys.path.append('./src/')
-import rabit_tracker as tracker
-
-parser = argparse.ArgumentParser(description='Hadoop Streaming submission script')
-parser.add_argument('-s', '--nslaves', required=True, type=int)
-parser.add_argument('-hb', '--hadoop_binary', required=True)
-parser.add_argument('-hs', '--hadoop_streaming_jar', required=True)
-parser.add_argument('-i', '--input', required=True)
-parser.add_argument('-o', '--output', required=True)
-parser.add_argument('-m', '--mapper', required=True)
-parser.add_argument('-k', '--nclusters', required=True, type=int)
-parser.add_argument('-itr', '--iterations', required=True, type=int)
-args = parser.parse_args()
-
-def hadoop_streaming(nslaves, slave_args):
-    cmd = '%s jar %s -input %s -output %s -mapper \"%s stdin %d %d stdout %s\" -reducer \"/bin/cat\" -file %s' % (args.hadoop_binary, args.hadoop_streaming_jar, args.input, args.output, args.mapper, args.nclusters, args.iterations, ' '.join(slave_args), args.mapper)
-    print cmd
-    subprocess.check_call(cmd, shell = True)
-
-start = time.time()
-tracker.submit(args.nslaves, [], fun_submit= hadoop_streaming)
-print 'All run took %s' % (time.time() - start)
diff --git a/test/test.mk b/test/test.mk
index a70fcf050..a7b6ceb51 100644
--- a/test/test.mk
+++ b/test/test.mk
@@ -10,11 +10,11 @@ endif
 
 local_recover:
-	../submit_mpi.py $(nslave) local test_local_recover $(ndata) rabit_local_replica=1
+	../tracker/rabit_mpi.py $(nslave) local test_local_recover $(ndata) rabit_local_replica=1
 
 local_recover_10_10k:
-	../submit_mpi.py 10 local test_local_recover 10000 rabit_local_replica=1
+	../tracker/rabit_mpi.py 10 local test_local_recover 10000 rabit_local_replica=1
 
 # this experiment test recovery with actually process exit, use keepalive to keep program alive
 model_recover_10_10k:
-	../submit_mpi.py 10 local keepalive.sh test_model_recover 10000
+	../tracker/rabit_mpi.py 10 local keepalive.sh test_model_recover 10000
diff --git a/tracker/rabit_hadoop.py b/tracker/rabit_hadoop.py
new file mode 100755
index 000000000..4a2cdb718
--- /dev/null
+++ b/tracker/rabit_hadoop.py
@@ -0,0 +1,44 @@
+#!/usr/bin/python
+"""
+This is a script to submit rabit jobs using hadoop streaming,
+submitting the rabit processes as mappers of MapReduce
+"""
+import argparse
+import sys
+import os
+import time
+import subprocess
+import rabit_tracker as tracker
+
+#!!! you can directly set the hadoop binary path and hadoop streaming jar path here
+hadoop_binary = 'hadoop'
+hadoop_streaming_jar = None
+
+parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using hadoop streaming')
+parser.add_argument('-s', '--nslaves', required=True, type=int,
+                    help = "number of slave processes to be launched")
+if hadoop_binary == None:
+    parser.add_argument('-hb', '--hadoop_binary', required=True,
+                        help="path to hadoop binary folder")
+if hadoop_streaming_jar == None:
+    parser.add_argument('-hs', '--hadoop_streaming_jar', required=True,
+                        help='path to hadoop streaming jar file')
+parser.add_argument('-i', '--input', required=True)
+parser.add_argument('-o', '--output', required=True)
+parser.add_argument('-m', '--mapper', required=True)
+parser.add_argument('-a', '--args', required=True)
+args = parser.parse_args()
+
+if hadoop_binary != None:
+    args.hadoop_binary = hadoop_binary
+if hadoop_streaming_jar != None:
+    args.hadoop_streaming_jar = hadoop_streaming_jar
+
+def hadoop_streaming(nslaves, slave_args):
+    cmd = '%s jar %s -input %s -output %s -mapper \"%s %s %s\" -reducer \"/bin/cat\" -file %s' % (args.hadoop_binary, args.hadoop_streaming_jar, args.input, args.output, args.mapper, args.args, ' '.join(slave_args), args.mapper)
+    print cmd
+    subprocess.check_call(cmd, shell = True)
+
+start = time.time()
+tracker.submit(args.nslaves, [], fun_submit = hadoop_streaming)
+print 'All run took %s' % (time.time() - start)
diff --git a/submit_mpi.py b/tracker/rabit_mpi.py
similarity index 100%
rename from submit_mpi.py
rename to tracker/rabit_mpi.py
diff --git a/src/rabit_tracker.py b/tracker/rabit_tracker.py
similarity index 86%
rename from src/rabit_tracker.py
rename to tracker/rabit_tracker.py
index 025b5938a..8e05b4b5a 100644
--- a/src/rabit_tracker.py
+++ b/tracker/rabit_tracker.py
@@ -53,6 +53,7 @@ class SlaveEntry:
         assert magic == kMagic, 'invalid magic number=%d from %s' % (magic, s_addr[0])
         slave.sendint(kMagic)
         self.rank = slave.recvint()
+        self.world_size = slave.recvint()
         self.jobid = slave.recvstr()
         self.cmd = slave.recvstr()
@@ -188,32 +189,42 @@ class Tracker:
             ring_map[rlst[r]] = (rlst[rprev], rlst[rnext])
         return ring_map
     def accept_slaves(self, nslave):
-        tree_map, parent_map = self.get_tree(nslave)
-        ring_map = self.get_ring(tree_map, parent_map)
         # set of nodes that finishs the job
         shutdown = {}
         # set of nodes that is waiting for connections
         wait_conn = {}
-        # set of nodes that is pending for getting up
-        todo_nodes = range(nslave)
-        random.shuffle(todo_nodes)
         # maps job id to rank
         job_map = {}
         # list of workers that is pending to be assigned rank
         pending = []
+        # tree_map is initialized lazily, once the first slave reports its world size
+        tree_map = None
        while len(shutdown) != nslave:
            fd, s_addr = self.sock.accept()
-            s = SlaveEntry(fd, s_addr)
+            s = SlaveEntry(fd, s_addr)
            if s.cmd == 'shutdown':
                assert s.rank >= 0 and s.rank not in shutdown
                assert s.rank not in wait_conn
                shutdown[s.rank] = s
+                print 'Receive %s signal from %d' % (s.cmd, s.rank)
                continue
-            assert s.cmd == 'start' or s.cmd == 'recover'
+            assert s.cmd == 'start' or s.cmd == 'recover'
+            # lazily initialize tree and ring using the reported world size
+            if tree_map == None:
+                assert s.cmd == 'start'
+                print s.world_size
+                if s.world_size > 0:
+                    nslave = s.world_size
+                tree_map, parent_map = self.get_tree(nslave)
+                ring_map = self.get_ring(tree_map, parent_map)
+                # set of nodes that is pending for getting up
+                todo_nodes = range(nslave)
+                random.shuffle(todo_nodes)
+            else:
+                assert s.world_size == -1 or s.world_size == nslave
            if s.cmd == 'recover':
                assert s.rank >= 0
-            print 'Recieve %s signal from %d' % (s.cmd, s.rank)
            rank = s.decide_rank(job_map)
            if rank == -1:
                assert len(todo_nodes) != 0
@@ -221,6 +232,10 @@ class Tracker:
            if s.jobid != 'NULL':
                job_map[s.jobid] = rank
            s.assign_rank(rank, wait_conn, tree_map, parent_map, ring_map)
+            if s.cmd != 'start':
+                print 'Receive %s signal from %d' % (s.cmd, s.rank)
+            else:
+                print 'Receive %s signal from %s assign rank %d' % (s.cmd, s.host, s.rank)
            if s.wait_accept > 0:
                wait_conn[rank] = s
        print 'All nodes finishes job'
@@ -234,5 +249,5 @@ def submit(nslave, args, fun_submit = mpi_submit):
     master = Tracker()
     submit_thread = Thread(target = fun_submit, args = (nslave, args + master.slave_args()))
     submit_thread.start()
-    master.accept_slaves(nslave)
+    master.accept_slaves(nslave)
     submit_thread.join()
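
Note (not part of the patch above): tracker/rabit_hadoop.py and tracker/rabit_mpi.py both plug into the same tracker interface defined in tracker/rabit_tracker.py. rabit_tracker.submit(nslave, args, fun_submit) starts the tracker, then calls fun_submit(nslave, slave_args) on a separate thread, where slave_args carries the rabit_* key=value arguments (tracker URI and port) that every worker process must receive on its command line. A minimal sketch of a custom launcher built on this interface follows; the worker path './my_worker', the use of subprocess.Popen, and reading the slave count from sys.argv are illustrative assumptions, not part of this change.

#!/usr/bin/python
# minimal sketch of a custom rabit launcher (illustrative only)
import sys
import subprocess
import rabit_tracker as tracker

def local_submit(nslave, slave_args):
    # slave_args holds the rabit_* arguments provided by the tracker
    # (e.g. rabit_tracker_uri, rabit_tracker_port); pass them to each worker
    procs = []
    for i in range(nslave):
        procs.append(subprocess.Popen(['./my_worker'] + slave_args))
    # wait for all local workers to finish
    for p in procs:
        p.wait()

# start the tracker, launch nslave local workers, and wait for completion
tracker.submit(int(sys.argv[1]), [], fun_submit = local_submit)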