This commit is contained in:
tqchen 2015-01-11 11:15:12 -08:00
parent 78bfe867e6
commit bfc3f61010
2 changed files with 4 additions and 3 deletions

View File

@@ -108,7 +108,8 @@ def hadoop_streaming(nworker, worker_args, yarn = False):
cmd += ' -D%s=%d' % (kmap['nworker'], nworker) cmd += ' -D%s=%d' % (kmap['nworker'], nworker)
cmd += ' -D%s=%s' % (kmap['jobname'], args.jobname) cmd += ' -D%s=%s' % (kmap['jobname'], args.jobname)
if args.nthread != -1: if args.nthread != -1:
assert kmap['nthread'] is not None, "nthread can only be set in Yarn cluster, it is highly recommended to " assert kmap['nthread'] is not None, 'nthread can only be set in Yarn(Hadoop 2.x) cluster'\
'it is recommended to use Yarn to submit rabit jobs'
cmd += ' -D%s=%d' % (kmap['ntread'], args.nthread) cmd += ' -D%s=%d' % (kmap['ntread'], args.nthread)
cmd += ' -D%s=%d' % (kmap['timeout'], args.timeout) cmd += ' -D%s=%d' % (kmap['timeout'], args.timeout)
if args.memory_mb != -1: if args.memory_mb != -1:
@@ -126,5 +127,5 @@ def hadoop_streaming(nworker, worker_args, yarn = False):
subprocess.check_call(cmd, shell = True) subprocess.check_call(cmd, shell = True)
if __name__ == 'main': if __name__ == 'main':
fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, False) fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, yarn=False)
tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose) tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose)

View File

@@ -6,5 +6,5 @@ submit the rabit process as mappers of MapReduce
import rabit_hadoop import rabit_hadoop
if __name__ == 'main': if __name__ == 'main':
fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, True) fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, yarn=True)
tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose) tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose)