change formatter

tqchen 2015-03-08 12:29:07 -07:00
parent 2fbda812bc
commit 4f28e32ebd
5 changed files with 543 additions and 461 deletions

View File

@@ -6,15 +6,15 @@ then
fi
# put the local training file to HDFS
#hadoop fs -rm -r -f $2/data
hadoop fs -rm -r -f $2/data
hadoop fs -rm -r -f $2/mushroom.linear.model
#hadoop fs -mkdir $2/data
#hadoop fs -put ../data/agaricus.txt.train $2/data
hadoop fs -mkdir $2/data
hadoop fs -put ../data/agaricus.txt.train $2/data
# submit to hadoop
../../tracker/rabit_hadoop_streaming.py -n $1 --vcores 1 -i $2/data/agaricus.txt.train -o $2/mushroom.linear.model linear.rabit stdin model_out=stdout "${*:3}"
# get the final model file
#hadoop fs -get $2/mushroom.linear.model/part-00000 ./linear.model
hadoop fs -get $2/mushroom.linear.model/part-00000 ./linear.model
#./linear.rabit ../data/agaricus.txt.test task=pred model_in=linear.model
./linear.rabit ../data/agaricus.txt.test task=pred model_in=linear.model
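Note: judging from the positional parameters, the script takes the number of workers as $1 and an HDFS working directory as $2, with everything after the first two arguments forwarded to the rabit program via "${*:3}"; a typical (hypothetical) invocation would look like ./run.sh 4 /user/alice/rabit-demo max_iter=100.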

View File

@@ -15,7 +15,7 @@ export CXX = g++
export MPICXX = mpicxx
# whether use HDFS support during compile
USE_HDFS = 1
USE_HDFS = 0
# path to libjvm.so
LIBJVM=$(JAVA_HOME)/jre/lib/amd64/server
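Note: with USE_HDFS = 0 the build presumably drops the libhdfs/libjvm dependency; if HDFS support is switched back on, LIBJVM must point at the directory containing libjvm.so for the local JDK (the jre/lib/amd64/server path above matches 64-bit OpenJDK/Oracle layouts).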

View File

@@ -34,13 +34,11 @@ if hadoop_binary == None or hadoop_streaming_jar == None:
', or modify rabit_hadoop.py line 16', stacklevel = 2)
parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using Hadoop Streaming.'\
'This script support both Hadoop 1.0 and Yarn(MRv2), Yarn is recommended')
' It is highly recommended to use rabit_yarn.py instead')
parser.add_argument('-n', '--nworker', required=True, type=int,
help = 'number of worker processes to be launched')
parser.add_argument('-hip', '--host_ip', default='auto', type=str,
help = 'host IP address; if it cannot be guessed automatically, specify the IP of the submission machine')
parser.add_argument('-nt', '--nthread', default = -1, type=int,
help = 'number of thread in each mapper to be launched, set it if each rabit job is multi-threaded')
parser.add_argument('-i', '--input', required=True,
help = 'input path in HDFS')
parser.add_argument('-o', '--output', required=True,
@@ -61,6 +59,8 @@ parser.add_argument('--jobname', default='auto', help = 'customize jobname in tr
parser.add_argument('--timeout', default=600000000, type=int,
help = 'timeout (in milliseconds) of each mapper job, automatically set to a very long time; '\
'normally you do not need to set this')
parser.add_argument('--vcores', default = -1, type=int,
help = 'number of vcores to request in each mapper, set it if each rabit job is multi-threaded')
parser.add_argument('-mem', '--memory_mb', default=-1, type=int,
help = 'maximum memory used by the process. Guide: set it large (near mapred.cluster.max.map.memory.mb) '\
'if you are running multi-threaded rabit,'\
@@ -95,6 +95,8 @@ use_yarn = int(hadoop_version[0]) >= 2
print 'Current Hadoop Version is %s' % out[1]
def hadoop_streaming(nworker, worker_args, worker_envs, use_yarn):
worker_envs['CLASSPATH'] = '`$HADOOP_HOME/bin/hadoop classpath --glob` '
worker_envs['LD_LIBRARY_PATH'] = '{LD_LIBRARY_PATH}:$HADOOP_HDFS_HOME/lib/native:$JAVA_HOME/jre/lib/amd64/server'
fset = set()
if args.auto_file_cache:
for i in range(len(args.command)):
@@ -132,12 +134,12 @@ def hadoop_streaming(nworker, worker_args, worker_envs, use_yarn):
cmd += ' -D%s=%s' % (kmap['jobname'], args.jobname)
envstr = ','.join('%s=%s' % (k, str(v)) for k, v in worker_envs.items())
cmd += ' -D%s=\"%s\"' % (kmap['env'], envstr)
if args.nthread != -1:
if args.vcores != -1:
if kmap['nthread'] is None:
warnings.warn('vcores can only be set in Yarn (Hadoop version 2.0 or greater); '\
'it is recommended to use Yarn to submit rabit jobs', stacklevel = 2)
else:
cmd += ' -D%s=%d' % (kmap['nthread'], args.nthread)
cmd += ' -D%s=%d' % (kmap['nthread'], args.vcores)
cmd += ' -D%s=%d' % (kmap['timeout'], args.timeout)
if args.memory_mb != -1:
# assumes kmap maps 'memory_mb' to the cluster's memory property
cmd += ' -D%s=%d' % (kmap['memory_mb'], args.memory_mb)
@@ -153,5 +155,5 @@ def hadoop_streaming(nworker, worker_args, worker_envs, use_yarn):
print cmd
subprocess.check_call(cmd, shell = True)
fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, int(hadoop_version[0]) >= 2)
fun_submit = lambda nworker, worker_args, worker_envs: hadoop_streaming(nworker, worker_args, worker_envs, int(hadoop_version[0]) >= 2)
tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose, hostIP = args.host_ip)
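Note: the callback passed to tracker.submit follows the signature fun_submit(nworker, worker_args, worker_envs); any custom launcher plugged in here must accept and forward the environment map the way hadoop_streaming does above.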

View File

@@ -80,22 +80,30 @@ public class ApplicationMaster {
public static void main(String[] args) throws Exception {
new ApplicationMaster().run(args);
}
/**
* get integer argument from environment variable
* @param name name of key
* @param required whether this is required
* @param defv default value
*
* @param name
* name of key
* @param required
* whether this is required
* @param defv
* default value
* @return the requested result
*/
private int getEnvInteger(String name, boolean required, int defv) {
String value = System.getenv(name);
if (value == null) {
if (required) LOG.fatal("environment variable " + name + "not set");
if (required)
LOG.fatal("environment variable " + name + "not set");
}
return Integer.valueOf(value);
}
/**
* initialize from arguments and command lines
*
* @param args
*/
private void initArgs(String args[]) {
@@ -104,8 +112,10 @@ public class ApplicationMaster {
}
numVCores = this.getEnvInteger("rabit_cpu_vcores", true, numVCores);
numMemoryMB = this.getEnvInteger("rabit_memory_mb", true, numMemoryMB);
maxNumAttempt = this.getEnvInteger("rabit_max_attempt", false, maxNumAttempt);
maxNumAttempt = this.getEnvInteger("rabit_max_attempt", false,
maxNumAttempt);
}
/**
* called to start the application
*/
@@ -117,28 +127,34 @@ public class ApplicationMaster {
for (int i = 0; i < this.numTasks; ++i) {
tasks.add(new TaskRecord(i));
}
this.rmClient = AMRMClientAsync.createAMRMClientAsync(1000, new RMCallbackHandler());
this.nmClient = NMClientAsync.createNMClientAsync(new NMCallbackHandler());
this.rmClient = AMRMClientAsync.createAMRMClientAsync(1000,
new RMCallbackHandler());
this.nmClient = NMClientAsync
.createNMClientAsync(new NMCallbackHandler());
this.rmClient.init(conf);
this.rmClient.start();
this.nmClient.init(conf);
this.nmClient.start();
RegisterApplicationMasterResponse response =
this.rmClient.registerApplicationMaster(this.appHostName, this.appTrackerPort, this.appTrackerUrl);
RegisterApplicationMasterResponse response = this.rmClient
.registerApplicationMaster(this.appHostName,
this.appTrackerPort, this.appTrackerUrl);
Resource maxResource = response.getMaximumResourceCapability();
if (maxResource.getMemory() < this.numMemoryMB) {
LOG.warn("[Rabit] memory requested exceed bound " + maxResource.getMemory());
LOG.warn("[Rabit] memory requested exceed bound "
+ maxResource.getMemory());
this.numMemoryMB = maxResource.getMemory();
}
if (maxResource.getVirtualCores() < this.numVCores) {
LOG.warn("[Rabit] memory requested exceed bound " + maxResource.getVirtualCores());
LOG.warn("[Rabit] memory requested exceed bound "
+ maxResource.getVirtualCores());
this.numVCores = maxResource.getVirtualCores();
}
this.submitTasks(tasks);
LOG.info("[Rabit] ApplicationMaster started");
while (!this.doneAllJobs()) {
try {
Thread.sleep(100);;
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
@@ -149,20 +165,26 @@ public class ApplicationMaster {
String diagnostics = "Diagnostics: num_tasks=" + this.numTasks
+ ", finished=" + this.finishedTasks.size() + ", failed="
+ this.killedTasks.size() + "\n" + this.abortDiagnosis;
rmClient.unregisterApplicationMaster
(success ? FinalApplicationStatus.SUCCEEDED : FinalApplicationStatus.FAILED,
diagnostics, appTrackerUrl);
rmClient.unregisterApplicationMaster(
success ? FinalApplicationStatus.SUCCEEDED
: FinalApplicationStatus.FAILED, diagnostics,
appTrackerUrl);
}
/**
* check if the job finishes
*
* @return whether we finished all the jobs
*/
private synchronized boolean doneAllJobs() {
return pendingTasks.size() == 0 && runningTasks.size() == 0;
}
/**
* submit tasks to request containers for the tasks
* @param tasks a collection of tasks we want to ask container for
*
* @param tasks
* a collection of tasks we want to ask container for
*/
private synchronized void submitTasks(Collection<TaskRecord> tasks) {
for (TaskRecord r : tasks) {
@@ -171,36 +193,45 @@ public class ApplicationMaster {
resource.setVirtualCores(numVCores);
Priority priority = Records.newRecord(Priority.class);
priority.setPriority(this.appPriority);
r.containerRequest = new ContainerRequest(resource, null, null, priority);
r.containerRequest = new ContainerRequest(resource, null, null,
priority);
rmClient.addContainerRequest(r.containerRequest);
pendingTasks.add(r);
}
}
/**
* launch the task on container
* @param container container to run the task
* @param task the task
*
* @param container
* container to run the task
* @param task
* the task
*/
private void launchTask(Container container, TaskRecord task) {
task.container = container;
task.containerRequest = null;
ContainerLaunchContext ctx =
Records.newRecord(ContainerLaunchContext.class);
String cmd = command + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
ContainerLaunchContext ctx = Records
.newRecord(ContainerLaunchContext.class);
String cmd = command + " 1>"
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ "/stderr";
ctx.setCommands(Collections.singletonList(cmd));
// setup environment variables
Map<String, String> env = new java.util.HashMap<String, String>();
// setup class path
StringBuilder cpath = new StringBuilder("${CLASSPATH}:./*");
for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
for (String c : conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
cpath.append(':');
cpath.append(c.trim());
}
env.put("CLASSPATH", cpath.toString());
// setup LD_LIBRARY_PATH for libhdfs
env.put("LD_LIBRARY_PATH", "${LD_LIBRARY_PATH}:$HADOOP_HDFS_HOME/lib/native");
env.put("LD_LIBRARY_PATH",
"${LD_LIBRARY_PATH}:$HADOOP_HDFS_HOME/lib/native:$JAVA_HOME/jre/lib/amd64/server");
// inherit all rabit variables
for (Map.Entry<String, String> e : System.getenv().entrySet()) {
if (e.getKey().startsWith("rabit_")) {
@@ -216,14 +247,19 @@ public class ApplicationMaster {
this.nmClient.startContainerAsync(container, ctx);
}
}
/**
* free the containers that have not yet been launched
*
* @param containers
*/
private synchronized void freeUnusedContainers(Collection<Container> containers) {
private synchronized void freeUnusedContainers(
Collection<Container> containers) {
// currently a no-op: allocated but unused containers are not released here yet
}
/**
* handler method for AMRMClientAsync.CallbackHandler: container allocation
*
* @param containers
*/
private synchronized void onContainersAllocated(List<Container> containers) {
@@ -236,22 +272,28 @@ public class ApplicationMaster {
TaskRecord task;
task = pendingTasks.poll();
if (task == null) {
freelist.add(c); continue;
freelist.add(c);
continue;
}
this.launchTask(c, task);
}
this.freeUnusedContainers(freelist);
}
/**
* start aborting the job
* @param msg the fatal message
*
* @param msg
* the fatal message
*/
private synchronized void abortJob(String msg) {
if (!this.startAbort) this.abortDiagnosis = msg;
if (!this.startAbort)
this.abortDiagnosis = msg;
this.startAbort = true;
for (TaskRecord r : this.runningTasks.values()) {
if (!r.abortRequested) {
nmClient.stopContainerAsync(r.container.getId(), r.container.getNodeId());
nmClient.stopContainerAsync(r.container.getId(),
r.container.getNodeId());
r.abortRequested = true;
}
}
@@ -262,20 +304,24 @@ public class ApplicationMaster {
this.pendingTasks.clear();
LOG.info(msg);
}
/**
* handle non-fatal failures
*
* @param failed
*/
private synchronized void handleFailure(Collection<ContainerId> failed) {
Collection<TaskRecord> tasks = new java.util.LinkedList<TaskRecord>();
for (ContainerId cid : failed) {
TaskRecord r = runningTasks.remove(cid);
if (r == null) continue;
if (r == null)
continue;
r.attemptCounter += 1;
r.container = null;
tasks.add(r);
if (r.attemptCounter >= this.maxNumAttempt) {
this.abortJob("[Rabit] Task " + r.taskId + " failed more than " + r.attemptCounter + "times");
this.abortJob("[Rabit] Task " + r.taskId + " failed more than "
+ r.attemptCounter + " times");
}
}
if (this.startAbort) {
@@ -284,9 +330,12 @@ public class ApplicationMaster {
this.submitTasks(tasks);
}
}
/**
* handler method for AMRMClientAsync.CallbackHandler: container completion
* @param status list of status
*
* @param status
* list of status
*/
private synchronized void onContainersCompleted(List<ContainerStatus> status) {
Collection<ContainerId> failed = new java.util.LinkedList<ContainerId>();
@@ -294,26 +343,33 @@ public class ApplicationMaster {
assert (s.getState().equals(ContainerState.COMPLETE));
int exstatus = s.getExitStatus();
TaskRecord r = runningTasks.get(s.getContainerId());
if (r == null) continue;
if (r == null)
continue;
if (exstatus == ContainerExitStatus.SUCCESS) {
finishedTasks.add(r);
runningTasks.remove(s.getContainerId());
} else {
switch (exstatus) {
case ContainerExitStatus.KILLED_EXCEEDED_PMEM:
this.abortJob("[Rabit] Task " + r.taskId + " killed because of exceeding allocated physical memory");
this.abortJob("[Rabit] Task "
+ r.taskId
+ " killed because of exceeding allocated physical memory");
break;
case ContainerExitStatus.KILLED_EXCEEDED_VMEM:
this.abortJob("[Rabit] Task " + r.taskId + " killed because of exceeding allocated virtual memory");
this.abortJob("[Rabit] Task "
+ r.taskId
+ " killed because of exceeding allocated virtual memory");
break;
default:
LOG.info("[Rabit] Task " + r.taskId + " exited with status " + exstatus);
LOG.info("[Rabit] Task " + r.taskId
+ " exited with status " + exstatus);
failed.add(s.getContainerId());
}
}
}
this.handleFailure(failed);
}
/**
* callback handler for resource manager
*/
@@ -322,49 +378,66 @@ public class ApplicationMaster {
public float getProgress() {
return 1.0f - (float) (pendingTasks.size()) / numTasks;
}
@Override
public void onContainersAllocated(List<Container> containers) {
ApplicationMaster.this.onContainersAllocated(containers);
}
@Override
public void onContainersCompleted(List<ContainerStatus> status) {
ApplicationMaster.this.onContainersCompleted(status);
}
@Override
public void onError(Throwable ex) {
ApplicationMaster.this.abortJob("[Rabit] Resource manager Error " + ex.toString());
ApplicationMaster.this.abortJob("[Rabit] Resource manager Error "
+ ex.toString());
}
@Override
public void onNodesUpdated(List<NodeReport> nodereport) {
}
@Override
public void onShutdownRequest() {
ApplicationMaster.this.abortJob("[Rabit] Get shutdown request, start to shutdown...");
ApplicationMaster.this
.abortJob("[Rabit] Get shutdown request, start to shutdown...");
}
}
private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
@Override
public void onContainerStarted(ContainerId cid, Map<String, ByteBuffer> services) {
public void onContainerStarted(ContainerId cid,
Map<String, ByteBuffer> services) {
LOG.debug("onContainerStarted Invoked");
}
@Override
public void onContainerStatusReceived(ContainerId cid, ContainerStatus status) {
public void onContainerStatusReceived(ContainerId cid,
ContainerStatus status) {
LOG.debug("onContainerStatusReceived Invoked");
}
@Override
public void onContainerStopped(ContainerId cid) {
LOG.debug("onContainerStopped Invoked");
}
@Override
public void onGetContainerStatusError(ContainerId cid, Throwable ex) {
LOG.debug("onGetContainerStatusError Invoked: " + ex.toString());
ApplicationMaster.this.handleFailure(Collections.singletonList(cid));
ApplicationMaster.this
.handleFailure(Collections.singletonList(cid));
}
@Override
public void onStartContainerError(ContainerId cid, Throwable ex) {
LOG.debug("onStartContainerError Invoked: " + ex.toString());
ApplicationMaster.this.handleFailure(Collections.singletonList(cid));
ApplicationMaster.this
.handleFailure(Collections.singletonList(cid));
}
@Override
public void onStopContainerError(ContainerId cid, Throwable ex) {
LOG.info("onStopContainerError Invoked: " + ex.toString());

View File

@@ -34,9 +34,12 @@ public class Client {
private Map<String, Path> cacheFiles = new java.util.HashMap<String, Path>();
// args to pass to application master
private String appArgs;
/**
* get the local resource setting
* @param fmaps the file maps
*
* @param fmaps
* the file maps
* @return the resource map
* @throws IOException
*/
@@ -55,15 +58,18 @@ public class Client {
}
return rmap;
}
/**
* get the environment variables for container
*
* @return the env variable for child class
*/
private Map<String, String> getEnvironment() {
// Setup environment variables
Map<String, String> env = new java.util.HashMap<String, String>();
String cpath = "${CLASSPATH}:./*";
for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
for (String c : conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
cpath += ':';
cpath += c.trim();
@@ -76,8 +82,10 @@ public class Client {
}
return env;
}
/**
* initialize the settings
*
* @param args
*/
private void initArgs(String[] args) {
@@ -100,6 +108,7 @@ public class Client {
}
this.appArgs = sargs.toString();
}
private void run(String[] args) throws Exception {
if (args.length == 0) {
System.out.println("Usage: [options] [commands..]");
@@ -117,17 +126,15 @@ public class Client {
YarnClientApplication app = yarnClient.createApplication();
// Set up the container launch context for the application master
ContainerLaunchContext amContainer =
Records.newRecord(ContainerLaunchContext.class);
amContainer.setCommands(
Collections.singletonList(
"$JAVA_HOME/bin/java" +
" -Xmx256M" +
" org.apache.hadoop.yarn.rabit.ApplicationMaster" + this.appArgs +
" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
)
);
ContainerLaunchContext amContainer = Records
.newRecord(ContainerLaunchContext.class);
amContainer.setCommands(Collections.singletonList("$JAVA_HOME/bin/java"
+ " -Xmx256M"
+ " org.apache.hadoop.yarn.rabit.ApplicationMaster"
+ this.appArgs + " 1>"
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ "/stderr"));
// setup cache files
amContainer.setLocalResources(this.getLocalResource());
@@ -138,8 +145,8 @@ public class Client {
capability.setMemory(256);
capability.setVirtualCores(1);
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationSubmissionContext appContext = app
.getApplicationSubmissionContext();
appContext.setApplicationName("Rabit-YARN");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
@@ -152,22 +159,22 @@ public class Client {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
YarnApplicationState appState = appReport.getYarnApplicationState();
while (appState != YarnApplicationState.FINISHED &&
appState != YarnApplicationState.KILLED &&
appState != YarnApplicationState.FAILED) {
while (appState != YarnApplicationState.FINISHED
&& appState != YarnApplicationState.KILLED
&& appState != YarnApplicationState.FAILED) {
Thread.sleep(100);
appReport = yarnClient.getApplicationReport(appId);
appState = appReport.getYarnApplicationState();
}
System.out.println(
"Application " + appId + " finished with" +
" state " + appState +
" at " + appReport.getFinishTime());
if (!appReport.getFinalApplicationStatus().equals(FinalApplicationStatus.SUCCEEDED)) {
System.out.println("Application " + appId + " finished with"
+ " state " + appState + " at " + appReport.getFinishTime());
if (!appReport.getFinalApplicationStatus().equals(
FinalApplicationStatus.SUCCEEDED)) {
System.err.println(appReport.getDiagnostics());
}
}
public static void main(String[] args) throws Exception {
new Client().run(args);
}
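
Both launchTask in the ApplicationMaster and Client.run above assemble the same kind of container spec: a shell command with stdout/stderr redirected into YARN's per-container log directory, plus environment variables and localized files. A condensed sketch of that shared pattern (method name and env values are illustrative):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.util.Records;

public class LaunchSpec {
    public static ContainerLaunchContext build(String command) {
        ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
        // redirect the child's stdout/stderr into YARN's log dir for this container
        ctx.setCommands(Collections.singletonList(
                command + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
                        + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
        // environment the child sees; the ${...} and $... forms expand on the node
        Map<String, String> env = new HashMap<String, String>();
        env.put("LD_LIBRARY_PATH", "${LD_LIBRARY_PATH}:$HADOOP_HDFS_HOME/lib/native");
        ctx.setEnvironment(env);
        return ctx;
    }
}

Client.run then hands such a spec to the ApplicationSubmissionContext via setAMContainerSpec, while launchTask passes its spec to NMClientAsync.startContainerAsync.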