complete yarn

parent 4f28e32ebd
commit 2c1cfd8be6
@@ -2,8 +2,10 @@ Linear and Logistic Regression
====
* input format: LibSVM
* Local Example: [run-linear.sh](run-linear.sh)
* Running on Hadoop: [run-hadoop.sh](run-hadoop.sh)
  - Set the input data to stdin, and model_out=stdout
* Running on YARN: [run-yarn.sh](run-yarn.sh)
  - You will need a working YARN cluster
  - Modify ```../make/config.mk``` to set USE_HDFS=1
  - Run build.sh in [../../yarn](../../yarn) to build the yarn jar file; an example submission is sketched below
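A minimal sketch of such a submission, assuming the jar has been built and that `/user/<you>/rabit-linear` is a writable HDFS directory (the path and any trailing `param=val` options are placeholders):

```bash
# build the rabit-yarn ApplicationMaster jar once
cd ../../yarn && ./build.sh && cd -

# train with 4 workers; data and the output model are staged under the given HDFS path
./run-yarn.sh 4 /user/<you>/rabit-linear
```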
Parameters
===
@@ -54,7 +54,9 @@ class LinearObjFunction : public solver::IObjFunction<float> {
    }
    if (task == "train") {
      lbfgs.Run();
      this->SaveModel(model_out.c_str(), lbfgs.GetWeight());
      if (rabit::GetRank() == 0) {
        this->SaveModel(model_out.c_str(), lbfgs.GetWeight());
      }
    } else if (task == "pred") {
      this->TaskPred();
    } else {
rabit-learn/linear/run-yarn.sh (new executable file, 19 lines)
@@ -0,0 +1,19 @@
#!/bin/bash
if [ "$#" -lt 2 ];
then
    echo "Usage: <nworkers> <path_in_HDFS> [param=val]"
    exit -1
fi

# put the local training file to HDFS
hadoop fs -rm -r -f $2/data
hadoop fs -rm -r -f $2/mushroom.linear.model
hadoop fs -mkdir $2/data
hadoop fs -put ../data/agaricus.txt.train $2/data

# submit to YARN
../../tracker/rabit_yarn.py -n $1 --vcores 1 linear.rabit hdfs://$2/data/agaricus.txt.train model_out=hdfs://$2/mushroom.linear.model "${*:3}"

# get the final model file
hadoop fs -get $2/mushroom.linear.model ./linear.model

# run prediction locally with the trained model
./linear.rabit ../data/agaricus.txt.test task=pred model_in=linear.model
@@ -15,7 +15,7 @@ export CXX = g++
export MPICXX = mpicxx

# whether to use HDFS support during compilation
USE_HDFS = 0
USE_HDFS = 1

# path to libjvm.so
LIBJVM=$(JAVA_HOME)/jre/lib/amd64/server
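With USE_HDFS=1 the binaries link against libhdfs, which needs libjvm and the Hadoop class path at run time. The YARN launcher below sets this up inside each container; for a quick local run of an HDFS-enabled binary, something along these lines is usually required (the exact paths are assumptions and should match LIBJVM above and your Hadoop install):

```bash
# make libjvm.so and the Hadoop native libraries visible to libhdfs (paths are assumptions)
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:${JAVA_HOME}/jre/lib/amd64/server:${HADOOP_HDFS_HOME}/lib/native
# libhdfs also needs the Hadoop jars on the Java class path
export CLASSPATH=${CLASSPATH}:$(hadoop classpath --glob)
```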
tracker/rabit_yarn.py (new executable file, 122 lines)
@@ -0,0 +1,122 @@
#!/usr/bin/python
"""
This is a script to submit rabit jobs to a YARN cluster.
It launches the rabit workers as containers through the rabit-yarn ApplicationMaster.
"""
import argparse
import sys
import os
import time
import subprocess
import warnings
import rabit_tracker as tracker

WRAPPER_PATH = os.path.dirname(__file__) + '/../wrapper'
YARN_JAR_PATH = os.path.dirname(__file__) + '/../yarn/rabit-yarn.jar'

assert os.path.exists(YARN_JAR_PATH), ("cannot find \"%s\", please run build.sh in the yarn folder" % YARN_JAR_PATH)
hadoop_binary = 'hadoop'
# locate the hadoop binary
hadoop_home = os.getenv('HADOOP_HOME')

if hadoop_home != None:
    if hadoop_binary == None:
        hadoop_binary = hadoop_home + '/bin/hadoop'
        assert os.path.exists(hadoop_binary), "HADOOP_HOME does not contain the hadoop binary"

parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs to Yarn.')
parser.add_argument('-n', '--nworker', required=True, type=int,
                    help = 'number of worker processes to be launched')
parser.add_argument('-hip', '--host_ip', default='auto', type=str,
                    help = 'host IP address of the submission machine; specify it if it cannot be guessed automatically')
parser.add_argument('-v', '--verbose', default=0, choices=[0, 1], type=int,
                    help = 'print more messages to the console')
parser.add_argument('-ac', '--auto_file_cache', default=1, choices=[0, 1], type=int,
                    help = 'whether to automatically cache the files appearing in the command; this is on by default')
parser.add_argument('-f', '--files', default = [], action='append',
                    help = 'the cached file list of the job;'\
                    ' the submission script will automatically cache all the files which appear in the command.'\
                    ' This also causes the file names in the command to be rewritten to the current path,'\
                    ' for example `../../kmeans ../kmeans.conf` will be rewritten to `./kmeans kmeans.conf`'\
                    ' because the two files are cached to the running folder.'\
                    ' You may need this option to cache additional files.'\
                    ' You can also use it to manually cache files when auto_file_cache is off')
parser.add_argument('--jobname', default='auto', help = 'customize jobname in tracker')
parser.add_argument('--tempdir', default='/tmp', help = 'temporary directory in HDFS that can be used to store intermediate results')
parser.add_argument('--vcores', default = 1, type=int,
                    help = 'number of vcores to request in each container, set it if each rabit job is multi-threaded')
parser.add_argument('-mem', '--memory_mb', default=1024, type=int,
                    help = 'maximum memory used by the process. Guide: set it large (near mapred.cluster.max.map.memory.mb)'\
                    ' if you are running multi-threaded rabit,'\
                    ' so that each node can occupy all the mapper slots in a machine for maximum performance')
parser.add_argument('command', nargs='+',
                    help = 'command for rabit program')

if hadoop_binary == None:
    parser.add_argument('-hb', '--hadoop_binary', required = True,
                        help="path to hadoop binary file")
else:
    parser.add_argument('-hb', '--hadoop_binary', default = hadoop_binary,
                        help="path to hadoop binary file")

args = parser.parse_args()

if args.jobname == 'auto':
    args.jobname = ('Rabit[nworker=%d]:' % args.nworker) + args.command[0].split('/')[-1]

# detect hadoop version
(out, err) = subprocess.Popen('%s version' % args.hadoop_binary, shell = True, stdout=subprocess.PIPE).communicate()
out = out.split('\n')[0].split()
assert out[0] == 'Hadoop', 'cannot parse hadoop version string'
hadoop_version = out[1].split('.')

(classpath, err) = subprocess.Popen('%s classpath --glob' % args.hadoop_binary, shell = True, stdout=subprocess.PIPE).communicate()

if int(hadoop_version[0]) < 2:
    print 'Current Hadoop Version is %s, rabit_yarn requires Yarn (Hadoop 2.0 or later)' % out[1]
def submit_yarn(nworker, worker_args, worker_env):
    fset = set([YARN_JAR_PATH])
    if args.auto_file_cache != 0:
        for i in range(len(args.command)):
            f = args.command[i]
            if os.path.exists(f):
                fset.add(f)
                if i == 0:
                    args.command[i] = './' + args.command[i].split('/')[-1]
                else:
                    args.command[i] = args.command[i].split('/')[-1]
        if args.command[0].endswith('.py'):
            flst = [WRAPPER_PATH + '/rabit.py',
                    WRAPPER_PATH + '/librabit_wrapper.so',
                    WRAPPER_PATH + '/librabit_wrapper_mock.so']
            for f in flst:
                if os.path.exists(f):
                    fset.add(f)

    cmd = 'java -cp `%s classpath`:%s org.apache.hadoop.yarn.rabit.Client ' % (args.hadoop_binary, YARN_JAR_PATH)
    env = os.environ.copy()
    for k, v in worker_env.items():
        env[k] = str(v)
    env['rabit_cpu_vcores'] = str(args.vcores)
    env['rabit_memory_mb'] = str(args.memory_mb)
    env['rabit_world_size'] = str(args.nworker)

    if args.files != None:
        for flst in args.files:
            for f in flst.split('#'):
                fset.add(f)
    for f in fset:
        cmd += ' -file %s' % f
    cmd += ' -jobname %s ' % args.jobname
    cmd += ' -tempdir %s ' % args.tempdir
    cmd += (' '.join(args.command + worker_args))
    print cmd
    subprocess.check_call(cmd, shell = True, env = env)

tracker.submit(args.nworker, [], fun_submit = submit_yarn, verbose = args.verbose, hostIP = args.host_ip)
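For orientation, the command assembled by submit_yarn and echoed by `print cmd` looks roughly like the sketch below (the file paths, worker count, and HDFS locations are illustrative, not the exact output):

```bash
java -cp `hadoop classpath`:../yarn/rabit-yarn.jar org.apache.hadoop.yarn.rabit.Client \
  -file ../yarn/rabit-yarn.jar -file /path/to/linear.rabit \
  -jobname 'Rabit[nworker=4]:linear.rabit' -tempdir /tmp \
  ./linear.rabit hdfs://namenode/user/you/data/agaricus.txt.train model_out=hdfs://namenode/user/you/mushroom.linear.model
```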
yarn/.gitignore (new file, 4 lines)
@@ -0,0 +1,4 @@
bin
.classpath
.project
*.jar
ApplicationMaster.java
@@ -1,15 +1,19 @@
package org.apache.hadoop.yarn.rabit;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Collection;
import java.util.Collections;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -19,6 +23,9 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -38,6 +45,8 @@ public class ApplicationMaster {
    private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
    // configuration
    private Configuration conf = new YarnConfiguration();
    // hdfs handler
    private FileSystem dfs;

    // number of cores allocated for each task
    private int numVCores = 1;
@@ -48,7 +57,7 @@ public class ApplicationMaster {
    // total number of tasks
    private int numTasks = 1;
    // maximum number of attempts to try in each task
    private int maxNumAttempt = 10;
    private int maxNumAttempt = 3;
    // command to launch
    private String command = "";
@@ -61,6 +70,8 @@ public class ApplicationMaster {

    // whether we start to abort the application, due to whatever fatal reasons
    private boolean startAbort = false;
    // worker resources
    private Map<String, LocalResource> workerResources = new java.util.HashMap<String, LocalResource>();
    // record the aborting reason
    private String abortDiagnosis = "";
    // resource manager
@@ -81,6 +92,10 @@ public class ApplicationMaster {
        new ApplicationMaster().run(args);
    }

    private ApplicationMaster() throws IOException {
        dfs = FileSystem.get(conf);
    }

    /**
     * get integer argument from environment variable
     *
@@ -92,11 +107,16 @@ public class ApplicationMaster {
     *            default value
     * @return the requested result
     */
    private int getEnvInteger(String name, boolean required, int defv) {
    private int getEnvInteger(String name, boolean required, int defv)
            throws IOException {
        String value = System.getenv(name);
        if (value == null) {
            if (required)
                LOG.fatal("environment variable " + name + "not set");
            if (required) {
                throw new IOException("environment variable " + name
                        + " not set");
            } else {
                return defv;
            }
        }
        return Integer.valueOf(value);
    }
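The ApplicationMaster reads all of its numeric configuration through getEnvInteger, so the contract between the tracker and the AM is a handful of environment variables. A sketch of what the tracker exports (the values are illustrative; rabit_max_attempt is optional and falls back to the default in the code):

```bash
# environment the rabit_yarn.py tracker passes through to the ApplicationMaster (example values)
export rabit_world_size=4     # required: total number of worker tasks
export rabit_cpu_vcores=1     # required: vcores per container
export rabit_memory_mb=1024   # required: memory per container in MB
export rabit_max_attempt=3    # optional: retries per task before aborting
```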
@@ -106,14 +126,37 @@ public class ApplicationMaster {
     *
     * @param args
     */
    private void initArgs(String args[]) {
        for (String c : args) {
            this.command += c + " ";
    private void initArgs(String args[]) throws IOException {
        LOG.info("Invoke initArgs");
        // cached maps
        Map<String, Path> cacheFiles = new java.util.HashMap<String, Path>();
        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("-file")) {
                String[] arr = args[++i].split("#");
                Path path = new Path(arr[0]);
                if (arr.length == 1) {
                    cacheFiles.put(path.getName(), path);
                } else {
                    cacheFiles.put(arr[1], path);
                }
            } else {
                this.command += args[i] + " ";
            }
        }
        for (Map.Entry<String, Path> e : cacheFiles.entrySet()) {
            LocalResource r = Records.newRecord(LocalResource.class);
            FileStatus status = dfs.getFileStatus(e.getValue());
            r.setResource(ConverterUtils.getYarnUrlFromPath(e.getValue()));
            r.setSize(status.getLen());
            r.setTimestamp(status.getModificationTime());
            r.setType(LocalResourceType.FILE);
            r.setVisibility(LocalResourceVisibility.APPLICATION);
            workerResources.put(e.getKey(), r);
        }
        numVCores = this.getEnvInteger("rabit_cpu_vcores", true, numVCores);
        numMemoryMB = this.getEnvInteger("rabit_memory_mb", true, numMemoryMB);
        maxNumAttempt = this.getEnvInteger("rabit_max_attempt", false,
                maxNumAttempt);
        numTasks = this.getEnvInteger("rabit_world_size", true, numTasks);
        maxNumAttempt = this.getEnvInteger("rabit_max_attempt", false, maxNumAttempt);
    }

    /**
@@ -121,12 +164,6 @@ public class ApplicationMaster {
     */
    private void run(String args[]) throws Exception {
        this.initArgs(args);
        // list of tasks that waits to be submit
        java.util.Collection<TaskRecord> tasks = new java.util.LinkedList<TaskRecord>();
        // add waiting tasks
        for (int i = 0; i < this.numTasks; ++i) {
            tasks.add(new TaskRecord(i));
        }
        this.rmClient = AMRMClientAsync.createAMRMClientAsync(1000,
                new RMCallbackHandler());
        this.nmClient = NMClientAsync
@@ -138,37 +175,52 @@ public class ApplicationMaster {
        RegisterApplicationMasterResponse response = this.rmClient
                .registerApplicationMaster(this.appHostName,
                        this.appTrackerPort, this.appTrackerUrl);
        Resource maxResource = response.getMaximumResourceCapability();
        if (maxResource.getMemory() < this.numMemoryMB) {
            LOG.warn("[Rabit] memory requested exceed bound "
                    + maxResource.getMemory());
            this.numMemoryMB = maxResource.getMemory();
        }
        if (maxResource.getVirtualCores() < this.numVCores) {
            LOG.warn("[Rabit] memory requested exceed bound "
                    + maxResource.getVirtualCores());
            this.numVCores = maxResource.getVirtualCores();
        }
        this.submitTasks(tasks);
        LOG.info("[Rabit] ApplicationMaster started");
        while (!this.doneAllJobs()) {
            try {
                Thread.sleep(100);
                ;
            } catch (InterruptedException e) {

        boolean success = false;
        String diagnostics = "";
        try {
            // list of tasks waiting to be submitted
            java.util.Collection<TaskRecord> tasks = new java.util.LinkedList<TaskRecord>();
            // add waiting tasks
            for (int i = 0; i < this.numTasks; ++i) {
                tasks.add(new TaskRecord(i));
            }
            Resource maxResource = response.getMaximumResourceCapability();

            if (maxResource.getMemory() < this.numMemoryMB) {
                LOG.warn("[Rabit] memory requested exceeds bound "
                        + maxResource.getMemory());
                this.numMemoryMB = maxResource.getMemory();
            }
            if (maxResource.getVirtualCores() < this.numVCores) {
                LOG.warn("[Rabit] vcores requested exceeds bound "
                        + maxResource.getVirtualCores());
                this.numVCores = maxResource.getVirtualCores();
            }
            this.submitTasks(tasks);
            LOG.info("[Rabit] ApplicationMaster started");
            while (!this.doneAllJobs()) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                }
            }
            assert (killedTasks.size() + finishedTasks.size() == numTasks);
            success = finishedTasks.size() == numTasks;
            LOG.info("Application completed. Stopping running containers");
            nmClient.stop();
            diagnostics = "Diagnostics." + ", num_tasks" + this.numTasks
                    + ", finished=" + this.finishedTasks.size() + ", failed="
                    + this.killedTasks.size() + "\n" + this.abortDiagnosis;
            LOG.info(diagnostics);
        } catch (Exception e) {
            diagnostics = e.toString();
        }
        assert (killedTasks.size() + finishedTasks.size() == numTasks);
        boolean success = finishedTasks.size() == numTasks;
        LOG.info("Application completed. Stopping running containers");
        nmClient.stop();
        String diagnostics = "Diagnostics." + ", num_tasks" + this.numTasks
                + ", finished=" + this.finishedTasks.size() + ", failed="
                + this.killedTasks.size() + "\n" + this.abortDiagnosis;
        rmClient.unregisterApplicationMaster(
                success ? FinalApplicationStatus.SUCCEEDED
                        : FinalApplicationStatus.FAILED, diagnostics,
                appTrackerUrl);
        if (!success) throw new Exception("Application not successful");
    }

    /**
@@ -213,14 +265,21 @@ public class ApplicationMaster {
        task.containerRequest = null;
        ContainerLaunchContext ctx = Records
                .newRecord(ContainerLaunchContext.class);
        String cmd = command + " 1>"
        String cmd =
                // use this to setup CLASSPATH correctly for libhdfs
                "CLASSPATH=${CLASSPATH}:`${HADOOP_PREFIX}/bin/hadoop classpath --glob` "
                + this.command + " 1>"
                + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
                + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
                + "/stderr";
        LOG.info(cmd);
        ctx.setCommands(Collections.singletonList(cmd));
        LOG.info(workerResources);
        ctx.setLocalResources(this.workerResources);
        // setup environment variables
        Map<String, String> env = new java.util.HashMap<String, String>();
        // setup class path

        // setup class path, this is kind of duplicated, ignoring
        StringBuilder cpath = new StringBuilder("${CLASSPATH}:./*");
        for (String c : conf.getStrings(
                YarnConfiguration.YARN_APPLICATION_CLASSPATH,
@@ -228,10 +287,12 @@ public class ApplicationMaster {
            cpath.append(':');
            cpath.append(c.trim());
        }
        env.put("CLASSPATH", cpath.toString());
        // setup LD_LIBARY_pATH path for libhdfs
        // the worker already uses the hadoop command to get the class path; maybe a better solution in the future
        // env.put("CLASSPATH", cpath.toString());
        // setup LD_LIBRARY_PATH for libhdfs
        env.put("LD_LIBRARY_PATH",
                "${LD_LIBRARY_PATH}:$HADOOP_HDFS_HOME/lib/native:$JAVA_HOME/jre/lib/amd64/server");
        env.put("PYTHONPATH", "${PYTHONPATH}:.");
        // inherit all rabit variables
        for (Map.Entry<String, String> e : System.getenv().entrySet()) {
            if (e.getKey().startsWith("rabit_")) {
@@ -240,6 +301,7 @@ public class ApplicationMaster {
            }
        }
        env.put("rabit_task_id", String.valueOf(task.taskId));
        env.put("rabit_num_trial", String.valueOf(task.attemptCounter));

        ctx.setEnvironment(env);
        synchronized (this) {
            assert (!this.runningTasks.containsKey(container.getId()));
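Putting the pieces together, the launch context above means each worker container ends up executing a shell command of roughly this shape, with stdout and stderr redirected into the YARN container log directory (the command and log path below are illustrative):

```bash
CLASSPATH=${CLASSPATH}:`${HADOOP_PREFIX}/bin/hadoop classpath --glob` \
  ./linear.rabit hdfs://namenode/user/you/data/agaricus.txt.train model_out=hdfs://... \
  1><LOG_DIR>/stdout 2><LOG_DIR>/stderr
```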
Client.java
@@ -1,13 +1,14 @@
package org.apache.hadoop.yarn.rabit;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -27,35 +28,82 @@ import org.apache.hadoop.yarn.util.Records;

public class Client {
    // logger
    private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
    private static final Log LOG = LogFactory.getLog(Client.class);
    // permission for temp file
    private static final FsPermission permTemp = new FsPermission("777");
    // configuration
    private YarnConfiguration conf = new YarnConfiguration();
    // hdfs handler
    private FileSystem dfs;
    // cached maps
    private Map<String, Path> cacheFiles = new java.util.HashMap<String, Path>();
    private Map<String, String> cacheFiles = new java.util.HashMap<String, String>();
    // environment variable to setup cachefiles
    private String cacheFileArg = "";
    // args to pass to application master
    private String appArgs;

    private String appArgs = "";
    // HDFS path to store temporary results
    private String tempdir = "/tmp";
    // job name
    private String jobName = "";
    /**
     * get the local resource setting
     * constructor
     * @throws IOException
     */
    private Client() throws IOException {
        dfs = FileSystem.get(conf);
    }

    /**
     * set up the cache files as local resources
     *
     * @param fmaps
     *            the file maps
     * @return the resource map
     * @throws IOException
     */
    private Map<String, LocalResource> getLocalResource() throws IOException {
    private Map<String, LocalResource> setupCacheFiles(ApplicationId appId) throws IOException {
        // create temporary rabit directory
        Path tmpPath = new Path(this.tempdir);
        if (!dfs.exists(tmpPath)) {
            dfs.mkdirs(tmpPath, permTemp);
            LOG.info("HDFS temp directory does not exist, creating.. " + tmpPath);
        }
        tmpPath = new Path(tmpPath + "/temp-rabit-yarn-" + appId);
        if (dfs.exists(tmpPath)) {
            dfs.delete(tmpPath, true);
        }
        // create temporary directory
        FileSystem.mkdirs(dfs, tmpPath, permTemp);

        StringBuilder cstr = new StringBuilder();
        Map<String, LocalResource> rmap = new java.util.HashMap<String, LocalResource>();
        for (Map.Entry<String, Path> e : cacheFiles.entrySet()) {
        for (Map.Entry<String, String> e : cacheFiles.entrySet()) {
            LocalResource r = Records.newRecord(LocalResource.class);
            Path path = e.getValue();
            FileStatus status = FileSystem.get(conf).getFileStatus(path);
            Path path = new Path(e.getValue());
            // copy local data to a temporary folder in HDFS
            if (!e.getValue().startsWith("hdfs://")) {
                Path dst = new Path("hdfs://" + tmpPath + "/" + path.getName());
                dfs.copyFromLocalFile(false, true, path, dst);
                dfs.setPermission(dst, permTemp);
                dfs.deleteOnExit(dst);
                path = dst;
            }
            FileStatus status = dfs.getFileStatus(path);
            r.setResource(ConverterUtils.getYarnUrlFromPath(path));
            r.setSize(status.getLen());
            r.setTimestamp(status.getModificationTime());
            r.setType(LocalResourceType.FILE);
            r.setVisibility(LocalResourceVisibility.PUBLIC);
            r.setVisibility(LocalResourceVisibility.APPLICATION);
            rmap.put(e.getKey(), r);
            cstr.append(" -file \"");
            cstr.append(path.toString());
            cstr.append('#');
            cstr.append(e.getKey());
            cstr.append("\"");
        }

        dfs.deleteOnExit(tmpPath);
        this.cacheFileArg = cstr.toString();
        return rmap;
    }
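In effect, setupCacheFiles stages every non-HDFS file into a per-application temporary directory and records it under its cache name, so after submission the staged artifacts can be inspected at a location of roughly this shape (the application id format is the usual YARN one; the exact path depends on -tempdir):

```bash
# list the staged cache files for a submitted application (path is illustrative)
hadoop fs -ls /tmp/temp-rabit-yarn-application_1420000000000_0001
# expect entries such as linear.rabit, rabit-yarn.jar and any other -file arguments
```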
@@ -80,6 +128,7 @@ public class Client {
                env.put(e.getKey(), e.getValue());
            }
        }
        LOG.debug(env);
        return env;
    }

@@ -89,18 +138,20 @@ public class Client {
     * @param args
     */
    private void initArgs(String[] args) {
        // directly pass all args except args0
        // directly pass all arguments except args[0]
        StringBuilder sargs = new StringBuilder("");
        for (int i = 0; i < args.length; ++i) {
            if (args[i] == "-file") {
                String[] arr = args[i + 1].split("#");
                Path path = new Path(arr[0]);
            if (args[i].equals("-file")) {
                String[] arr = args[++i].split("#");
                if (arr.length == 1) {
                    cacheFiles.put(path.getName(), path);
                    cacheFiles.put(new Path(arr[0]).getName(), arr[0]);
                } else {
                    cacheFiles.put(arr[1], path);
                    cacheFiles.put(arr[1], arr[0]);
                }
                ++i;
            } else if (args[i].equals("-jobname")) {
                this.jobName = args[++i];
            } else if (args[i].equals("-tempdir")) {
                this.tempdir = args[++i];
            } else {
                sargs.append(" ");
                sargs.append(args[i]);
@@ -128,33 +179,34 @@ public class Client {
        // Set up the container launch context for the application master
        ContainerLaunchContext amContainer = Records
                .newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(Collections.singletonList("$JAVA_HOME/bin/java"
        ApplicationSubmissionContext appContext = app
                .getApplicationSubmissionContext();
        // Submit application
        ApplicationId appId = appContext.getApplicationId();
        // setup cache-files and environment variables
        amContainer.setLocalResources(this.setupCacheFiles(appId));
        amContainer.setEnvironment(this.getEnvironment());
        String cmd = "$JAVA_HOME/bin/java"
                + " -Xmx256M"
                + " org.apache.hadoop.yarn.rabit.ApplicationMaster"
                + this.appArgs + " 1>"
                + this.cacheFileArg + ' ' + this.appArgs + " 1>"
                + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
                + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
                + "/stderr"));

        // setup cache files
        amContainer.setLocalResources(this.getLocalResource());
        amContainer.setEnvironment(this.getEnvironment());
                + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
        LOG.debug(cmd);
        amContainer.setCommands(Collections.singletonList(cmd));

        // Set up resource type requirements for ApplicationMaster
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(256);
        capability.setVirtualCores(1);
        LOG.info("jobname=" + this.jobName);

        ApplicationSubmissionContext appContext = app
                .getApplicationSubmissionContext();
        appContext.setApplicationName("Rabit-YARN");
        appContext.setApplicationName(jobName + ":RABIT-YARN");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);
        appContext.setQueue("default");

        // Submit application
        ApplicationId appId = appContext.getApplicationId();
        LOG.info("Submitting application " + appId);

        LOG.info("Submitting application " + appId);
        yarnClient.submitApplication(appContext);

        ApplicationReport appReport = yarnClient.getApplicationReport(appId);
TaskRecord.java
@@ -1,22 +1,24 @@
package org.apache.hadoop.yarn.rabit;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

/**
 * data structure to hold the task information
 */
public class TaskRecord {
    // task id of the task
    public int taskId = 0;
    // number of failed attempts to run the task
    public int attemptCounter = 0;
    // container request, can be null if task is already running
    public ContainerRequest containerRequest = null;
    // running container, can be null if the task is not launched
    public Container container = null;
    // whether we have requested abortion of this task
    public boolean abortRequested = false;

    public TaskRecord(int taskId) {
        this.taskId = taskId;
    }
    // task id of the task
    public int taskId = 0;
    // number of failed attempts to run the task
    public int attemptCounter = 0;
    // container request, can be null if task is already running
    public ContainerRequest containerRequest = null;
    // running container, can be null if the task is not launched
    public Container container = null;
    // whether we have requested abortion of this task
    public boolean abortRequested = false;

    public TaskRecord(int taskId) {
        this.taskId = taskId;
    }
}