diff --git a/tracker/rabit_hadoop.py b/tracker/rabit_hadoop.py
index 8ecb2c04d..ecd05ce1f 100755
--- a/tracker/rabit_hadoop.py
+++ b/tracker/rabit_hadoop.py
@@ -8,6 +8,7 @@ import sys
 import os
 import time
 import subprocess
+import warnings
 import rabit_tracker as tracker
 
@@ -26,8 +27,8 @@ if hadoop_home != None:
     assert os.path.exists(hadoop_streaming_jar), "HADDOP_HOME does not contain the haddop streaming jar"
 
 if hadoop_binary == None or hadoop_streaming_jar == None:
-    print 'Warning: Cannot auto-detect path to hadoop and hadoop-streaming jar, need to set them via arguments -hs and -hb'
-    print '\tTo enable auto-detection, you can set enviroment variable HADOOP_HOME or modify rabit_hadoop.py line 14'
+    warnings.warn('Warning: Cannot auto-detect path to hadoop and hadoop-streaming jar, need to set them via arguments -hs and -hb\n'\
+                  '\tTo enable auto-detection, you can set environment variable HADOOP_HOME or modify rabit_hadoop.py line 14')
 
 parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using Hadoop Streaming.'\
                                  'This script support both Hadoop 1.0 and Yarn(MRv2), Yarn is recommended')
@@ -79,7 +80,17 @@ args = parser.parse_args()
 if args.jobname == 'auto':
     args.jobname = ('Rabit[nworker=%d]:' % args.nworker) + args.command[0].split('/')[-1];
 
-def hadoop_streaming(nworker, worker_args, yarn = False):
+# detect hadoop version
+(out, err) = subprocess.Popen('%s version' % args.hadoop_binary, shell = True, stdout=subprocess.PIPE).communicate()
+out = out.split('\n')[0].split()
+assert out[0] == 'Hadoop', 'cannot parse hadoop version string'
+hadoop_version = out[1].split('.')
+use_yarn = int(hadoop_version[0]) >= 2
+
+if not use_yarn:
+    print 'Current Hadoop Version is %s' % out[1]
+
+def hadoop_streaming(nworker, worker_args, use_yarn):
     fset = set()
     if args.auto_file_cache:
         for i in range(len(args.command)):
@@ -92,7 +103,7 @@ def hadoop_streaming(nworker, worker_args, yarn = False):
            args.command[i] = args.command[i].split('/')[-1]
     kmap = {}
     # setup keymaps
-    if yarn:
+    if use_yarn:
         kmap['nworker'] = 'mapreduce.job.maps'
         kmap['jobname'] = 'mapreduce.job.name'
         kmap['nthread'] = 'mapreduce.map.cpu.vcores'
@@ -108,9 +119,11 @@ def hadoop_streaming(nworker, worker_args, yarn = False):
     cmd += ' -D%s=%d' % (kmap['nworker'], nworker)
     cmd += ' -D%s=%s' % (kmap['jobname'], args.jobname)
     if args.nthread != -1:
-        assert kmap['nthread'] is not None, 'nthread can only be set in Yarn(Hadoop 2.x) cluster'\
-            'it is recommended to use Yarn to submit rabit jobs'
-        cmd += ' -D%s=%d' % (kmap['ntread'], args.nthread)
+        if kmap['nthread'] is None:
+            warnings.warn('nthread can only be set in Yarn(Hadoop version greater than 2.0), '\
+                          'it is recommended to use Yarn to submit rabit jobs')
+        else:
+            cmd += ' -D%s=%d' % (kmap['nthread'], args.nthread)
     cmd += ' -D%s=%d' % (kmap['timeout'], args.timeout)
     if args.memory_mb != -1:
         cmd += ' -D%s=%d' % (kmap['timeout'], args.timeout)
@@ -126,6 +139,5 @@ def hadoop_streaming(nworker, worker_args, yarn = False):
     print cmd
     subprocess.check_call(cmd, shell = True)
 
-if __name__ == 'main':
-    fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, yarn=False)
-    tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose)
+fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, use_yarn)
+tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose)
diff --git a/tracker/rabit_yarn.py b/tracker/rabit_yarn.py
deleted file mode 100755
index ed80595eb..000000000
--- a/tracker/rabit_yarn.py
+++ /dev/null
@@ -1,10 +0,0 @@
-#!/usr/bin/python
-"""
-This is a script to submit rabit job using Yarn
-submit the rabit process as mappers of MapReduce
-"""
-import rabit_hadoop
-
-if __name__ == 'main':
-    fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, yarn=True)
-    tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose)
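
Note on the version-detection change: the patch decides between Hadoop 1.x and Yarn submission by parsing the first line of `hadoop version` output. A minimal standalone sketch of that logic, written in the same Python 2 idiom as the script; the default binary name 'hadoop' here is an assumption for illustration, the script itself takes it from -hb or HADOOP_HOME:

    import subprocess

    def detect_hadoop_version(hadoop_binary='hadoop'):
        # first line of `hadoop version` looks like: "Hadoop 2.4.1"
        out = subprocess.Popen('%s version' % hadoop_binary, shell=True,
                               stdout=subprocess.PIPE).communicate()[0]
        fields = out.split('\n')[0].split()
        assert fields[0] == 'Hadoop', 'cannot parse hadoop version string'
        return fields[1].split('.')

    # Yarn (MRv2) ships with Hadoop 2.x, so a major version >= 2 selects Yarn
    use_yarn = int(detect_hadoop_version()[0]) >= 2

One caveat worth flagging: if the hadoop binary is missing or its output format differs, the assert fires with an opaque parse error rather than a clear "hadoop not found" message.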