diff --git a/rabit-learn/linear/README.md b/rabit-learn/linear/README.md
index 68d132b66..b2ee4e0bb 100644
--- a/rabit-learn/linear/README.md
+++ b/rabit-learn/linear/README.md
@@ -2,8 +2,10 @@ Linear and Logistic Regression
 ====
 * input format: LibSVM
 * Local Example: [run-linear.sh](run-linear.sh)
-* Runnig on Hadoop: [run-hadoop.sh](run-hadoop.sh)
-  - Set input data to stdin, and model_out=stdout
+* Running on YARN: [run-yarn.sh](run-yarn.sh)
+  - You will need to have YARN
+  - Modify ```../make/config.mk``` to set USE_HDFS=1
+  - Run build.sh in [../../yarn](../../yarn) to build the yarn jar file

 Parameters
 ===
diff --git a/rabit-learn/linear/linear.cc b/rabit-learn/linear/linear.cc
index b566adf17..53399875c 100644
--- a/rabit-learn/linear/linear.cc
+++ b/rabit-learn/linear/linear.cc
@@ -54,7 +54,9 @@ class LinearObjFunction : public solver::IObjFunction {
     }
     if (task == "train") {
       lbfgs.Run();
-      this->SaveModel(model_out.c_str(), lbfgs.GetWeight());
+      if (rabit::GetRank() == 0) {
+        this->SaveModel(model_out.c_str(), lbfgs.GetWeight());
+      }
     } else if (task == "pred") {
       this->TaskPred();
     } else {
diff --git a/rabit-learn/linear/run-yarn.sh b/rabit-learn/linear/run-yarn.sh
new file mode 100755
index 000000000..e7ba873a4
--- /dev/null
+++ b/rabit-learn/linear/run-yarn.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+if [ "$#" -lt 3 ];
+then
+    echo "Usage: <nworker> <path_in_HDFS> [param=val]"
+    exit -1
+fi
+
+# put the local training file to HDFS
+hadoop fs -rm -r -f $2/data
+hadoop fs -rm -r -f $2/mushroom.linear.model
+hadoop fs -mkdir $2/data
+
+# submit to hadoop
+../../tracker/rabit_yarn.py -n $1 --vcores 1 linear.rabit hdfs://$2/data/agaricus.txt.train model_out=hdfs://$2/mushroom.linear.model "${*:3}"
+
+# get the final model file
+hadoop fs -get $2/mushroom.linear.model ./linear.model
+
+./linear.rabit ../data/agaricus.txt.test task=pred model_in=linear.model
diff --git a/rabit-learn/make/config.mk b/rabit-learn/make/config.mk
index ab62b3066..936564e05 100644
--- a/rabit-learn/make/config.mk
+++ b/rabit-learn/make/config.mk
@@ -15,7 +15,7 @@ export CXX = g++
 export MPICXX = mpicxx

 # whether use HDFS support during compile
-USE_HDFS = 0
+USE_HDFS = 1

 # path to libjvm.so
 LIBJVM=$(JAVA_HOME)/jre/lib/amd64/server
diff --git a/tracker/rabit_yarn.py b/tracker/rabit_yarn.py
new file mode 100755
index 000000000..ae391fffd
--- /dev/null
+++ b/tracker/rabit_yarn.py
@@ -0,0 +1,122 @@
+#!/usr/bin/python
+"""
+This is a script to submit rabit jobs to YARN.
+It will submit the rabit processes as containers of a YARN application.
+"""
+import argparse
+import sys
+import os
+import time
+import subprocess
+import warnings
+import rabit_tracker as tracker
+
+WRAPPER_PATH = os.path.dirname(__file__) + '/../wrapper'
+YARN_JAR_PATH = os.path.dirname(__file__) + '/../yarn/rabit-yarn.jar'
+
+assert os.path.exists(YARN_JAR_PATH), ("cannot find \"%s\", please run build.sh on the yarn folder" % YARN_JAR_PATH)
+hadoop_binary = 'hadoop'
+# code
+hadoop_home = os.getenv('HADOOP_HOME')
+
+if hadoop_home != None:
+    if hadoop_binary == None:
+        hadoop_binary = hadoop_home + '/bin/hadoop'
+        assert os.path.exists(hadoop_binary), "HADOOP_HOME does not contain the hadoop binary"
+
+parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs to Yarn.')
+parser.add_argument('-n', '--nworker', required=True, type=int,
+                    help = 'number of worker processes to be launched')
+parser.add_argument('-hip', '--host_ip', default='auto', type=str,
+                    help = 'host IP address if cannot be automatically guessed, specify the IP of submission machine')
+parser.add_argument('-v', '--verbose', default=0, choices=[0, 1], type=int,
+                    help = 'print more messages into the console')
+parser.add_argument('-ac', '--auto_file_cache', default=1, choices=[0, 1], type=int,
+                    help = 'whether automatically cache the files in the command to hadoop localfile, this is on by default')
+parser.add_argument('-f', '--files', default = [], action='append',
+                    help = 'the cached file list in mapreduce,'\
+                    ' the submission script will automatically cache all the files which appear in the command.'\
+                    ' This will also cause all the file names in the command to be rewritten to the current path,'\
+                    ' for example `../../kmeans ../kmeans.conf` will be rewritten to `./kmeans kmeans.conf`'\
+                    ' because the two files are cached to the running folder.'\
+                    ' You may need this option to cache additional files.'\
+                    ' You can also use it to manually cache files when auto_file_cache is off')
+parser.add_argument('--jobname', default='auto', help = 'customize jobname in tracker')
+parser.add_argument('--tempdir', default='/tmp', help = 'temporary directory in HDFS that can be used to store intermediate results')
+parser.add_argument('--vcores', default = 1, type=int,
+                    help = 'number of vcores to request in each mapper, set it if each rabit job is multi-threaded')
+parser.add_argument('-mem', '--memory_mb', default=1024, type=int,
+                    help = 'maximum memory used by the process. Guide: set it large (near mapred.cluster.max.map.memory.mb)'\
+                    ' if you are running multi-threading rabit,'\
+                    ' so that each node can occupy all the mapper slots in a machine for maximum performance')
+parser.add_argument('command', nargs='+',
+                    help = 'command for rabit program')
+args = parser.parse_args()
+
+if args.jobname == 'auto':
+    args.jobname = ('Rabit[nworker=%d]:' % args.nworker) + args.command[0].split('/')[-1];
+
+if hadoop_binary == None:
+    parser.add_argument('-hb', '--hadoop_binary', required = True,
+                        help="path to hadoop binary file")
+else:
+    parser.add_argument('-hb', '--hadoop_binary', default = hadoop_binary,
+                        help="path to hadoop binary file")
+
+args = parser.parse_args()
+
+if args.jobname == 'auto':
+    args.jobname = ('Rabit[nworker=%d]:' % args.nworker) + args.command[0].split('/')[-1];
+
+# detect hadoop version
+(out, err) = subprocess.Popen('%s version' % args.hadoop_binary, shell = True, stdout=subprocess.PIPE).communicate()
+out = out.split('\n')[0].split()
+assert out[0] == 'Hadoop', 'cannot parse hadoop version string'
+hadoop_version = out[1].split('.')
+
+(classpath, err) = subprocess.Popen('%s classpath --glob' % args.hadoop_binary, shell = True, stdout=subprocess.PIPE).communicate()
+
+if int(hadoop_version[0]) < 2:
+    print 'Current Hadoop Version is %s, rabit_yarn will need Yarn(Hadoop 2.0)' % out[1]
+
+def submit_yarn(nworker, worker_args, worker_env):
+    fset = set([YARN_JAR_PATH])
+    if args.auto_file_cache != 0:
+        for i in range(len(args.command)):
+            f = args.command[i]
+            if os.path.exists(f):
+                fset.add(f)
+                if i == 0:
+                    args.command[i] = './' + args.command[i].split('/')[-1]
+                else:
+                    args.command[i] = args.command[i].split('/')[-1]
+        if args.command[0].endswith('.py'):
+            flst = [WRAPPER_PATH + '/rabit.py',
+                    WRAPPER_PATH + '/librabit_wrapper.so',
+                    WRAPPER_PATH + '/librabit_wrapper_mock.so']
+            for f in flst:
+                if os.path.exists(f):
+                    fset.add(f)
+
+    cmd = 'java -cp `%s classpath`:%s org.apache.hadoop.yarn.rabit.Client ' % (args.hadoop_binary, YARN_JAR_PATH)
+    env = os.environ.copy()
+    for k, v in worker_env.items():
+        env[k] = str(v)
+    env['rabit_cpu_vcores'] = str(args.vcores)
+    env['rabit_memory_mb'] = str(args.memory_mb)
+    env['rabit_world_size'] = str(args.nworker)
+
+    if args.files != None:
+        for flst in args.files:
+            for f in flst.split('#'):
+                fset.add(f)
+    for f in fset:
+        cmd += ' -file %s' % f
+    cmd += ' -jobname %s ' % args.jobname
+    cmd += ' -tempdir %s ' % args.tempdir
+    cmd += (' '.join(args.command + worker_args))
+    print cmd
+    subprocess.check_call(cmd, shell = True, env = env)
+
+tracker.submit(args.nworker, [], fun_submit = submit_yarn, verbose = args.verbose, hostIP = args.host_ip)
diff --git a/yarn/.gitignore b/yarn/.gitignore
new file mode 100644
index 000000000..1162c62ea
--- /dev/null
+++ b/yarn/.gitignore
@@ -0,0 +1,4 @@
+bin
+.classpath
+.project
+*.jar
diff --git a/yarn/src/org/apache/hadoop/yarn/rabit/ApplicationMaster.java b/yarn/src/org/apache/hadoop/yarn/rabit/ApplicationMaster.java
index 8cf8d74d4..28dc073cb 100644
--- a/yarn/src/org/apache/hadoop/yarn/rabit/ApplicationMaster.java
+++ b/yarn/src/org/apache/hadoop/yarn/rabit/ApplicationMaster.java
@@ -1,15 +1,19 @@
 package org.apache.hadoop.yarn.rabit;

+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Collection;
 import java.util.Collections;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -19,6 +23,9 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -38,6 +45,8 @@ public class ApplicationMaster {
     private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
     // configuration
     private Configuration conf = new YarnConfiguration();
+    // hdfs handler
+    private FileSystem dfs;

     // number of cores allocated for each task
     private int numVCores = 1;
@@ -48,7 +57,7 @@ public class ApplicationMaster {
     // total number of tasks
     private int numTasks = 1;
     // maximum number of attempts to try in each task
-    private int maxNumAttempt = 10;
+    private int maxNumAttempt = 3;
     // command to launch
     private String command = "";

@@ -61,6 +70,8 @@ public class ApplicationMaster {

     // whether we start to abort the application, due to whatever fatal reasons
     private boolean startAbort = false;
+    // worker resources
+    private Map<String, LocalResource> workerResources = new java.util.HashMap<String, LocalResource>();
     // record the aborting reason
     private String abortDiagnosis = "";
     // resource manager
@@ -81,6 +92,10 @@ public class ApplicationMaster {
         new ApplicationMaster().run(args);
     }

+    private ApplicationMaster() throws IOException {
+        dfs = FileSystem.get(conf);
+    }
+
     /**
      * get integer argument from environment variable
      *
@@ -92,11 +107,16 @@ public class ApplicationMaster {
      *            default value
      * @return the requested result
      */
-    private int getEnvInteger(String name, boolean required, int defv) {
+    private int getEnvInteger(String name, boolean required, int defv)
+            throws IOException {
         String value = System.getenv(name);
         if (value == null) {
-            if (required)
-                LOG.fatal("environment variable " + name + "not set");
+            if (required) {
+                throw new IOException("environment variable " + name
+                        + " not set");
+            } else {
+                return defv;
+            }
         }
         return Integer.valueOf(value);
     }
@@ -106,14 +126,37 @@ public class ApplicationMaster {
      *
      * @param args
      */
-    private void initArgs(String args[]) {
-        for (String c : args) {
-            this.command += c + " ";
+    private void initArgs(String args[]) throws IOException {
+        LOG.info("Invoke initArgs");
+        // cached maps
+        Map<String, Path> cacheFiles = new java.util.HashMap<String, Path>();
+        for (int i = 0; i < args.length; ++i) {
+            if (args[i].equals("-file")) {
+                String[] arr = args[++i].split("#");
+                Path path = new Path(arr[0]);
+                if (arr.length == 1) {
+                    cacheFiles.put(path.getName(), path);
+                } else {
+                    cacheFiles.put(arr[1], path);
+                }
+            } else {
+                this.command += args[i] + " ";
+            }
+        }
+        for (Map.Entry<String, Path> e : cacheFiles.entrySet()) {
+            LocalResource r = Records.newRecord(LocalResource.class);
+            FileStatus status = dfs.getFileStatus(e.getValue());
+            r.setResource(ConverterUtils.getYarnUrlFromPath(e.getValue()));
+            r.setSize(status.getLen());
+            r.setTimestamp(status.getModificationTime());
+            r.setType(LocalResourceType.FILE);
+            r.setVisibility(LocalResourceVisibility.APPLICATION);
+            workerResources.put(e.getKey(), r);
         }
         numVCores = this.getEnvInteger("rabit_cpu_vcores", true, numVCores);
         numMemoryMB = this.getEnvInteger("rabit_memory_mb", true, numMemoryMB);
-        maxNumAttempt = this.getEnvInteger("rabit_max_attempt", false,
-                maxNumAttempt);
+        numTasks = this.getEnvInteger("rabit_world_size", true, numTasks);
+        maxNumAttempt = this.getEnvInteger("rabit_max_attempt", false, maxNumAttempt);
     }

     /**
@@ -121,12 +164,6 @@ public class ApplicationMaster {
      */
     private void run(String args[]) throws Exception {
         this.initArgs(args);
-        // list of tasks that waits to be submit
-        java.util.Collection<TaskRecord> tasks = new java.util.LinkedList<TaskRecord>();
-        // add waiting tasks
-        for (int i = 0; i < this.numTasks; ++i) {
-            tasks.add(new TaskRecord(i));
-        }
         this.rmClient = AMRMClientAsync.createAMRMClientAsync(1000,
                 new RMCallbackHandler());
         this.nmClient = NMClientAsync
@@ -138,37 +175,52 @@ public class ApplicationMaster {
         RegisterApplicationMasterResponse response = this.rmClient
                 .registerApplicationMaster(this.appHostName,
                         this.appTrackerPort, this.appTrackerUrl);
-        Resource maxResource = response.getMaximumResourceCapability();
-        if (maxResource.getMemory() < this.numMemoryMB) {
-            LOG.warn("[Rabit] memory requested exceed bound "
-                    + maxResource.getMemory());
-            this.numMemoryMB = maxResource.getMemory();
-        }
-        if (maxResource.getVirtualCores() < this.numVCores) {
-            LOG.warn("[Rabit] memory requested exceed bound "
-                    + maxResource.getVirtualCores());
-            this.numVCores = maxResource.getVirtualCores();
-        }
-        this.submitTasks(tasks);
-        LOG.info("[Rabit] ApplicationMaster started");
-        while (!this.doneAllJobs()) {
-            try {
-                Thread.sleep(100);
-                ;
-            } catch (InterruptedException e) {
+
+        boolean success = false;
+        String diagnostics = "";
+        try {
+            // list of tasks that waits to be submit
+            java.util.Collection<TaskRecord> tasks = new java.util.LinkedList<TaskRecord>();
+            // add waiting tasks
+            for (int i = 0; i < this.numTasks; ++i) {
+                tasks.add(new TaskRecord(i));
             }
+            Resource maxResource = response.getMaximumResourceCapability();
+
+            if (maxResource.getMemory() < this.numMemoryMB) {
+                LOG.warn("[Rabit] memory requested exceed bound "
+                        + maxResource.getMemory());
+                this.numMemoryMB = maxResource.getMemory();
+            }
+            if (maxResource.getVirtualCores() < this.numVCores) {
+                LOG.warn("[Rabit] memory requested exceed bound "
+                        + maxResource.getVirtualCores());
+                this.numVCores = maxResource.getVirtualCores();
+            }
+            this.submitTasks(tasks);
+            LOG.info("[Rabit] ApplicationMaster started");
+            while (!this.doneAllJobs()) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                }
+            }
+            assert (killedTasks.size() + finishedTasks.size() == numTasks);
+            success = finishedTasks.size() == numTasks;
+            LOG.info("Application completed. Stopping running containers");
+            nmClient.stop();
+            diagnostics = "Diagnostics." + ", num_tasks" + this.numTasks
+                    + ", finished=" + this.finishedTasks.size() + ", failed="
+                    + this.killedTasks.size() + "\n" + this.abortDiagnosis;
+            LOG.info(diagnostics);
+        } catch (Exception e) {
+            diagnostics = e.toString();
         }
-        assert (killedTasks.size() + finishedTasks.size() == numTasks);
-        boolean success = finishedTasks.size() == numTasks;
-        LOG.info("Application completed. Stopping running containers");
-        nmClient.stop();
-        String diagnostics = "Diagnostics." + ", num_tasks" + this.numTasks
-                + ", finished=" + this.finishedTasks.size() + ", failed="
-                + this.killedTasks.size() + "\n" + this.abortDiagnosis;
         rmClient.unregisterApplicationMaster(
                 success ? FinalApplicationStatus.SUCCEEDED
                         : FinalApplicationStatus.FAILED, diagnostics,
                 appTrackerUrl);
+        if (!success) throw new Exception("Application not successful");
     }

     /**
@@ -213,14 +265,21 @@ public class ApplicationMaster {
             task.containerRequest = null;
             ContainerLaunchContext ctx = Records
                     .newRecord(ContainerLaunchContext.class);
-            String cmd = command + " 1>"
+            String cmd =
+                    // use this to setup CLASSPATH correctly for libhdfs
+                    "CLASSPATH=${CLASSPATH}:`${HADOOP_PREFIX}/bin/hadoop classpath --glob` "
+                    + this.command + " 1>"
                     + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
-                    + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
-                    + "/stderr";
+                    + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+                    + "/stderr";
+            LOG.info(cmd);
             ctx.setCommands(Collections.singletonList(cmd));
+            LOG.info(workerResources);
+            ctx.setLocalResources(this.workerResources);
             // setup environment variables
             Map<String, String> env = new java.util.HashMap<String, String>();
-            // setup class path
+
+            // setup class path, this is kind of duplicated, ignoring
             StringBuilder cpath = new StringBuilder("${CLASSPATH}:./*");
             for (String c : conf.getStrings(
                     YarnConfiguration.YARN_APPLICATION_CLASSPATH,
@@ -228,10 +287,12 @@ public class ApplicationMaster {
                 cpath.append(':');
                 cpath.append(c.trim());
             }
-            env.put("CLASSPATH", cpath.toString());
-            // setup LD_LIBARY_pATH path for libhdfs
+            // already use hadoop command to get class path in worker, maybe a better solution in future
+            // env.put("CLASSPATH", cpath.toString());
+            // setup LD_LIBRARY_PATH for libhdfs
             env.put("LD_LIBRARY_PATH",
                     "${LD_LIBRARY_PATH}:$HADOOP_HDFS_HOME/lib/native:$JAVA_HOME/jre/lib/amd64/server");
+            env.put("PYTHONPATH", "${PYTHONPATH}:.");
             // inherit all rabit variables
             for (Map.Entry<String, String> e : System.getenv().entrySet()) {
                 if (e.getKey().startsWith("rabit_")) {
@@ -240,6 +301,7 @@ public class ApplicationMaster {
             }
             env.put("rabit_task_id", String.valueOf(task.taskId));
             env.put("rabit_num_trial", String.valueOf(task.attemptCounter));
+
             ctx.setEnvironment(env);
             synchronized (this) {
                 assert (!this.runningTasks.containsKey(container.getId()));
diff --git a/yarn/src/org/apache/hadoop/yarn/rabit/Client.java b/yarn/src/org/apache/hadoop/yarn/rabit/Client.java
index 2352d1638..7edf58d08 100644
--- a/yarn/src/org/apache/hadoop/yarn/rabit/Client.java
+++ b/yarn/src/org/apache/hadoop/yarn/rabit/Client.java
@@ -1,13 +1,14 @@
 package org.apache.hadoop.yarn.rabit;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -27,35 +28,82 @@ import org.apache.hadoop.yarn.util.Records;

 public class Client {
     // logger
-    private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+    private static final Log LOG = LogFactory.getLog(Client.class);
+    // permission for temp file
+    private static final FsPermission permTemp = new FsPermission("777");
     // configuration
     private YarnConfiguration conf = new YarnConfiguration();
+    // hdfs handler
+    private FileSystem dfs;
     // cached maps
-    private Map<String, Path> cacheFiles = new java.util.HashMap<String, Path>();
+    private Map<String, String> cacheFiles = new java.util.HashMap<String, String>();
+    // environment variable to setup cachefiles
+    private String cacheFileArg = "";
     // args to pass to application master
-    private String appArgs;
-
+    private String appArgs = "";
+    // HDFS path to store temporary results
+    private String tempdir = "/tmp";
+    // job name
+    private String jobName = "";
     /**
-     * get the local resource setting
+     * constructor
+     * @throws IOException
+     */
+    private Client() throws IOException {
+        dfs = FileSystem.get(conf);
+    }
+
+    /**
+     * get the cache file resource settings
      *
      * @param fmaps
      *            the file maps
      * @return the resource map
     * @throws IOException
     */
-    private Map<String, LocalResource> getLocalResource() throws IOException {
+    private Map<String, LocalResource> setupCacheFiles(ApplicationId appId) throws IOException {
+        // create temporary rabit directory
+        Path tmpPath = new Path(this.tempdir);
+        if (!dfs.exists(tmpPath)) {
+            dfs.mkdirs(tmpPath, permTemp);
+            LOG.info("HDFS temp directory does not exist, creating.. " + tmpPath);
+        }
+        tmpPath = new Path(tmpPath + "/temp-rabit-yarn-" + appId);
+        if (dfs.exists(tmpPath)) {
+            dfs.delete(tmpPath, true);
+        }
+        // create temporary directory
+        FileSystem.mkdirs(dfs, tmpPath, permTemp);
+
+        StringBuilder cstr = new StringBuilder();
         Map<String, LocalResource> rmap = new java.util.HashMap<String, LocalResource>();
-        for (Map.Entry<String, Path> e : cacheFiles.entrySet()) {
+        for (Map.Entry<String, String> e : cacheFiles.entrySet()) {
             LocalResource r = Records.newRecord(LocalResource.class);
-            Path path = e.getValue();
-            FileStatus status = FileSystem.get(conf).getFileStatus(path);
+            Path path = new Path(e.getValue());
+            // copy local data to temporary folder in HDFS
+            if (!e.getValue().startsWith("hdfs://")) {
+                Path dst = new Path("hdfs://" + tmpPath + "/" + path.getName());
+                dfs.copyFromLocalFile(false, true, path, dst);
+                dfs.setPermission(dst, permTemp);
+                dfs.deleteOnExit(dst);
+                path = dst;
+            }
+            FileStatus status = dfs.getFileStatus(path);
             r.setResource(ConverterUtils.getYarnUrlFromPath(path));
             r.setSize(status.getLen());
             r.setTimestamp(status.getModificationTime());
             r.setType(LocalResourceType.FILE);
-            r.setVisibility(LocalResourceVisibility.PUBLIC);
+            r.setVisibility(LocalResourceVisibility.APPLICATION);
             rmap.put(e.getKey(), r);
+            cstr.append(" -file \"");
+            cstr.append(path.toString());
+            cstr.append('#');
+            cstr.append(e.getKey());
+            cstr.append("\"");
         }
+
+        dfs.deleteOnExit(tmpPath);
+        this.cacheFileArg = cstr.toString();
         return rmap;
     }
@@ -80,6 +128,7 @@ public class Client {
                 env.put(e.getKey(), e.getValue());
             }
         }
+        LOG.debug(env);
         return env;
     }

@@ -89,18 +138,20 @@ public class Client {
      * @param args
      */
     private void initArgs(String[] args) {
-        // directly pass all args except args0
+        // directly pass all arguments except args0
         StringBuilder sargs = new StringBuilder("");
         for (int i = 0; i < args.length; ++i) {
-            if (args[i] == "-file") {
-                String[] arr = args[i + 1].split("#");
-                Path path = new Path(arr[0]);
+            if (args[i].equals("-file")) {
+                String[] arr = args[++i].split("#");
                 if (arr.length == 1) {
-                    cacheFiles.put(path.getName(), path);
+                    cacheFiles.put(new Path(arr[0]).getName(), arr[0]);
                 } else {
-                    cacheFiles.put(arr[1], path);
+                    cacheFiles.put(arr[1], arr[0]);
                 }
-                ++i;
+            } else if (args[i].equals("-jobname")) {
+                this.jobName = args[++i];
+            } else if (args[i].equals("-tempdir")) {
+                this.tempdir = args[++i];
             } else {
                 sargs.append(" ");
                 sargs.append(args[i]);
@@ -128,33 +179,34 @@ public class Client {
         // Set up the container launch context for the application master
         ContainerLaunchContext amContainer = Records
                 .newRecord(ContainerLaunchContext.class);
-        amContainer.setCommands(Collections.singletonList("$JAVA_HOME/bin/java"
+        ApplicationSubmissionContext appContext = app
+                .getApplicationSubmissionContext();
+        // Submit application
+        ApplicationId appId = appContext.getApplicationId();
+        // setup cache-files and environment variables
+        amContainer.setLocalResources(this.setupCacheFiles(appId));
+        amContainer.setEnvironment(this.getEnvironment());
+        String cmd = "$JAVA_HOME/bin/java"
                 + " -Xmx256M"
                 + " org.apache.hadoop.yarn.rabit.ApplicationMaster"
-                + this.appArgs + " 1>"
+                + this.cacheFileArg + ' ' + this.appArgs + " 1>"
                 + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
-                + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
-                + "/stderr"));
-
-        // setup cache files
-        amContainer.setLocalResources(this.getLocalResource());
-        amContainer.setEnvironment(this.getEnvironment());
+                + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
+        LOG.debug(cmd);
+        amContainer.setCommands(Collections.singletonList(cmd));

         // Set up resource type requirements for ApplicationMaster
         Resource capability = Records.newRecord(Resource.class);
         capability.setMemory(256);
         capability.setVirtualCores(1);
+        LOG.info("jobname=" + this.jobName);

-        ApplicationSubmissionContext appContext = app
-                .getApplicationSubmissionContext();
-        appContext.setApplicationName("Rabit-YARN");
+        appContext.setApplicationName(jobName + ":RABIT-YARN");
         appContext.setAMContainerSpec(amContainer);
         appContext.setResource(capability);
         appContext.setQueue("default");
-
-        // Submit application
-        ApplicationId appId = appContext.getApplicationId();
-        LOG.info("Submitting application " + appId);
+
+        LOG.info("Submitting application " + appId);
         yarnClient.submitApplication(appContext);

         ApplicationReport appReport = yarnClient.getApplicationReport(appId);
diff --git a/yarn/src/org/apache/hadoop/yarn/rabit/TaskRecord.java b/yarn/src/org/apache/hadoop/yarn/rabit/TaskRecord.java
index dfcfcb9eb..c1b70d320 100644
--- a/yarn/src/org/apache/hadoop/yarn/rabit/TaskRecord.java
+++ b/yarn/src/org/apache/hadoop/yarn/rabit/TaskRecord.java
@@ -1,22 +1,24 @@
 package org.apache.hadoop.yarn.rabit;
+
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+
 /**
  * data structure to hold the task information
  */
 public class TaskRecord {
-  // task id of the task
-  public int taskId = 0;
-  // number of failed attempts to run the task
-  public int attemptCounter = 0;
-  // container request, can be null if task is already running
-  public ContainerRequest containerRequest = null;
-  // running container, can be null if the task is not launched
-  public Container container = null;
-  // whether we have requested abortion of this task
-  public boolean abortRequested = false;
-
-  public TaskRecord(int taskId) {
-    this.taskId = taskId;
-  }
+    // task id of the task
+    public int taskId = 0;
+    // number of failed attempts to run the task
+    public int attemptCounter = 0;
+    // container request, can be null if task is already running
+    public ContainerRequest containerRequest = null;
+    // running container, can be null if the task is not launched
+    public Container container = null;
+    // whether we have requested abortion of this task
+    public boolean abortRequested = false;
+
+    public TaskRecord(int taskId) {
+        this.taskId = taskId;
+    }
 }
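
Usage sketch (not part of the patch): pieced together from the README changes, run-yarn.sh, and tracker/rabit_yarn.py above, an end-to-end run might look like the commands below. The worker count, the HDFS working directory /user/me/rabit-demo, and the trailing key=value parameter are placeholders, and it assumes rabit-learn has been rebuilt after setting USE_HDFS = 1 in rabit-learn/make/config.mk.

#!/bin/bash
# build the rabit-yarn client jar that tracker/rabit_yarn.py expects at ../yarn/rabit-yarn.jar
(cd yarn && ./build.sh)

# run the linear example end to end via the helper script added in this patch:
#   run-yarn.sh <nworker> <path_in_HDFS> [param=val]
(cd rabit-learn/linear && ./run-yarn.sh 4 /user/me/rabit-demo key=value)

# equivalently, submit a rabit program directly through the YARN tracker script
tracker/rabit_yarn.py -n 4 --vcores 1 \
    rabit-learn/linear/linear.rabit \
    hdfs:///user/me/rabit-demo/data/agaricus.txt.train \
    model_out=hdfs:///user/me/rabit-demo/mushroom.linear.model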