cleanup submission script

tqchen 2014-12-29 06:11:58 -08:00
parent 27d6977a3e
commit d64d0ef1dc
4 changed files with 101 additions and 68 deletions

Makefile

@@ -10,17 +10,17 @@ endif
 local_recover:
-	../tracker/rabit_mpi.py $(nslave) local test_local_recover $(ndata) rabit_local_replica=1
+	../tracker/rabit_mpi.py -n $(nslave) test_local_recover $(ndata) rabit_local_replica=1
 local_recover_10_10k:
-	../tracker/rabit_mpi.py 10 local test_local_recover 10000 rabit_local_replica=1
+	../tracker/rabit_mpi.py -n 10 test_local_recover 10000 rabit_local_replica=1
 # this experiment tests recovery with actual process exit; use keepalive to keep the program alive
 model_recover_10_10k:
-	../tracker/rabit_mpi.py 10 local keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0
+	../tracker/rabit_mpi.py -n 10 keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0
 model_recover_10_10k_die_same:
-	../tracker/rabit_mpi.py 10 local keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0
+	../tracker/rabit_mpi.py -n 10 keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0
 model_recover_10_10k_die_hard:
-	../tracker/rabit_mpi.py 10 local keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0
+	../tracker/rabit_mpi.py -n 10 keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0
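
These targets move from the old positional convention (nslave followed by 'local' or a hostfile) to the new -n flag of rabit_mpi.py. A minimal Python sketch (not part of the commit) of what the model_recover_10_10k target runs; by assumption the mock=... arguments are forwarded unchanged to the test program, which uses them to simulate node failures during the recovery test:

    import subprocess

    # equivalent of: make model_recover_10_10k
    cmd = ('../tracker/rabit_mpi.py -n 10 keepalive.sh test_model_recover 10000 '
           'mock=0,0,1,0 mock=1,1,1,0')
    subprocess.check_call(cmd, shell=True)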

rabit_hadoop.py

@@ -10,40 +10,63 @@ import time
 import subprocess
 import rabit_tracker as tracker
-#!!! you can directly set hadoop binary path and hadoop streaming path here
-hadoop_binary = 'hadoop'
+#!!! Set path to hadoop and hadoop streaming jar here
+hadoop_binary = None
 hadoop_streaming_jar = None
-parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using hadoop streaming')
-parser.add_argument('-s', '--nslaves', required=True, type=int,
-                    help = 'number of slave processes to be launched')
-if hadoop_binary == None:
-    parser.add_argument('-hb', '--hadoop_binary', required=True,
-                        help = 'path to hadoop binary folder')
-if hadoop_streaming_jar == None:
-    parser.add_argument('-hs', '--hadoop_streaming_jar', required=True,
-                        help = 'path to hadoop streaming jar file')
-parser.add_argument('-i', '--input', required=True)
-parser.add_argument('-o', '--output', required=True)
-parser.add_argument('-m', '--mapper', required=True)
-parser.add_argument('-a', '--args', required=True)
-parser.add_argument('-f', '--file', required=True)
-args = parser.parse_args()
-if hadoop_binary != None:
-    args.hadoop_binary = hadoop_binary
-if hadoop_streaming_jar != None:
-    args.hadoop_streaming_jar = hadoop_streaming_jar
+# try to auto-detect the hadoop installation from HADOOP_HOME
+hadoop_home = os.getenv('HADOOP_HOME')
+if hadoop_home != None:
+    if hadoop_binary == None:
+        hadoop_binary = hadoop_home + '/bin/hadoop'
+    if hadoop_streaming_jar == None:
+        # default streaming jar location in a Hadoop 1.x layout
+        hadoop_streaming_jar = hadoop_home + '/contrib/streaming/hadoop-streaming.jar'
+if hadoop_binary == None or hadoop_streaming_jar == None:
+    print 'Warning: Cannot auto-detect path to hadoop and streaming jar, need to set them via arguments -hb and -jar'
+    print '\tTo enable auto-detection, you can set environment variable HADOOP_HOME or modify rabit_hadoop.py line 14'
+parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using Hadoop Streaming')
+parser.add_argument('-n', '--nslaves', required=True, type=int,
+                    help = 'number of slave processes to be launched')
+parser.add_argument('-v', '--verbose', default=0, choices=[0, 1], type=int,
+                    help = 'print more messages into the console')
+parser.add_argument('-i', '--input', required=True,
+                    help = 'input path in HDFS')
+parser.add_argument('-o', '--output', required=True,
+                    help = 'output path in HDFS')
+parser.add_argument('-f', '--files', nargs = '*', default = [],
+                    help = 'the cached file list in mapreduce')
+parser.add_argument('command', nargs='+',
+                    help = 'command for rabit program')
+if hadoop_binary == None:
+    parser.add_argument('-hb', '--hadoop_binary', required = True,
+                        help = 'path to hadoop binary folder')
+else:
+    parser.add_argument('-hb', '--hadoop_binary', default = hadoop_binary,
+                        help = 'path to hadoop binary folder')
+if hadoop_streaming_jar == None:
+    parser.add_argument('-jar', '--hadoop_streaming_jar', required = True,
+                        help = 'path to hadoop streaming jar file')
+else:
+    parser.add_argument('-jar', '--hadoop_streaming_jar', default = hadoop_streaming_jar,
+                        help = 'path to hadoop streaming jar file')
+args = parser.parse_args()
 def hadoop_streaming(nslaves, slave_args):
     cmd = '%s jar %s -D mapred.map.tasks=%d' % (args.hadoop_binary, args.hadoop_streaming_jar, nslaves)
     cmd += ' -input %s -output %s' % (args.input, args.output)
-    cmd += ' -mapper \"%s %s %s\" -reducer \"/bin/cat\" ' % (args.mapper, args.args, ' '.join(slave_args))
-    for f in args.file.split('#'):
-        cmd += ' -file %s' % (f)
+    cmd += ' -mapper \"%s\" -reducer \"/bin/cat\" ' % (' '.join(args.command + slave_args))
+    # collect the files to ship with the job, deduplicated
+    fset = set()
+    if os.path.exists(args.command[0]):
+        fset.add(args.command[0])
+    for flst in args.files:
+        for f in flst.split('#'):
+            fset.add(f)
+    for f in fset:
+        cmd += ' -file %s' % f
     print cmd
     subprocess.check_call(cmd, shell = True)
-start = time.time()
-tracker.submit(args.nslaves, [], fun_submit = hadoop_streaming)
-print 'All run took %s' % (time.time() - start)
+tracker.submit(args.nslaves, [], fun_submit = hadoop_streaming, verbose = args.verbose)
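
For illustration, a rough sketch (not part of the commit) of the streaming command hadoop_streaming() assembles for a hypothetical invocation "rabit_hadoop.py -n 2 -i in -o out ./prog max_depth=3"; the paths are placeholders, and the slave arguments appended to the mapper come from the tracker and are elided here:

    nslaves = 2
    cmd = ('hadoop jar hadoop-streaming.jar -D mapred.map.tasks=%d' % nslaves
           + ' -input in -output out'
           + ' -mapper "./prog max_depth=3 <slave args from tracker>"'
           + ' -reducer "/bin/cat"'
           + ' -file ./prog')
    print cmd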

rabit_mpi.py

@@ -1,21 +1,30 @@
 #!/usr/bin/python
 """
-This is an example script to create a customized job submit with mpi
-script using rabit engine
+This is the demo submission script of rabit, it is created to
+submit rabit jobs using mpirun
 """
+import argparse
 import sys
 import os
 import subprocess
-# import the tcp_master.py
+# add path to the tracker implementation
+sys.path.append(os.path.dirname(__file__)+'/src/')
 import rabit_tracker as tracker
+
+parser = argparse.ArgumentParser(description='Rabit script to submit rabit job using MPI')
+parser.add_argument('-n', '--nslaves', required=True, type=int,
+                    help = 'number of slave processes to be launched')
+parser.add_argument('-v', '--verbose', default=0, choices=[0, 1], type=int,
+                    help = 'print more messages into the console')
+parser.add_argument('-H', '--hostfile', type=str,
+                    help = 'the hostfile of the mpi server')
+parser.add_argument('command', nargs='+',
+                    help = 'command for rabit program')
+args = parser.parse_args()
 #
-# Note: this submit script is only used for example purpose
-# It does not have to be mpirun, it can be any job submission script that starts the job, qsub, hadoop streaming etc.
+# Note: this submit script is only used for demo purpose
+# It does not have to be mpirun, it can be any job submission
+# script that starts the job, qsub, hadoop streaming etc.
 #
-def mpi_submit(nslave, args):
+def mpi_submit(nslave, slave_args):
     """
     customized submit script that submits nslave jobs, each of which must take args as a parameter
     note this can be a lambda function containing additional parameters in input
@@ -24,17 +33,13 @@ def mpi_submit(nslave, args):
     args arguments to launch each job
         this usually includes the parameters of master_uri and parameters passed into submit
     """
-    if args[0] == 'local':
-        cmd = ' '.join(['mpirun -n %d' % (nslave)] + args[1:])
+    if args.hostfile is None:
+        cmd = ' '.join(['mpirun -n %d' % (nslave)] + args.command + slave_args)
     else:
-        cmd = ' '.join(['mpirun -n %d --hostfile %s' % (nslave, args[0])] + args[1:])
+        cmd = ' '.join(['mpirun -n %d --hostfile %s' % (nslave, args.hostfile)] + args.command + slave_args)
     print cmd
     subprocess.check_call(cmd, shell = True)
-if __name__ == '__main__':
-    if len(sys.argv) < 2:
-        print 'Usage: <nslave> <machine_file> <cmd>'
-        print 'if <machine_file> == local, we will run using local mode'
-        exit(0)
-    # call submit, with nslave, the commands to run each job and submit function
-    tracker.submit(int(sys.argv[1]), sys.argv[2:], fun_submit= mpi_submit)
+# call submit, with nslave, the commands to run each job and submit function
+tracker.submit(args.nslaves, [], fun_submit = mpi_submit, verbose = args.verbose)
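
The command line changes accordingly; a before/after sketch (hypothetical paths, reusing one of the test programs from the Makefile above):

    import subprocess

    # before this commit: positional nslave plus a 'local'/hostfile argument,
    #   ../tracker/rabit_mpi.py 10 local test_model_recover 10000
    # after: the -n flag, with -H <hostfile> as the optional cluster mode
    subprocess.check_call('../tracker/rabit_mpi.py -n 10 test_model_recover 10000',
                          shell=True)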

rabit_tracker.py

@@ -122,7 +122,7 @@ class SlaveEntry:
         return rmset
 class Tracker:
-    def __init__(self, port = 9091, port_end = 9999):
+    def __init__(self, port = 9091, port_end = 9999, verbose = True):
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         for port in range(port, port_end):
             try:
@@ -132,8 +132,9 @@
             except socket.error:
                 continue
         sock.listen(16)
         self.sock = sock
-        print 'start listen on %s:%d' % (socket.gethostname(), self.port)
+        self.verbose = verbose
+        self.log_print('start listen on %s:%d' % (socket.gethostname(), self.port), 1)
     def __del__(self):
         self.sock.close()
     def slave_args(self):
@@ -190,9 +191,12 @@
         return ring_map
     def handle_print(self, slave, msg):
         sys.stdout.write(msg)
-    def log_print(self, msg):
-        sys.stderr.write(msg+'\n')
+    def log_print(self, msg, level):
+        # level 1: verbose-only progress messages; level 2: always printed
+        if level == 1:
+            if self.verbose:
+                sys.stderr.write(msg + '\n')
+        else:
+            sys.stderr.write(msg + '\n')
     def accept_slaves(self, nslave):
         # set of nodes that finish the job
         shutdown = {}
@@ -216,13 +220,12 @@
                 assert s.rank >= 0 and s.rank not in shutdown
                 assert s.rank not in wait_conn
                 shutdown[s.rank] = s
-                self.log_print('Receive %s signal from %d' % (s.cmd, s.rank))
+                self.log_print('Receive %s signal from %d' % (s.cmd, s.rank), 1)
                 continue
             assert s.cmd == 'start' or s.cmd == 'recover'
             # lazily initialize the slaves
             if tree_map == None:
                 assert s.cmd == 'start'
-                print s.world_size
                 if s.world_size > 0:
                     nslave = s.world_size
                 tree_map, parent_map = self.get_tree(nslave)
@@ -239,18 +242,20 @@
                 assert len(todo_nodes) != 0
                 rank = todo_nodes.pop(0)
                 if s.jobid != 'NULL':
                     job_map[s.jobid] = rank
+                if len(todo_nodes) == 0:
+                    self.log_print('@tracker All of %d nodes getting started' % nslave, 2)
             s.assign_rank(rank, wait_conn, tree_map, parent_map, ring_map)
             if s.cmd != 'start':
-                self.log_print('Receive %s signal from %d' % (s.cmd, s.rank))
+                self.log_print('Receive %s signal from %d' % (s.cmd, s.rank), 1)
             else:
-                self.log_print('Receive %s signal from %s assign rank %d' % (s.cmd, s.host, s.rank))
+                self.log_print('Receive %s signal from %s; assign rank %d' % (s.cmd, s.host, s.rank), 1)
             if s.wait_accept > 0:
                 wait_conn[rank] = s
-        self.log_print('All nodes finish the job')
+        self.log_print('@tracker All nodes finish the job', 2)
-def submit(nslave, args, fun_submit):
-    master = Tracker()
+def submit(nslave, args, fun_submit, verbose):
+    master = Tracker(verbose = verbose)
     submit_thread = Thread(target = fun_submit, args = (nslave, args + master.slave_args()))
     submit_thread.start()
     master.accept_slaves(nslave)
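
The net effect of the tracker changes is a two-level logging scheme: level-1 messages appear only when the Tracker is constructed with verbose=True, while level-2 '@tracker' milestone messages always print. A self-contained sketch of that behavior (MiniLogger is a stand-in for the Tracker methods above, not part of the commit):

    import sys

    class MiniLogger:
        def __init__(self, verbose):
            self.verbose = verbose
        def log_print(self, msg, level):
            # mirror Tracker.log_print: level 1 gated on verbose, level 2 always
            if level == 1:
                if self.verbose:
                    sys.stderr.write(msg + '\n')
            else:
                sys.stderr.write(msg + '\n')

    MiniLogger(verbose=False).log_print('Receive start signal from 3', 1)       # suppressed
    MiniLogger(verbose=False).log_print('@tracker All nodes finish the job', 2) # printed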