use unified script, auto detect hadoop version

tqchen 2015-01-11 11:46:12 -08:00
parent bfc3f61010
commit db2ebf7410
2 changed files with 22 additions and 20 deletions

View File

@@ -8,6 +8,7 @@ import sys
 import os
 import time
 import subprocess
+import warnings
 import rabit_tracker as tracker
@@ -26,8 +27,8 @@ if hadoop_home != None:
         assert os.path.exists(hadoop_streaming_jar), "HADOOP_HOME does not contain the hadoop streaming jar"
 if hadoop_binary == None or hadoop_streaming_jar == None:
-    print 'Warning: Cannot auto-detect path to hadoop and hadoop-streaming jar, need to set them via arguments -hs and -hb'
-    print '\tTo enable auto-detection, you can set enviroment variable HADOOP_HOME or modify rabit_hadoop.py line 14'
+    warnings.warn('Warning: Cannot auto-detect path to hadoop and hadoop-streaming jar, need to set them via arguments -hs and -hb\n'\
+                  '\tTo enable auto-detection, you can set environment variable HADOOP_HOME or modify rabit_hadoop.py line 14')
 parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using Hadoop Streaming. '\
                                  'This script supports both Hadoop 1.0 and Yarn(MRv2), Yarn is recommended')
@@ -79,7 +80,17 @@ args = parser.parse_args()
 if args.jobname == 'auto':
     args.jobname = ('Rabit[nworker=%d]:' % args.nworker) + args.command[0].split('/')[-1];
-def hadoop_streaming(nworker, worker_args, yarn = False):
+# detect hadoop version
+(out, err) = subprocess.Popen('%s version' % args.hadoop_binary, shell = True, stdout=subprocess.PIPE).communicate()
+out = out.split('\n')[0].split()
+assert out[0] == 'Hadoop', 'cannot parse hadoop version string'
+hadoop_version = out[1].split('.')
+use_yarn = int(hadoop_version[0]) >= 2
+if not use_yarn:
+    print 'Current Hadoop Version is %s' % out[1]
+
+def hadoop_streaming(nworker, worker_args, use_yarn):
     fset = set()
     if args.auto_file_cache:
         for i in range(len(args.command)):
@@ -92,7 +103,7 @@ def hadoop_streaming(nworker, worker_args, yarn = False):
            args.command[i] = args.command[i].split('/')[-1]
     kmap = {}
     # setup keymaps
-    if yarn:
+    if use_yarn:
         kmap['nworker'] = 'mapreduce.job.maps'
         kmap['jobname'] = 'mapreduce.job.name'
         kmap['nthread'] = 'mapreduce.map.cpu.vcores'
@@ -108,9 +119,11 @@ def hadoop_streaming(nworker, worker_args, yarn = False):
     cmd += ' -D%s=%d' % (kmap['nworker'], nworker)
     cmd += ' -D%s=%s' % (kmap['jobname'], args.jobname)
     if args.nthread != -1:
-        assert kmap['nthread'] is not None, 'nthread can only be set in Yarn(Hadoop 2.x) cluster'\
-            'it is recommended to use Yarn to submit rabit jobs'
-        cmd += ' -D%s=%d' % (kmap['ntread'], args.nthread)
+        if kmap['nthread'] is None:
+            warnings.warn('nthread can only be set in Yarn(Hadoop version greater than 2.0), '\
+                          'it is recommended to use Yarn to submit rabit jobs')
+        else:
+            cmd += ' -D%s=%d' % (kmap['nthread'], args.nthread)
     cmd += ' -D%s=%d' % (kmap['timeout'], args.timeout)
     if args.memory_mb != -1:
         cmd += ' -D%s=%d' % (kmap['timeout'], args.timeout)
@@ -126,6 +139,5 @@ def hadoop_streaming(nworker, worker_args, yarn = False):
     print cmd
     subprocess.check_call(cmd, shell = True)
-if __name__ == 'main':
-    fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, yarn=False)
-    tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose)
+fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, int(hadoop_version[0]) >= 2)
+tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose)
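For readers skimming the diff: the new auto-detection boils down to parsing the first line of "hadoop version" output and switching to the Yarn (MRv2) option names when the major version is at least 2. Below is a minimal standalone sketch of that logic, assuming the first output line has the form "Hadoop X.Y.Z"; the helper name and the fallback binary path are illustrative and not part of the commit.

import subprocess

def detect_use_yarn(hadoop_binary='hadoop'):
    # Run `hadoop version` and read the first two tokens, e.g. "Hadoop 2.6.0".
    out = subprocess.Popen('%s version' % hadoop_binary, shell=True,
                           stdout=subprocess.PIPE).communicate()[0]
    first = out.decode('utf-8', 'replace').split('\n')[0].split()
    assert first and first[0] == 'Hadoop', 'cannot parse hadoop version string'
    major = int(first[1].split('.')[0])
    # Hadoop 2.x and later run MapReduce on Yarn (MRv2).
    return major >= 2

# The submit hook then only forwards this flag, as in the last hunk above:
# fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, detect_use_yarn())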

View File

@@ -1,10 +0,0 @@
-#!/usr/bin/python
-"""
-This is a script to submit rabit job using Yarn
-submit the rabit process as mappers of MapReduce
-"""
-import rabit_hadoop
-if __name__ == 'main':
-    fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, yarn=True)
-    tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose)
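With the separate Yarn script removed, both cluster flavours go through the single hadoop_streaming() path, and the only per-flavour difference is which property names are passed as -D options. The sketch below shows that selection pattern under stated assumptions: the Yarn names come from the hunk above, while the MRv1 names and the helper functions are assumptions, since the non-Yarn branch is truncated in this view.

import warnings

def build_keymap(use_yarn):
    # Yarn (MRv2) property names, as set in the diff above.
    if use_yarn:
        return {'nworker': 'mapreduce.job.maps',
                'jobname': 'mapreduce.job.name',
                'nthread': 'mapreduce.map.cpu.vcores'}
    # Assumed MRv1 equivalents; the real branch is not shown in the truncated hunk.
    return {'nworker': 'mapred.map.tasks',
            'jobname': 'mapred.job.name',
            'nthread': None}  # no vcore setting on MRv1, hence the warning in the diff

def nthread_option(kmap, nthread):
    # Mirrors the new behaviour: warn instead of asserting when the option
    # cannot be expressed on the current cluster.
    if kmap['nthread'] is None:
        warnings.warn('nthread can only be set on Yarn (Hadoop >= 2.0)')
        return ''
    return ' -D%s=%d' % (kmap['nthread'], nthread)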