From 6b30fb2bea6475a1f3cf205d8632031173eebd1c Mon Sep 17 00:00:00 2001 From: tqchen Date: Sat, 10 Jan 2015 09:58:10 -0800 Subject: [PATCH] update cache script --- tracker/rabit_hadoop.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/tracker/rabit_hadoop.py b/tracker/rabit_hadoop.py index e9b53afbd..da9e161ce 100755 --- a/tracker/rabit_hadoop.py +++ b/tracker/rabit_hadoop.py @@ -41,7 +41,9 @@ parser.add_argument('-ac', '--auto_file_cache', default=1, choices=[0, 1], type= help = 'whether automatically cache the files in the command to hadoop localfile, this is on by default') parser.add_argument('-f', '--files', nargs = '*', help = 'the cached file list in mapreduce,'\ - ' the submission script will automatically cache all the files which appears in command.'\ + ' the submission script will automatically cache all the files which appears in command to local folder'\ + ' This will also cause rewritten of all the file names in the command to current path,'\ + ' for example `../../kmeans ../kmeans.conf` will be rewritten to ./kmeans kmeans.conf because the two files are cached to running folder.'\ ' You may need this option to cache additional files.'\ ' You can also use it to manually cache files when auto_file_cache is off') parser.add_argument('--jobname', help = 'customize jobname in tracker') @@ -66,15 +68,20 @@ if args.jobname is None: args.jobname = ('Rabit(nworker=%d):' % args.nworker) + args.command[0].split('/')[-1]; def hadoop_streaming(nworker, worker_args): + fset = set() + if args.auto_file_cache: + for i in range(len(args.command)): + f = args.command[i] + if os.path.exists(f): + fset.add(f) + if i == 0: + args.command[i] = './' + args.command[i].split('/')[-1] + else: + args.command[i] = args.command[i].split('/')[-1] cmd = '%s jar %s -D mapred.map.tasks=%d' % (args.hadoop_binary, args.hadoop_streaming_jar, nworker) cmd += ' -D mapred.job.name=%s' % (args.jobname) cmd += ' -input %s -output %s' % (args.input, args.output) cmd += ' -mapper \"%s\" -reducer \"/bin/cat\" ' % (' '.join(args.command + worker_args)) - fset = set() - if args.auto_file_cache: - for f in args.command: - if os.path.exists(f): - fset.add(f) if args.files != None: for flst in args.files: for f in flst.split('#'):