diff --git a/test/test.mk b/test/test.mk
index 5f943103e..b1dddb0b4 100644
--- a/test/test.mk
+++ b/test/test.mk
@@ -10,17 +10,17 @@ endif
 local_recover:
-	../tracker/rabit_mpi.py $(nslave) local test_local_recover $(ndata) rabit_local_replica=1
+	../tracker/rabit_mpi.py -n $(nslave) test_local_recover $(ndata) rabit_local_replica=1
 local_recover_10_10k:
-	../tracker/rabit_mpi.py 10 local test_local_recover 10000 rabit_local_replica=1
+	../tracker/rabit_mpi.py -n 10 test_local_recover 10000 rabit_local_replica=1
 # this experiment tests recovery with actual process exit; keepalive.sh keeps the program alive
 model_recover_10_10k:
-	../tracker/rabit_mpi.py 10 local keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0
+	../tracker/rabit_mpi.py -n 10 keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0
 model_recover_10_10k_die_same:
-	../tracker/rabit_mpi.py 10 local keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0
+	../tracker/rabit_mpi.py -n 10 keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0
 model_recover_10_10k_die_hard:
-	../tracker/rabit_mpi.py 10 local keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0
+	../tracker/rabit_mpi.py -n 10 keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0
diff --git a/tracker/rabit_hadoop.py b/tracker/rabit_hadoop.py
index b86d91281..7bc855ec8 100755
--- a/tracker/rabit_hadoop.py
+++ b/tracker/rabit_hadoop.py
@@ -10,40 +10,63 @@ import time
 import subprocess
+import glob
 import rabit_tracker as tracker
-#!!! you can directly set hadoop binary path and hadoop streaming path here
-hadoop_binary = 'hadoop'
+#!!! Set the path to hadoop and the hadoop streaming jar here
+hadoop_binary = None
+hadoop_streaming_jar = None
-parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using hadoop streaming')
-parser.add_argument('-s', '--nslaves', required=True, type=int,
-                    help = "number of slaves proccess to be launched")
-if hadoop_binary == None:
-    parser.add_argument('-hb', '--hadoop_binary', required=True,
-                        help="path-to-hadoop binary folder")
-if hadoop_streaming_jar == None:
-    parser.add_argument('-hs', '--hadoop_streaming_jar', required=True,
-                        help='path-to hadoop streamimg jar file')
-parser.add_argument('-i', '--input', required=True)
-parser.add_argument('-o', '--output', required=True)
-parser.add_argument('-m', '--mapper', required=True)
-parser.add_argument('-a', '--args', required=True)
-parser.add_argument('-f', '--file', required=True)
-args = parser.parse_args()
+# try to auto-detect the paths from the HADOOP_HOME environment variable
+hadoop_home = os.getenv('HADOOP_HOME')
+if hadoop_home != None:
+    if hadoop_binary == None:
+        hadoop_binary = hadoop_home + '/bin/hadoop'
+    if hadoop_streaming_jar == None:
+        # the streaming jar location varies with the hadoop version;
+        # this guess covers the common contrib layout
+        jars = glob.glob(hadoop_home + '/contrib/streaming/hadoop-streaming*.jar')
+        if len(jars) != 0:
+            hadoop_streaming_jar = jars[0]
-if hadoop_binary != None:
-    args.hadoop_binary = hadoop_binary
-if hadoop_streaming_jar != None:
-    args.hadoop_streaming_jar = hadoop_streaming_jar
+if hadoop_binary == None or hadoop_streaming_jar == None:
+    print 'Warning: Cannot auto-detect path to hadoop and the hadoop streaming jar, need to set them via the arguments -hb and -jar'
+    print '\tTo enable auto-detection, you can set the environment variable HADOOP_HOME or modify the settings at the top of rabit_hadoop.py'
+
+parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using Hadoop Streaming')
+parser.add_argument('-n', '--nslaves', required=True, type=int,
+                    help = 'number of slave processes to be launched')
+parser.add_argument('-v', '--verbose', default=0, choices=[0, 1], type=int,
+                    help = 'print more messages into the console')
+parser.add_argument('-i', '--input', required=True,
+                    help = 'input path in HDFS')
+parser.add_argument('-o', '--output', required=True,
+                    help = 'output path in HDFS')
+parser.add_argument('-f', '--files', default = [], nargs = '*',
+                    help = 'the cached file list in mapreduce; multiple files can be joined with #')
+parser.add_argument('command', nargs='+',
+                    help = 'command for the rabit program')
+if hadoop_binary == None:
+    parser.add_argument('-hb', '--hadoop_binary', required = True,
+                        help = 'path to the hadoop binary')
+else:
+    parser.add_argument('-hb', '--hadoop_binary', default = hadoop_binary,
+                        help = 'path to the hadoop binary')
+
+if hadoop_streaming_jar == None:
+    parser.add_argument('-jar', '--hadoop_streaming_jar', required = True,
+                        help = 'path to the hadoop streaming jar file')
+else:
+    parser.add_argument('-jar', '--hadoop_streaming_jar', default = hadoop_streaming_jar,
+                        help = 'path to the hadoop streaming jar file')
+args = parser.parse_args()
 
 def hadoop_streaming(nslaves, slave_args):
     cmd = '%s jar %s -D mapred.map.tasks=%d' % (args.hadoop_binary, args.hadoop_streaming_jar, nslaves)
     cmd += ' -input %s -output %s' % (args.input, args.output)
-    cmd += ' -mapper \"%s %s %s\" -reducer \"/bin/cat\" ' % (args.mapper, args.args, ' '.join(slave_args))
-    for f in args.file.split('#'):
-        cmd += ' -file %s' % (f)
+    cmd += ' -mapper \"%s\" -reducer \"/bin/cat\" ' % (' '.join(args.command + slave_args))
+    # ship the program itself and all cached files with the streaming job
+    fset = set()
+    if os.path.exists(args.command[0]):
+        fset.add(args.command[0])
+    for flst in args.files:
+        for f in flst.split('#'):
+            fset.add(f)
+    for f in fset:
+        cmd += ' -file %s' % f
     print cmd
     subprocess.check_call(cmd, shell = True)
 
-start = time.time()
-tracker.submit(args.nslaves, [], fun_submit= hadoop_streaming)
-print 'All run took %s' % (time.time() - start)
+tracker.submit(args.nslaves, [], fun_submit = hadoop_streaming, verbose = args.verbose)
diff --git a/tracker/rabit_mpi.py b/tracker/rabit_mpi.py
index 3b2b68c54..d8aa968f5 100755
--- a/tracker/rabit_mpi.py
+++ b/tracker/rabit_mpi.py
@@ -1,21 +1,30 @@
 #!/usr/bin/python
 """
-This is an example script to create a customized job submit with mpi
-script using rabit engine
+This is a demo submission script for rabit, created to
+submit rabit jobs using MPI
 """
+import argparse
 import sys
 import os
 import subprocess
-# import the tcp_master.py
-# add path to sync
-sys.path.append(os.path.dirname(__file__)+'/src/')
 import rabit_tracker as tracker
+parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using MPI')
+parser.add_argument('-n', '--nslaves', required=True, type=int,
+                    help = 'number of slave processes to be launched')
+parser.add_argument('-v', '--verbose', default=0, choices=[0, 1], type=int,
+                    help = 'print more messages into the console')
+parser.add_argument('-H', '--hostfile', type=str,
+                    help = 'the hostfile to pass to mpirun')
+parser.add_argument('command', nargs='+',
+                    help = 'command for the rabit program')
+args = parser.parse_args()
 #
-# Note: this submit script is only used for example purpose
-# It does not have to be mpirun, it can be any job submission script that starts the job, qsub, hadoop streaming etc.
-#
-def mpi_submit(nslave, args):
+# Note: this submit script is only used for demo purpose
+# It does not have to be mpirun, it can be any job submission
+# script that starts the job, qsub, hadoop streaming etc.
+#
+def mpi_submit(nslave, slave_args):
     """
       customized submit script, that submit nslave jobs, each must contain args as parameter
       note this can be a lambda function containing additional parameters in input
@@ -24,17 +33,13 @@ def mpi_submit(nslave, args):
        args arguments to launch each job
          this usually includes the parameters of master_uri and parameters passed into submit
     """
-    if args[0] == 'local':
-        cmd = ' '.join(['mpirun -n %d' % (nslave)] + args[1:])
+    if args.hostfile is None:
+        cmd = ' '.join(['mpirun -n %d' % (nslave)] + args.command + slave_args)
     else:
-        cmd = ' '.join(['mpirun -n %d --hostfile %s' % (nslave, args[0])] + args[1:])
+        cmd = ' '.join(['mpirun -n %d --hostfile %s' % (nslave, args.hostfile)] + args.command + slave_args)
     print cmd
     subprocess.check_call(cmd, shell = True)
 
-if __name__ == '__main__':
-    if len(sys.argv) < 2:
-        print 'Usage: <nslave> <hostfile> <command>'
-        print 'if <hostfile> == local, we will run using local mode'
-        exit(0)
-    # call submit, with nslave, the commands to run each job and submit function
-    tracker.submit(int(sys.argv[1]), sys.argv[2:], fun_submit= mpi_submit)
+# call submit, with nslave, the commands to run each job and submit function
+tracker.submit(args.nslaves, [], fun_submit = mpi_submit, verbose = args.verbose)
diff --git a/tracker/rabit_tracker.py b/tracker/rabit_tracker.py
index 0322edf5b..9823ef426 100644
--- a/tracker/rabit_tracker.py
+++ b/tracker/rabit_tracker.py
@@ -122,7 +122,7 @@ class SlaveEntry:
         return rmset
 
 class Tracker:
-    def __init__(self, port = 9091, port_end = 9999):
+    def __init__(self, port = 9091, port_end = 9999, verbose = True):
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         for port in range(port, port_end):
             try:
@@ -132,8 +132,9 @@ class Tracker:
             except socket.error:
                 continue
         sock.listen(16)
-        self.sock = sock
-        print 'start listen on %s:%d' % (socket.gethostname(), self.port)
+        self.sock = sock
+        self.verbose = verbose
+        self.log_print('start listen on %s:%d' % (socket.gethostname(), self.port), 1)
     def __del__(self):
         self.sock.close()
     def slave_args(self):
@@ -190,9 +191,12 @@ class Tracker:
         return ring_map
     def handle_print(self,slave, msg):
         sys.stdout.write(msg)
-    def log_print(self, msg):
-        sys.stderr.write(msg+'\n')
-
+    def log_print(self, msg, level):
+        # level 1 messages are shown only in verbose mode; higher levels always print
+        if level != 1 or self.verbose:
+            sys.stderr.write(msg + '\n')
     def accept_slaves(self, nslave):
         # set of nodes that finished the job
         shutdown = {}
@@ -216,13 +220,12 @@ class Tracker:
                 assert s.rank >= 0 and s.rank not in shutdown
                 assert s.rank not in wait_conn
                 shutdown[s.rank] = s
-                self.log_print('Recieve %s signal from %d' % (s.cmd, s.rank))
+                self.log_print('Receive %s signal from %d' % (s.cmd, s.rank), 1)
                 continue
-            assert s.cmd == 'start' or s.cmd == 'recover'
+            assert s.cmd == 'start' or s.cmd == 'recover'
             # lazily initialize the slaves
             if tree_map == None:
                 assert s.cmd == 'start'
-                print s.world_size
                 if s.world_size > 0:
                     nslave = s.world_size
                 tree_map, parent_map = self.get_tree(nslave)
@@ -239,18 +242,20 @@ class Tracker:
                 assert len(todo_nodes) != 0
                 rank = todo_nodes.pop(0)
                 if s.jobid != 'NULL':
-                    job_map[s.jobid] = rank
+                    job_map[s.jobid] = rank
+            if len(todo_nodes) == 0:
+                self.log_print('@tracker All of %d nodes have started' % nslave, 2)
             s.assign_rank(rank, wait_conn, tree_map, parent_map, ring_map)
-            if s.cmd != 'start':
-                self.log_print('Recieve %s signal from %d' % (s.cmd, s.rank))
+            if s.cmd != 'start':
+                self.log_print('Receive %s signal from %d' % (s.cmd, s.rank), 1)
             else:
-                self.log_print('Recieve %s signal from %s assign rank %d' % (s.cmd, s.host, s.rank))
+                self.log_print('Receive %s signal from %s; assign rank %d' % (s.cmd, s.host, s.rank), 1)
             if s.wait_accept > 0:
-                wait_conn[rank] = s
-        self.log_print('All nodes finishes job')
+                wait_conn[rank] = s
+        self.log_print('@tracker All nodes have finished the job', 2)
 
-def submit(nslave, args, fun_submit):
-    master = Tracker()
+def submit(nslave, args, fun_submit, verbose):
+    master = Tracker(verbose = verbose)
     submit_thread = Thread(target = fun_submit, args = (nslave, args + master.slave_args()))
     submit_thread.start()
     master.accept_slaves(nslave)
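
Usage note: after this change both launchers share the same CLI shape, e.g.
'../tracker/rabit_mpi.py -n 10 keepalive.sh test_model_recover 10000 ...' as
exercised by test/test.mk above. For anyone writing a new launcher against the
revised tracker.submit(nslave, args, fun_submit, verbose) interface, a minimal
sketch follows; the worker binary ./my_worker and the slave count 4 are
hypothetical, everything else mirrors what this patch does in rabit_mpi.py:

    #!/usr/bin/python
    import subprocess
    import rabit_tracker as tracker

    def my_submit(nslave, slave_args):
        # slave_args carries the tracker coordinates appended by
        # master.slave_args(); every worker must get them on its command line
        cmd = ' '.join(['mpirun -n %d' % nslave, './my_worker'] + slave_args)
        subprocess.check_call(cmd, shell = True)

    # starts the tracker, launches the workers, then waits for them to finish
    tracker.submit(4, [], fun_submit = my_submit, verbose = 1)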