add temporary solution, run_yarn_prog.py
This commit is contained in:
parent
e5a9e31d13
commit
d25de54008
@ -6,13 +6,13 @@ then
|
||||
fi
|
||||
|
||||
# put the local training file to HDFS
|
||||
#hadoop fs -rm -r -f $2/data
|
||||
hadoop fs -rm -r -f $2/mushroom.linear.model
|
||||
#hadoop fs -mkdir $2/data
|
||||
|
||||
hadoop fs -mkdir $2/data
|
||||
hadoop fs -put ../data/agaricus.txt.train $2/data
|
||||
|
||||
# submit to hadoop
|
||||
../../tracker/rabit_yarn.py -n $1 --vcores 1 linear.rabit hdfs://$2/data/agaricus.txt.train model_out=hdfs://$2/mushroom.linear.model "${*:3}"
|
||||
../../tracker/rabit_yarn.py -n $1 --vcores 1 ../../yarn/run_yarn_prog.py ./linear.rabit hdfs://$2/data/agaricus.txt.train model_out=hdfs://$2/mushroom.linear.model "${*:3}"
|
||||
|
||||
# get the final model file
|
||||
hadoop fs -get $2/mushroom.linear.model ./linear.model
|
||||
|
||||
@ -96,7 +96,7 @@ def submit_yarn(nworker, worker_args, worker_env):
|
||||
if i == 0:
|
||||
args.command[i] = './' + args.command[i].split('/')[-1]
|
||||
else:
|
||||
args.command[i] = args.command[i].split('/')[-1]
|
||||
args.command[i] = './' + args.command[i].split('/')[-1]
|
||||
if args.command[0].endswith('.py'):
|
||||
flst = [WRAPPER_PATH + '/rabit.py',
|
||||
WRAPPER_PATH + '/librabit_wrapper.so',
|
||||
|
||||
25
yarn/run_yarn_prog.py
Executable file
25
yarn/run_yarn_prog.py
Executable file
@ -0,0 +1,25 @@
|
||||
#!/usr/bin/env python
"""
Helper script that sets up the CLASSPATH environment variable needed by
libhdfs, then executes the user-supplied program under that environment.

Usage: run_yarn_prog.py <program> [args...]
"""
import glob
import os
import subprocess
import sys

if len(sys.argv) < 2:
    print('Usage: the program you want to run')
    # fix: the original printed usage but fell through and crashed later on
    # sys.argv[1:]; exit explicitly with a nonzero status instead.
    sys.exit(1)

# HADOOP_HOME is preferred; fall back to HADOOP_PREFIX (older installs).
hadoop_home = os.getenv('HADOOP_HOME')
if hadoop_home is None:
    hadoop_home = os.getenv('HADOOP_PREFIX')
if hadoop_home is None:
    # fix: was an `assert`, which is stripped under `python -O`;
    # raise explicitly so misconfiguration always fails loudly.
    raise RuntimeError('need to set HADOOP_HOME')

# `hadoop classpath` prints a ':'-separated list whose entries may contain
# wildcards; expand each entry with glob so CLASSPATH holds concrete paths.
# universal_newlines=True yields str (not bytes) on both Python 2 and 3.
(classpath, err) = subprocess.Popen(
    '%s/bin/hadoop classpath' % hadoop_home,
    shell=True, stdout=subprocess.PIPE,
    env=os.environ, universal_newlines=True).communicate()

cpath = []
# fix: strip the trailing newline first — otherwise the last entry becomes
# 'pattern\n', glob matches nothing, and that classpath entry is lost.
for f in classpath.strip().split(':'):
    cpath += glob.glob(f)

env = os.environ.copy()
env['CLASSPATH'] = '${CLASSPATH}:' + (':'.join(cpath))
# Run the target program with the augmented environment; check_call
# propagates a nonzero exit status as CalledProcessError.
subprocess.check_call(' '.join(sys.argv[1:]), shell=True, env=env)
|
||||
@ -283,11 +283,10 @@ public class ApplicationMaster {
|
||||
|
||||
String cmd =
|
||||
// use this to setup CLASSPATH correctly for libhdfs
|
||||
"CLASSPATH=${CLASSPATH}:`" + hadoop + " classpath --glob` "
|
||||
+ this.command + " 1>"
|
||||
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
|
||||
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
|
||||
+ "/stderr";
|
||||
this.command + " 1>"
|
||||
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
|
||||
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
|
||||
+ "/stderr";
|
||||
ctx.setCommands(Collections.singletonList(cmd));
|
||||
LOG.info(workerResources);
|
||||
ctx.setLocalResources(this.workerResources);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user