From bdfa1a0220778c72665aabd548951c2e40a29e57 Mon Sep 17 00:00:00 2001
From: tqchen
Date: Mon, 29 Dec 2014 18:42:24 -0800
Subject: [PATCH] change nslave to nworker

---
 tracker/rabit_hadoop.py | 23 ++++++++++++++---------
 tracker/rabit_mpi.py    |  6 +++---
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/tracker/rabit_hadoop.py b/tracker/rabit_hadoop.py
index 0d2a33b90..70f3aea9b 100755
--- a/tracker/rabit_hadoop.py
+++ b/tracker/rabit_hadoop.py
@@ -29,20 +29,21 @@ if hadoop_binary == None or hadoop_streaming_jar == None:
     print '\tTo enable auto-detection, you can set enviroment variable HADOOP_HOME or modify rabit_hadoop.py line 14'
 
 parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using Hadoop Streaming')
-parser.add_argument('-n', '--nslaves', required=True, type=int,
-                    help = 'number of slaves proccess to be launched')
-parser.add_argument('-v', '--verbose', default=0, choices=[0, 1], type=int,
-                    help = 'print more messages into the console')
-parser.add_argument('-ac', '--auto_file_cache', default=1, choices=[0, 1], type=int,
-                    help = 'whether automatically cache the files in the command to hadoop localfile, this is on by defaultz')
+parser.add_argument('-n', '--nworker', required=True, type=int,
+                    help = 'number of worker processes to be launched')
 parser.add_argument('-i', '--input', required=True,
                     help = 'input path in HDFS')
 parser.add_argument('-o', '--output', required=True,
                     help = 'output path in HDFS')
+parser.add_argument('-v', '--verbose', default=0, choices=[0, 1], type=int,
+                    help = 'print more messages into the console')
+parser.add_argument('-ac', '--auto_file_cache', default=1, choices=[0, 1], type=int,
+                    help = 'whether to automatically cache the files in the command as hadoop local files; this is on by default')
 parser.add_argument('-f', '--files', nargs = '*',
                     help = 'the cached file list in mapreduce,'\
                     ' the submission script will automatically cache all the files which appears in command.'\
                     ' you may need this option to cache additional files, or manually cache files when auto_file_cache is off')
+parser.add_argument('--jobname', help = 'customize the job name shown in the job tracker')
 if hadoop_binary == None:
     parser.add_argument('-hb', '--hadoop_binary', required = True,
                         help="path-to-hadoop binary folder")
@@ -60,8 +61,12 @@ parser.add_argument('command', nargs='+',
                     help = 'command for rabit program')
 args = parser.parse_args()
 
-def hadoop_streaming(nslaves, slave_args):
-    cmd = '%s jar %s -D mapred.map.tasks=%d' % (args.hadoop_binary, args.hadoop_streaming_jar, nslaves)
+if args.jobname is None:
+    args.jobname = ('Rabit(nworker=%d):' % args.nworker) + args.command[0].split('/')[-1]
+
+def hadoop_streaming(nworker, slave_args):
+    cmd = '%s jar %s -D mapred.map.tasks=%d' % (args.hadoop_binary, args.hadoop_streaming_jar, nworker)
+    cmd += ' -D mapred.job.name=%s' % args.jobname
     cmd += ' -input %s -output %s' % (args.input, args.output)
     cmd += ' -mapper \"%s\" -reducer \"/bin/cat\" ' % (' '.join(args.command + slave_args))
     fset = set()
@@ -77,4 +82,4 @@ def hadoop_streaming(nslaves, slave_args):
     print cmd
     subprocess.check_call(cmd, shell = True)
 
-tracker.submit(args.nslaves, [], fun_submit = hadoop_streaming, verbose = args.verbose)
+tracker.submit(args.nworker, [], fun_submit = hadoop_streaming, verbose = args.verbose)
diff --git a/tracker/rabit_mpi.py b/tracker/rabit_mpi.py
index d8aa968f5..662c173bc 100755
--- a/tracker/rabit_mpi.py
+++ b/tracker/rabit_mpi.py
@@ -10,8 +10,8 @@ import subprocess
 import rabit_tracker as tracker
 
 parser = argparse.ArgumentParser(description='Rabit script to submit rabit job using MPI')
-parser.add_argument('-n', '--nslaves', required=True, type=int,
-                    help = 'number of slaves proccess to be launched')
+parser.add_argument('-n', '--nworker', required=True, type=int,
+                    help = 'number of worker processes to be launched')
 parser.add_argument('-v', '--verbose', default=0, choices=[0, 1], type=int,
                     help = 'print more messages into the console')
 parser.add_argument('-H', '--hostfile', type=str,
@@ -42,4 +42,4 @@ def mpi_submit(nslave, slave_args):
     subprocess.check_call(cmd, shell = True)
 
 # call submit, with nslave, the commands to run each job and submit function
-tracker.submit(args.nslaves, [], fun_submit = mpi_submit, verbose = args.verbose)
+tracker.submit(args.nworker, [], fun_submit = mpi_submit, verbose = args.verbose)
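
Both scripts end by handing a callback to rabit_tracker.submit: the tracker calls
fun_submit(nworker, slave_args) once, and the callback is responsible for launching
the nworker worker processes (through Hadoop Streaming or mpirun above). For
illustration, here is a minimal local sketch of that contract, assuming only the
signatures visible in this patch; the worker program name './rabit_demo' is
hypothetical:

    import subprocess
    import rabit_tracker as tracker

    def local_submit(nworker, slave_args):
        # launch nworker copies of the worker program on the local machine;
        # slave_args carries any extra arguments the tracker appends
        for i in range(nworker):
            subprocess.Popen(['./rabit_demo'] + slave_args)

    # run 4 workers with no extra per-worker arguments
    tracker.submit(4, [], fun_submit = local_submit, verbose = 0)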