diff --git a/rabit-learn/linear/run-hadoop.sh b/rabit-learn/linear/run-hadoop-old.sh similarity index 61% rename from rabit-learn/linear/run-hadoop.sh rename to rabit-learn/linear/run-hadoop-old.sh index 924a4998f..e90515f28 100755 --- a/rabit-learn/linear/run-hadoop.sh +++ b/rabit-learn/linear/run-hadoop-old.sh @@ -6,15 +6,15 @@ then fi # put the local training file to HDFS -#hadoop fs -rm -r -f $2/data +hadoop fs -rm -r -f $2/data hadoop fs -rm -r -f $2/mushroom.linear.model -#hadoop fs -mkdir $2/data -#hadoop fs -put ../data/agaricus.txt.train $2/data +hadoop fs -mkdir $2/data +hadoop fs -put ../data/agaricus.txt.train $2/data # submit to hadoop ../../tracker/rabit_hadoop_streaming.py -n $1 --vcores 1 -i $2/data/agaricus.txt.train -o $2/mushroom.linear.model linear.rabit stdin model_out=stdout "${*:3}" # get the final model file -#hadoop fs -get $2/mushroom.linear.model/part-00000 ./linear.model +hadoop fs -get $2/mushroom.linear.model/part-00000 ./linear.model -#./linear.rabit ../data/agaricus.txt.test task=pred model_in=linear.model +./linear.rabit ../data/agaricus.txt.test task=pred model_in=linear.model diff --git a/rabit-learn/make/config.mk b/rabit-learn/make/config.mk index 936564e05..ab62b3066 100644 --- a/rabit-learn/make/config.mk +++ b/rabit-learn/make/config.mk @@ -15,7 +15,7 @@ export CXX = g++ export MPICXX = mpicxx # whether use HDFS support during compile -USE_HDFS = 1 +USE_HDFS = 0 # path to libjvm.so LIBJVM=$(JAVA_HOME)/jre/lib/amd64/server diff --git a/tracker/rabit_hadoop.py b/tracker/rabit_hadoop_streaming.py similarity index 91% rename from tracker/rabit_hadoop.py rename to tracker/rabit_hadoop_streaming.py index 2d72c1aba..d2b47adf9 100755 --- a/tracker/rabit_hadoop.py +++ b/tracker/rabit_hadoop_streaming.py @@ -34,13 +34,11 @@ if hadoop_binary == None or hadoop_streaming_jar == None: ', or modify rabit_hadoop.py line 16', stacklevel = 2) parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using Hadoop Streaming.'\ - 'This script support both Hadoop 1.0 and Yarn(MRv2), Yarn is recommended') + 'It is Highly recommended to use rabit_yarn.py instead') parser.add_argument('-n', '--nworker', required=True, type=int, help = 'number of worker proccess to be launched') parser.add_argument('-hip', '--host_ip', default='auto', type=str, help = 'host IP address if cannot be automatically guessed, specify the IP of submission machine') -parser.add_argument('-nt', '--nthread', default = -1, type=int, - help = 'number of thread in each mapper to be launched, set it if each rabit job is multi-threaded') parser.add_argument('-i', '--input', required=True, help = 'input path in HDFS') parser.add_argument('-o', '--output', required=True, @@ -61,6 +59,8 @@ parser.add_argument('--jobname', default='auto', help = 'customize jobname in tr parser.add_argument('--timeout', default=600000000, type=int, help = 'timeout (in million seconds) of each mapper job, automatically set to a very long time,'\ 'normally you do not need to set this ') +parser.add_argument('--vcores', default = -1, type=int, + help = 'number of vcpores to request in each mapper, set it if each rabit job is multi-threaded') parser.add_argument('-mem', '--memory_mb', default=-1, type=int, help = 'maximum memory used by the process. Guide: set it large (near mapred.cluster.max.map.memory.mb)'\ 'if you are running multi-threading rabit,'\ @@ -95,7 +95,9 @@ use_yarn = int(hadoop_version[0]) >= 2 print 'Current Hadoop Version is %s' % out[1] def hadoop_streaming(nworker, worker_args, worker_envs, use_yarn): - fset = set() + worker_envs['CLASSPATH'] = '`$HADOOP_HOME/bin/hadoop classpath --glob` ' + worker_envs['LD_LIBRARY_PATH'] = '{LD_LIBRARY_PATH}:$HADOOP_HDFS_HOME/lib/native:$JAVA_HOME/jre/lib/amd64/server' + fset = set() if args.auto_file_cache: for i in range(len(args.command)): f = args.command[i] @@ -132,12 +134,12 @@ def hadoop_streaming(nworker, worker_args, worker_envs, use_yarn): cmd += ' -D%s=%s' % (kmap['jobname'], args.jobname) envstr = ','.join('%s=%s' % (k, str(v)) for k, v in worker_envs.items()) cmd += ' -D%s=\"%s\"' % (kmap['env'], envstr) - if args.nthread != -1: + if args.vcores != -1: if kmap['nthread'] is None: warnings.warn('nthread can only be set in Yarn(Hadoop version greater than 2.0),'\ 'it is recommended to use Yarn to submit rabit jobs', stacklevel = 2) else: - cmd += ' -D%s=%d' % (kmap['nthread'], args.nthread) + cmd += ' -D%s=%d' % (kmap['nthread'], args.vcores) cmd += ' -D%s=%d' % (kmap['timeout'], args.timeout) if args.memory_mb != -1: cmd += ' -D%s=%d' % (kmap['timeout'], args.timeout) @@ -153,5 +155,5 @@ def hadoop_streaming(nworker, worker_args, worker_envs, use_yarn): print cmd subprocess.check_call(cmd, shell = True) -fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, int(hadoop_version[0]) >= 2) +fun_submit = lambda nworker, worker_args, worker_envs: hadoop_streaming(nworker, worker_args, worker_envs, int(hadoop_version[0]) >= 2) tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose, hostIP = args.host_ip) diff --git a/yarn/src/org/apache/hadoop/yarn/rabit/ApplicationMaster.java b/yarn/src/org/apache/hadoop/yarn/rabit/ApplicationMaster.java index 2f900c511..8cf8d74d4 100644 --- a/yarn/src/org/apache/hadoop/yarn/rabit/ApplicationMaster.java +++ b/yarn/src/org/apache/hadoop/yarn/rabit/ApplicationMaster.java @@ -34,340 +34,413 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; * @author Tianqi Chen */ public class ApplicationMaster { - // logger - private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); - // configuration - private Configuration conf = new YarnConfiguration(); - - // number of cores allocated for each task - private int numVCores = 1; - // memory needed requested for the task - private int numMemoryMB = 10; - // priority of the app master - private int appPriority = 0; - // total number of tasks - private int numTasks = 1; - // maximum number of attempts to try in each task - private int maxNumAttempt = 10; - // command to launch - private String command = ""; - - // application tracker hostname - private String appHostName = ""; - // tracker URL to do - private String appTrackerUrl = ""; - // tracker port - private int appTrackerPort = 0; + // logger + private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); + // configuration + private Configuration conf = new YarnConfiguration(); + + // number of cores allocated for each task + private int numVCores = 1; + // memory needed requested for the task + private int numMemoryMB = 10; + // priority of the app master + private int appPriority = 0; + // total number of tasks + private int numTasks = 1; + // maximum number of attempts to try in each task + private int maxNumAttempt = 10; + // command to launch + private String command = ""; + + // application tracker hostname + private String appHostName = ""; + // tracker URL to do + private String appTrackerUrl = ""; + // tracker port + private int appTrackerPort = 0; + + // whether we start to abort the application, due to whatever fatal reasons + private boolean startAbort = false; + // record the aborting reason + private String abortDiagnosis = ""; + // resource manager + private AMRMClientAsync rmClient = null; + // node manager + private NMClientAsync nmClient = null; + + // list of tasks that pending for resources to be allocated + private final Queue pendingTasks = new java.util.LinkedList(); + // map containerId->task record of tasks that was running + private final Map runningTasks = new java.util.HashMap(); + // collection of tasks + private final Collection finishedTasks = new java.util.LinkedList(); + // collection of killed tasks + private final Collection killedTasks = new java.util.LinkedList(); - // whether we start to abort the application, due to whatever fatal reasons - private boolean startAbort = false; - // record the aborting reason - private String abortDiagnosis = ""; - // resource manager - private AMRMClientAsync rmClient = null; - // node manager - private NMClientAsync nmClient = null; - - // list of tasks that pending for resources to be allocated - private final Queue pendingTasks = new java.util.LinkedList(); - // map containerId->task record of tasks that was running - private final Map runningTasks = new java.util.HashMap(); - // collection of tasks - private final Collection finishedTasks = new java.util.LinkedList(); - // collection of killed tasks - private final Collection killedTasks = new java.util.LinkedList(); - public static void main(String[] args) throws Exception { - new ApplicationMaster().run(args); + new ApplicationMaster().run(args); } + /** * get integer argument from environment variable - * @param name name of key - * @param required whether this is required - * @param defv default value + * + * @param name + * name of key + * @param required + * whether this is required + * @param defv + * default value * @return the requested result */ private int getEnvInteger(String name, boolean required, int defv) { - String value = System.getenv(name); - if (value == null) { - if (required) LOG.fatal("environment variable " + name + "not set"); - } - return Integer.valueOf(value); + String value = System.getenv(name); + if (value == null) { + if (required) + LOG.fatal("environment variable " + name + "not set"); + } + return Integer.valueOf(value); } + /** * initialize from arguments and command lines + * * @param args */ - private void initArgs(String args[]) { - for (String c : args) { - this.command += c + " "; - } - numVCores = this.getEnvInteger("rabit_cpu_vcores", true, numVCores); - numMemoryMB = this.getEnvInteger("rabit_memory_mb", true, numMemoryMB); - maxNumAttempt = this.getEnvInteger("rabit_max_attempt", false, maxNumAttempt); - } + private void initArgs(String args[]) { + for (String c : args) { + this.command += c + " "; + } + numVCores = this.getEnvInteger("rabit_cpu_vcores", true, numVCores); + numMemoryMB = this.getEnvInteger("rabit_memory_mb", true, numMemoryMB); + maxNumAttempt = this.getEnvInteger("rabit_max_attempt", false, + maxNumAttempt); + } + /** - * called to start the application - */ - private void run(String args[]) throws Exception { - this.initArgs(args); - // list of tasks that waits to be submit - java.util.Collection tasks = new java.util.LinkedList(); - // add waiting tasks - for (int i = 0; i < this.numTasks; ++i) { - tasks.add(new TaskRecord(i)); - } - this.rmClient = AMRMClientAsync.createAMRMClientAsync(1000, new RMCallbackHandler()); - this.nmClient = NMClientAsync.createNMClientAsync(new NMCallbackHandler()); - this.rmClient.init(conf); - this.rmClient.start(); - this.nmClient.init(conf); - this.nmClient.start(); - RegisterApplicationMasterResponse response = - this.rmClient.registerApplicationMaster(this.appHostName, this.appTrackerPort, this.appTrackerUrl); - Resource maxResource = response.getMaximumResourceCapability(); - if (maxResource.getMemory() < this.numMemoryMB) { - LOG.warn("[Rabit] memory requested exceed bound " + maxResource.getMemory()); - this.numMemoryMB = maxResource.getMemory(); - } - if (maxResource.getVirtualCores() < this.numVCores) { - LOG.warn("[Rabit] memory requested exceed bound " + maxResource.getVirtualCores()); - this.numVCores = maxResource.getVirtualCores(); - } - this.submitTasks(tasks); - LOG.info("[Rabit] ApplicationMaster started"); - while (!this.doneAllJobs()) { - try { - Thread.sleep(100);; - } catch (InterruptedException e) { - } - } - assert (killedTasks.size() + finishedTasks.size() == numTasks); - boolean success = finishedTasks.size() == numTasks; - LOG.info("Application completed. Stopping running containers"); - nmClient.stop(); - String diagnostics = "Diagnostics." + ", num_tasks" + this.numTasks - + ", finished=" + this.finishedTasks.size() + ", failed=" - + this.killedTasks.size() + "\n" + this.abortDiagnosis; - rmClient.unregisterApplicationMaster - (success ? FinalApplicationStatus.SUCCEEDED : FinalApplicationStatus.FAILED, - diagnostics, appTrackerUrl); - } - /** - * check if the job finishes - * @return whether we finished all the jobs - */ - private synchronized boolean doneAllJobs() { - return pendingTasks.size() == 0 && runningTasks.size() == 0; - } - /** - * submit tasks to request containers for the tasks - * @param tasks a collection of tasks we want to ask container for - */ - private synchronized void submitTasks(Collection tasks) { - for (TaskRecord r : tasks) { - Resource resource = Records.newRecord(Resource.class); - resource.setMemory(numMemoryMB); - resource.setVirtualCores(numVCores); - Priority priority = Records.newRecord(Priority.class); - priority.setPriority(this.appPriority); - r.containerRequest = new ContainerRequest(resource, null, null, priority); - rmClient.addContainerRequest(r.containerRequest); - pendingTasks.add(r); - } - } - /** - * launch the task on container - * @param container container to run the task - * @param task the task - */ - private void launchTask(Container container, TaskRecord task) { - task.container = container; - task.containerRequest = null; - ContainerLaunchContext ctx = - Records.newRecord(ContainerLaunchContext.class); - String cmd = command + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" - + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"; + * called to start the application + */ + private void run(String args[]) throws Exception { + this.initArgs(args); + // list of tasks that waits to be submit + java.util.Collection tasks = new java.util.LinkedList(); + // add waiting tasks + for (int i = 0; i < this.numTasks; ++i) { + tasks.add(new TaskRecord(i)); + } + this.rmClient = AMRMClientAsync.createAMRMClientAsync(1000, + new RMCallbackHandler()); + this.nmClient = NMClientAsync + .createNMClientAsync(new NMCallbackHandler()); + this.rmClient.init(conf); + this.rmClient.start(); + this.nmClient.init(conf); + this.nmClient.start(); + RegisterApplicationMasterResponse response = this.rmClient + .registerApplicationMaster(this.appHostName, + this.appTrackerPort, this.appTrackerUrl); + Resource maxResource = response.getMaximumResourceCapability(); + if (maxResource.getMemory() < this.numMemoryMB) { + LOG.warn("[Rabit] memory requested exceed bound " + + maxResource.getMemory()); + this.numMemoryMB = maxResource.getMemory(); + } + if (maxResource.getVirtualCores() < this.numVCores) { + LOG.warn("[Rabit] memory requested exceed bound " + + maxResource.getVirtualCores()); + this.numVCores = maxResource.getVirtualCores(); + } + this.submitTasks(tasks); + LOG.info("[Rabit] ApplicationMaster started"); + while (!this.doneAllJobs()) { + try { + Thread.sleep(100); + ; + } catch (InterruptedException e) { + } + } + assert (killedTasks.size() + finishedTasks.size() == numTasks); + boolean success = finishedTasks.size() == numTasks; + LOG.info("Application completed. Stopping running containers"); + nmClient.stop(); + String diagnostics = "Diagnostics." + ", num_tasks" + this.numTasks + + ", finished=" + this.finishedTasks.size() + ", failed=" + + this.killedTasks.size() + "\n" + this.abortDiagnosis; + rmClient.unregisterApplicationMaster( + success ? FinalApplicationStatus.SUCCEEDED + : FinalApplicationStatus.FAILED, diagnostics, + appTrackerUrl); + } + + /** + * check if the job finishes + * + * @return whether we finished all the jobs + */ + private synchronized boolean doneAllJobs() { + return pendingTasks.size() == 0 && runningTasks.size() == 0; + } + + /** + * submit tasks to request containers for the tasks + * + * @param tasks + * a collection of tasks we want to ask container for + */ + private synchronized void submitTasks(Collection tasks) { + for (TaskRecord r : tasks) { + Resource resource = Records.newRecord(Resource.class); + resource.setMemory(numMemoryMB); + resource.setVirtualCores(numVCores); + Priority priority = Records.newRecord(Priority.class); + priority.setPriority(this.appPriority); + r.containerRequest = new ContainerRequest(resource, null, null, + priority); + rmClient.addContainerRequest(r.containerRequest); + pendingTasks.add(r); + } + } + + /** + * launch the task on container + * + * @param container + * container to run the task + * @param task + * the task + */ + private void launchTask(Container container, TaskRecord task) { + task.container = container; + task.containerRequest = null; + ContainerLaunchContext ctx = Records + .newRecord(ContainerLaunchContext.class); + String cmd = command + " 1>" + + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + "/stderr"; ctx.setCommands(Collections.singletonList(cmd)); // setup environment variables Map env = new java.util.HashMap(); // setup class path - StringBuilder cpath = new StringBuilder("${CLASSPATH}:./*"); - for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, - YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { - cpath.append(':'); - cpath.append(c.trim()); - } - env.put("CLASSPATH", cpath.toString()); - // setup LD_LIBARY_pATH path for libhdfs - env.put("LD_LIBRARY_PATH", "${LD_LIBRARY_PATH}:$HADOOP_HDFS_HOME/lib/native"); + StringBuilder cpath = new StringBuilder("${CLASSPATH}:./*"); + for (String c : conf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + cpath.append(':'); + cpath.append(c.trim()); + } + env.put("CLASSPATH", cpath.toString()); + // setup LD_LIBARY_pATH path for libhdfs + env.put("LD_LIBRARY_PATH", + "${LD_LIBRARY_PATH}:$HADOOP_HDFS_HOME/lib/native:$JAVA_HOME/jre/lib/amd64/server"); // inherit all rabit variables for (Map.Entry e : System.getenv().entrySet()) { - if (e.getKey().startsWith("rabit_")) { - env.put(e.getKey(), e.getValue()); + if (e.getKey().startsWith("rabit_")) { + env.put(e.getKey(), e.getValue()); } } env.put("rabit_task_id", String.valueOf(task.taskId)); env.put("rabit_num_trial", String.valueOf(task.attemptCounter)); ctx.setEnvironment(env); synchronized (this) { - assert (!this.runningTasks.containsKey(container.getId())); - this.runningTasks.put(container.getId(), task); - this.nmClient.startContainerAsync(container, ctx); + assert (!this.runningTasks.containsKey(container.getId())); + this.runningTasks.put(container.getId(), task); + this.nmClient.startContainerAsync(container, ctx); } - } - /** - * free the containers that have not yet been launched - * @param containers - */ - private synchronized void freeUnusedContainers(Collection containers) { - } - /** - * handle method for AMRMClientAsync.CallbackHandler container allocation - * @param containers - */ - private synchronized void onContainersAllocated(List containers) { - if (this.startAbort) { - this.freeUnusedContainers(containers); - return; - } - Collection freelist = new java.util.LinkedList(); - for (Container c : containers) { - TaskRecord task; - task = pendingTasks.poll(); - if (task == null) { - freelist.add(c); continue; - } - this.launchTask(c, task); - } - this.freeUnusedContainers(freelist); - } - /** - * start aborting the job - * @param msg the fatal message - */ - private synchronized void abortJob(String msg) { - if (!this.startAbort) this.abortDiagnosis = msg; - this.startAbort = true; - for (TaskRecord r : this.runningTasks.values()) { - if (!r.abortRequested) { - nmClient.stopContainerAsync(r.container.getId(), r.container.getNodeId()); - r.abortRequested = true; - } - } - this.killedTasks.addAll(this.pendingTasks); - for (TaskRecord r : this.pendingTasks) { - rmClient.removeContainerRequest(r.containerRequest); - } - this.pendingTasks.clear(); - LOG.info(msg); - } - /** - * handle non fatal failures - * @param cid - */ - private synchronized void handleFailure(Collection failed) { - Collection tasks = new java.util.LinkedList(); - for (ContainerId cid : failed) { - TaskRecord r = runningTasks.remove(cid); - if (r == null) continue; - r.attemptCounter += 1; - r.container = null; - tasks.add(r); - if (r.attemptCounter >= this.maxNumAttempt) { - this.abortJob("[Rabit] Task " + r.taskId + " failed more than " + r.attemptCounter + "times"); - } - } - if (this.startAbort) { - this.killedTasks.addAll(tasks); - } else { - this.submitTasks(tasks); - } - } - /** - * handle method for AMRMClientAsync.CallbackHandler container allocation - * @param status list of status - */ - private synchronized void onContainersCompleted(List status) { - Collection failed = new java.util.LinkedList(); - for (ContainerStatus s : status) { - assert (s.getState().equals(ContainerState.COMPLETE)); - int exstatus = s.getExitStatus(); - TaskRecord r = runningTasks.get(s.getContainerId()); - if (r == null) continue; - if (exstatus == ContainerExitStatus.SUCCESS) { - finishedTasks.add(r); - runningTasks.remove(s.getContainerId()); - } else { - switch (exstatus) { - case ContainerExitStatus.KILLED_EXCEEDED_PMEM: - this.abortJob("[Rabit] Task " + r.taskId + " killed because of exceeding allocated physical memory"); - break; - case ContainerExitStatus.KILLED_EXCEEDED_VMEM: - this.abortJob("[Rabit] Task " + r.taskId + " killed because of exceeding allocated virtual memory"); - break; - default: - LOG.info("[Rabit] Task " + r.taskId + " exited with status " + exstatus); - failed.add(s.getContainerId()); - } - } - } - this.handleFailure(failed); - } - /** - * callback handler for resource manager - */ - private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { - @Override - public float getProgress() { - return 1.0f - (float)(pendingTasks.size()) / numTasks; - } - @Override - public void onContainersAllocated(List containers) { - ApplicationMaster.this.onContainersAllocated(containers); - } - @Override - public void onContainersCompleted(List status) { - ApplicationMaster.this.onContainersCompleted(status); - } - @Override - public void onError(Throwable ex) { - ApplicationMaster.this.abortJob("[Rabit] Resource manager Error " + ex.toString()); - } - @Override - public void onNodesUpdated(List nodereport) { - } - @Override - public void onShutdownRequest() { - ApplicationMaster.this.abortJob("[Rabit] Get shutdown request, start to shutdown..."); - } - } - private class NMCallbackHandler implements NMClientAsync.CallbackHandler { - @Override - public void onContainerStarted(ContainerId cid, Map services) { - LOG.debug("onContainerStarted Invoked"); - } - @Override - public void onContainerStatusReceived(ContainerId cid, ContainerStatus status) { - LOG.debug("onContainerStatusReceived Invoked"); - } - @Override - public void onContainerStopped(ContainerId cid) { - LOG.debug("onContainerStopped Invoked"); - } - @Override - public void onGetContainerStatusError(ContainerId cid, Throwable ex) { - LOG.debug("onGetContainerStatusError Invoked: " + ex.toString()); - ApplicationMaster.this.handleFailure(Collections.singletonList(cid)); - } - @Override - public void onStartContainerError(ContainerId cid, Throwable ex) { - LOG.debug("onStartContainerError Invoked: " + ex.toString()); - ApplicationMaster.this.handleFailure(Collections.singletonList(cid)); - } - @Override - public void onStopContainerError(ContainerId cid, Throwable ex) { - LOG.info("onStopContainerError Invoked: " + ex.toString()); - } - } + } + + /** + * free the containers that have not yet been launched + * + * @param containers + */ + private synchronized void freeUnusedContainers( + Collection containers) { + } + + /** + * handle method for AMRMClientAsync.CallbackHandler container allocation + * + * @param containers + */ + private synchronized void onContainersAllocated(List containers) { + if (this.startAbort) { + this.freeUnusedContainers(containers); + return; + } + Collection freelist = new java.util.LinkedList(); + for (Container c : containers) { + TaskRecord task; + task = pendingTasks.poll(); + if (task == null) { + freelist.add(c); + continue; + } + this.launchTask(c, task); + } + this.freeUnusedContainers(freelist); + } + + /** + * start aborting the job + * + * @param msg + * the fatal message + */ + private synchronized void abortJob(String msg) { + if (!this.startAbort) + this.abortDiagnosis = msg; + this.startAbort = true; + for (TaskRecord r : this.runningTasks.values()) { + if (!r.abortRequested) { + nmClient.stopContainerAsync(r.container.getId(), + r.container.getNodeId()); + r.abortRequested = true; + } + } + this.killedTasks.addAll(this.pendingTasks); + for (TaskRecord r : this.pendingTasks) { + rmClient.removeContainerRequest(r.containerRequest); + } + this.pendingTasks.clear(); + LOG.info(msg); + } + + /** + * handle non fatal failures + * + * @param cid + */ + private synchronized void handleFailure(Collection failed) { + Collection tasks = new java.util.LinkedList(); + for (ContainerId cid : failed) { + TaskRecord r = runningTasks.remove(cid); + if (r == null) + continue; + r.attemptCounter += 1; + r.container = null; + tasks.add(r); + if (r.attemptCounter >= this.maxNumAttempt) { + this.abortJob("[Rabit] Task " + r.taskId + " failed more than " + + r.attemptCounter + "times"); + } + } + if (this.startAbort) { + this.killedTasks.addAll(tasks); + } else { + this.submitTasks(tasks); + } + } + + /** + * handle method for AMRMClientAsync.CallbackHandler container allocation + * + * @param status + * list of status + */ + private synchronized void onContainersCompleted(List status) { + Collection failed = new java.util.LinkedList(); + for (ContainerStatus s : status) { + assert (s.getState().equals(ContainerState.COMPLETE)); + int exstatus = s.getExitStatus(); + TaskRecord r = runningTasks.get(s.getContainerId()); + if (r == null) + continue; + if (exstatus == ContainerExitStatus.SUCCESS) { + finishedTasks.add(r); + runningTasks.remove(s.getContainerId()); + } else { + switch (exstatus) { + case ContainerExitStatus.KILLED_EXCEEDED_PMEM: + this.abortJob("[Rabit] Task " + + r.taskId + + " killed because of exceeding allocated physical memory"); + break; + case ContainerExitStatus.KILLED_EXCEEDED_VMEM: + this.abortJob("[Rabit] Task " + + r.taskId + + " killed because of exceeding allocated virtual memory"); + break; + default: + LOG.info("[Rabit] Task " + r.taskId + + " exited with status " + exstatus); + failed.add(s.getContainerId()); + } + } + } + this.handleFailure(failed); + } + + /** + * callback handler for resource manager + */ + private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { + @Override + public float getProgress() { + return 1.0f - (float) (pendingTasks.size()) / numTasks; + } + + @Override + public void onContainersAllocated(List containers) { + ApplicationMaster.this.onContainersAllocated(containers); + } + + @Override + public void onContainersCompleted(List status) { + ApplicationMaster.this.onContainersCompleted(status); + } + + @Override + public void onError(Throwable ex) { + ApplicationMaster.this.abortJob("[Rabit] Resource manager Error " + + ex.toString()); + } + + @Override + public void onNodesUpdated(List nodereport) { + } + + @Override + public void onShutdownRequest() { + ApplicationMaster.this + .abortJob("[Rabit] Get shutdown request, start to shutdown..."); + } + } + + private class NMCallbackHandler implements NMClientAsync.CallbackHandler { + @Override + public void onContainerStarted(ContainerId cid, + Map services) { + LOG.debug("onContainerStarted Invoked"); + } + + @Override + public void onContainerStatusReceived(ContainerId cid, + ContainerStatus status) { + LOG.debug("onContainerStatusReceived Invoked"); + } + + @Override + public void onContainerStopped(ContainerId cid) { + LOG.debug("onContainerStopped Invoked"); + } + + @Override + public void onGetContainerStatusError(ContainerId cid, Throwable ex) { + LOG.debug("onGetContainerStatusError Invoked: " + ex.toString()); + ApplicationMaster.this + .handleFailure(Collections.singletonList(cid)); + } + + @Override + public void onStartContainerError(ContainerId cid, Throwable ex) { + LOG.debug("onStartContainerError Invoked: " + ex.toString()); + ApplicationMaster.this + .handleFailure(Collections.singletonList(cid)); + } + + @Override + public void onStopContainerError(ContainerId cid, Throwable ex) { + LOG.info("onStopContainerError Invoked: " + ex.toString()); + } + } } diff --git a/yarn/src/org/apache/hadoop/yarn/rabit/Client.java b/yarn/src/org/apache/hadoop/yarn/rabit/Client.java index 745f84559..2352d1638 100644 --- a/yarn/src/org/apache/hadoop/yarn/rabit/Client.java +++ b/yarn/src/org/apache/hadoop/yarn/rabit/Client.java @@ -26,149 +26,156 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; public class Client { - // logger - private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); - // configuration - private YarnConfiguration conf = new YarnConfiguration(); - // cached maps - private Map cacheFiles = new java.util.HashMap(); - // args to pass to application master - private String appArgs; - /** - * get the local resource setting - * @param fmaps the file maps - * @return the resource map - * @throws IOException - */ - private Map getLocalResource() throws IOException { - Map rmap = new java.util.HashMap(); - for (Map.Entry e : cacheFiles.entrySet()){ - LocalResource r = Records.newRecord(LocalResource.class); - Path path = e.getValue(); - FileStatus status = FileSystem.get(conf).getFileStatus(path); - r.setResource(ConverterUtils.getYarnUrlFromPath(path)); - r.setSize(status.getLen()); - r.setTimestamp(status.getModificationTime()); - r.setType(LocalResourceType.FILE); - r.setVisibility(LocalResourceVisibility.PUBLIC); - rmap.put(e.getKey(), r); - } - return rmap; - } - /** - * get the environment variables for container - * @return the env variable for child class - */ - private Map getEnvironment() { - // Setup environment variables - Map env = new java.util.HashMap(); - String cpath = "${CLASSPATH}:./*"; - for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, - YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { - cpath += ':'; - cpath += c.trim(); - } - env.put("CLASSPATH", cpath); + // logger + private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); + // configuration + private YarnConfiguration conf = new YarnConfiguration(); + // cached maps + private Map cacheFiles = new java.util.HashMap(); + // args to pass to application master + private String appArgs; + + /** + * get the local resource setting + * + * @param fmaps + * the file maps + * @return the resource map + * @throws IOException + */ + private Map getLocalResource() throws IOException { + Map rmap = new java.util.HashMap(); + for (Map.Entry e : cacheFiles.entrySet()) { + LocalResource r = Records.newRecord(LocalResource.class); + Path path = e.getValue(); + FileStatus status = FileSystem.get(conf).getFileStatus(path); + r.setResource(ConverterUtils.getYarnUrlFromPath(path)); + r.setSize(status.getLen()); + r.setTimestamp(status.getModificationTime()); + r.setType(LocalResourceType.FILE); + r.setVisibility(LocalResourceVisibility.PUBLIC); + rmap.put(e.getKey(), r); + } + return rmap; + } + + /** + * get the environment variables for container + * + * @return the env variable for child class + */ + private Map getEnvironment() { + // Setup environment variables + Map env = new java.util.HashMap(); + String cpath = "${CLASSPATH}:./*"; + for (String c : conf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + cpath += ':'; + cpath += c.trim(); + } + env.put("CLASSPATH", cpath); for (Map.Entry e : System.getenv().entrySet()) { - if (e.getKey().startsWith("rabit_")) { - env.put(e.getKey(), e.getValue()); + if (e.getKey().startsWith("rabit_")) { + env.put(e.getKey(), e.getValue()); } } return env; - } - /** - * initialize the settings - * @param args - */ - private void initArgs(String[] args) { - // directly pass all args except args0 - StringBuilder sargs = new StringBuilder(""); - for (int i = 0; i < args.length; ++i) { - if (args[i] == "-file") { - String[] arr = args[i + 1].split("#"); - Path path = new Path(arr[0]); - if (arr.length == 1) { - cacheFiles.put(path.getName(), path); - } else { - cacheFiles.put(arr[1], path); - } - ++ i; - } else { - sargs.append(" "); - sargs.append(args[i]); - } - } - this.appArgs = sargs.toString(); - } - private void run(String[] args) throws Exception { - if (args.length == 0) { - System.out.println("Usage: [options] [commands..]"); - System.out.println("options: [-file filename]"); - return; - } - this.initArgs(args); - // Create yarnClient - YarnConfiguration conf = new YarnConfiguration(); - YarnClient yarnClient = YarnClient.createYarnClient(); - yarnClient.init(conf); - yarnClient.start(); - - // Create application via yarnClient - YarnClientApplication app = yarnClient.createApplication(); + } - // Set up the container launch context for the application master - ContainerLaunchContext amContainer = - Records.newRecord(ContainerLaunchContext.class); - amContainer.setCommands( - Collections.singletonList( - "$JAVA_HOME/bin/java" + - " -Xmx256M" + - " org.apache.hadoop.yarn.rabit.ApplicationMaster" + this.appArgs + - " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + - " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" - ) - ); - - // setup cache files - amContainer.setLocalResources(this.getLocalResource()); - amContainer.setEnvironment(this.getEnvironment()); - - // Set up resource type requirements for ApplicationMaster - Resource capability = Records.newRecord(Resource.class); - capability.setMemory(256); - capability.setVirtualCores(1); + /** + * initialize the settings + * + * @param args + */ + private void initArgs(String[] args) { + // directly pass all args except args0 + StringBuilder sargs = new StringBuilder(""); + for (int i = 0; i < args.length; ++i) { + if (args[i] == "-file") { + String[] arr = args[i + 1].split("#"); + Path path = new Path(arr[0]); + if (arr.length == 1) { + cacheFiles.put(path.getName(), path); + } else { + cacheFiles.put(arr[1], path); + } + ++i; + } else { + sargs.append(" "); + sargs.append(args[i]); + } + } + this.appArgs = sargs.toString(); + } - - ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); - appContext.setApplicationName("Rabit-YARN"); - appContext.setAMContainerSpec(amContainer); - appContext.setResource(capability); - appContext.setQueue("default"); - - // Submit application - ApplicationId appId = appContext.getApplicationId(); - LOG.info("Submitting application " + appId); - yarnClient.submitApplication(appContext); - - ApplicationReport appReport = yarnClient.getApplicationReport(appId); - YarnApplicationState appState = appReport.getYarnApplicationState(); - while (appState != YarnApplicationState.FINISHED && - appState != YarnApplicationState.KILLED && - appState != YarnApplicationState.FAILED) { - Thread.sleep(100); - appReport = yarnClient.getApplicationReport(appId); - appState = appReport.getYarnApplicationState(); - } - - System.out.println( - "Application " + appId + " finished with" + - " state " + appState + - " at " + appReport.getFinishTime()); - if (!appReport.getFinalApplicationStatus().equals(FinalApplicationStatus.SUCCEEDED)) { - System.err.println(appReport.getDiagnostics()); - } - } - public static void main(String[] args) throws Exception { - new Client().run(args); - } + private void run(String[] args) throws Exception { + if (args.length == 0) { + System.out.println("Usage: [options] [commands..]"); + System.out.println("options: [-file filename]"); + return; + } + this.initArgs(args); + // Create yarnClient + YarnConfiguration conf = new YarnConfiguration(); + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + + // Create application via yarnClient + YarnClientApplication app = yarnClient.createApplication(); + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records + .newRecord(ContainerLaunchContext.class); + amContainer.setCommands(Collections.singletonList("$JAVA_HOME/bin/java" + + " -Xmx256M" + + " org.apache.hadoop.yarn.rabit.ApplicationMaster" + + this.appArgs + " 1>" + + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + "/stderr")); + + // setup cache files + amContainer.setLocalResources(this.getLocalResource()); + amContainer.setEnvironment(this.getEnvironment()); + + // Set up resource type requirements for ApplicationMaster + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(256); + capability.setVirtualCores(1); + + ApplicationSubmissionContext appContext = app + .getApplicationSubmissionContext(); + appContext.setApplicationName("Rabit-YARN"); + appContext.setAMContainerSpec(amContainer); + appContext.setResource(capability); + appContext.setQueue("default"); + + // Submit application + ApplicationId appId = appContext.getApplicationId(); + LOG.info("Submitting application " + appId); + yarnClient.submitApplication(appContext); + + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + YarnApplicationState appState = appReport.getYarnApplicationState(); + while (appState != YarnApplicationState.FINISHED + && appState != YarnApplicationState.KILLED + && appState != YarnApplicationState.FAILED) { + Thread.sleep(100); + appReport = yarnClient.getApplicationReport(appId); + appState = appReport.getYarnApplicationState(); + } + + System.out.println("Application " + appId + " finished with" + + " state " + appState + " at " + appReport.getFinishTime()); + if (!appReport.getFinalApplicationStatus().equals( + FinalApplicationStatus.SUCCEEDED)) { + System.err.println(appReport.getDiagnostics()); + } + } + + public static void main(String[] args) throws Exception { + new Client().run(args); + } }