fix hadoop

tqchen 2015-03-12 20:57:49 -07:00
parent 6812f14886
commit 9b66e7edf2
5 changed files with 80 additions and 41 deletions

View File

@@ -97,15 +97,18 @@ class HDFSSplit : public LineSplitBase {
  public:
   explicit HDFSSplit(const char *uri, unsigned rank, unsigned nsplit) {
     fs_ = hdfsConnect("default", 0);
+    utils::Check(fs_ != NULL, "error when connecting to default HDFS");
     std::vector<std::string> paths;
     LineSplitBase::SplitNames(&paths, uri, "#");
     // get the files
     std::vector<size_t> fsize;
     for (size_t i = 0; i < paths.size(); ++i) {
       hdfsFileInfo *info = hdfsGetPathInfo(fs_, paths[i].c_str());
+      utils::Check(info != NULL, "path %s do not exist", paths[i].c_str());
       if (info->mKind == 'D') {
         int nentry;
         hdfsFileInfo *files = hdfsListDirectory(fs_, info->mName, &nentry);
+        utils::Check(files != NULL, "error when ListDirectory %s", info->mName);
         for (int i = 0; i < nentry; ++i) {
           if (files[i].mKind == 'F') {
             fsize.push_back(files[i].mSize);

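The three utils::Check calls added above make HDFSSplit fail with a readable message instead of dereferencing NULL when the connection, path lookup, or directory listing fails. For readers more familiar with the Java side of HDFS, a rough equivalent of the same validation through org.apache.hadoop.fs.FileSystem (illustration only, not part of this commit; the commit itself uses the libhdfs C API) looks like this:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustration of the checks HDFSSplit now performs, using the Java HDFS client.
    public class CheckHdfsPath {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path p = new Path(args[0]);
            if (!fs.exists(p)) {
                throw new IOException("path " + p + " does not exist");
            }
            FileStatus info = fs.getFileStatus(p);
            if (info.isDirectory()) {
                // collect regular files, as the C++ code collects mKind == 'F' entries
                for (FileStatus f : fs.listStatus(p)) {
                    if (f.isFile()) {
                        System.out.println(f.getPath() + " size=" + f.getLen());
                    }
                }
            } else {
                System.out.println(p + " size=" + info.getLen());
            }
        }
    }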
View File

@@ -6,9 +6,10 @@ then
 fi
 # put the local training file to HDFS
-hadoop fs -rm -r -f $2/data
+#hadoop fs -rm -r -f $2/data
 hadoop fs -rm -r -f $2/mushroom.linear.model
-hadoop fs -mkdir $2/data
+#hadoop fs -mkdir $2/data
+hadoop fs -put ../data/agaricus.txt.train $2/data
 # submit to hadoop
 ../../tracker/rabit_yarn.py -n $1 --vcores 1 linear.rabit hdfs://$2/data/agaricus.txt.train model_out=hdfs://$2/mushroom.linear.model "${*:3}"

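With the rm/mkdir lines commented out, the script no longer wipes and recreates the HDFS data directory on every run; it only removes the previous model and uploads the training file with hadoop fs -put. For reference, the same upload through the Java FileSystem API (a sketch; the script itself uses the CLI, and the destination is whatever you pass as $2):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch of what "hadoop fs -put ../data/agaricus.txt.train $2/data" does via the Java API.
    public class PutTrainingData {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path dst = new Path(args[0] + "/data");   // args[0] plays the role of $2
            fs.mkdirs(dst);                           // no-op if the directory already exists
            fs.copyFromLocalFile(new Path("../data/agaricus.txt.train"), dst);
        }
    }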
View File

@@ -21,7 +21,7 @@ if not os.path.exists(YARN_JAR_PATH):
     subprocess.check_call(cmd, shell = True, env = os.environ)
     assert os.path.exists(YARN_JAR_PATH), "failed to build rabit-yarn.jar, try it manually"
-hadoop_binary = 'hadoop'
+hadoop_binary = None
 # code
 hadoop_home = os.getenv('HADOOP_HOME')

View File

@@ -1,5 +1,6 @@
 package org.apache.hadoop.yarn.rabit;

+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -7,6 +8,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Collection;
 import java.util.Collections;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -161,7 +163,8 @@ public class ApplicationMaster {
         numVCores = this.getEnvInteger("rabit_cpu_vcores", true, numVCores);
         numMemoryMB = this.getEnvInteger("rabit_memory_mb", true, numMemoryMB);
         numTasks = this.getEnvInteger("rabit_world_size", true, numTasks);
-        maxNumAttempt = this.getEnvInteger("rabit_max_attempt", false, maxNumAttempt);
+        maxNumAttempt = this.getEnvInteger("rabit_max_attempt", false,
+                maxNumAttempt);
     }

     /**
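numVCores, numMemoryMB, numTasks and maxNumAttempt are all pulled from environment variables that the rabit_yarn.py tracker sets for the ApplicationMaster. getEnvInteger itself is not part of this diff; a plausible shape for such a helper, with the required/optional behaviour implied by the boolean flag at the call sites (an assumption, not the actual implementation), is:

    // Sketch only: the real getEnvInteger lives in ApplicationMaster and is not shown in this commit.
    public class EnvConfig {
        // 'require' mirrors the flag at the call sites: when true the variable must be
        // present in the environment; when false the supplied default is kept if missing.
        public static int getEnvInteger(String name, boolean require, int defv) {
            String value = System.getenv(name);
            if (value == null) {
                if (require) {
                    throw new RuntimeException("environment variable " + name + " is required");
                }
                return defv;
            }
            return Integer.parseInt(value.trim());
        }

        public static void main(String[] args) {
            int numTasks = getEnvInteger("rabit_world_size", true, 1);
            int maxNumAttempt = getEnvInteger("rabit_max_attempt", false, 3);
            System.out.println("world_size=" + numTasks + ", max_attempt=" + maxNumAttempt);
        }
    }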
@@ -180,7 +183,7 @@ public class ApplicationMaster {
         RegisterApplicationMasterResponse response = this.rmClient
                 .registerApplicationMaster(this.appHostName,
                         this.appTrackerPort, this.appTrackerUrl);
         boolean success = false;
         String diagnostics = "";
         try {
@@ -225,7 +228,8 @@ public class ApplicationMaster {
                 success ? FinalApplicationStatus.SUCCEEDED
                         : FinalApplicationStatus.FAILED, diagnostics,
                 appTrackerUrl);
-        if (!success) throw new Exception("Application not successful");
+        if (!success)
+            throw new Exception("Application not successful");
     }

     /**
@@ -270,17 +274,10 @@ public class ApplicationMaster {
             task.containerRequest = null;
             ContainerLaunchContext ctx = Records
                     .newRecord(ContainerLaunchContext.class);
-            String hadoop = "hadoop";
-            if (System.getenv("HADOOP_HOME") != null) {
-                hadoop = "${HADOOP_HOME}/bin/hadoop";
-            } else if (System.getenv("HADOOP_PREFIX") != null) {
-                hadoop = "${HADOOP_PREFIX}/bin/hadoop";
-            }
-            String cmd =
-                // use this to setup CLASSPATH correctly for libhdfs
-                "CLASSPATH=${CLASSPATH}:`" + hadoop + " classpath --glob` "
-                + this.command + " 1>"
+            String cmd =
+                // use this to setup CLASSPATH correctly for libhdfs
+                // "CLASSPATH=${CLASSPATH}:`" + hadoop + " classpath --glob` "
+                this.command + " 1>"
                 + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
                 + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
                 + "/stderr";
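The removed block prefixed the worker command with CLASSPATH=...:`hadoop classpath --glob`, which only works when a hadoop binary is resolvable inside the container; the classpath is now exported through the container environment instead (next hunk). For context, a minimal sketch of how such a launch command ends up on a ContainerLaunchContext, with stdout/stderr redirected into the YARN log directory (simplified; command here is a placeholder for the worker command line):

    import java.util.Collections;

    import org.apache.hadoop.yarn.api.ApplicationConstants;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.util.Records;

    // Simplified sketch: build a worker command whose stdout/stderr land in the container log dir.
    public class LaunchCommandSketch {
        public static ContainerLaunchContext build(String command) {
            ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
            String cmd = command
                    + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
                    + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
            ctx.setCommands(Collections.singletonList(cmd));
            return ctx;
        }
    }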
@@ -289,17 +286,45 @@ public class ApplicationMaster {
             ctx.setLocalResources(this.workerResources);
             // setup environment variables
             Map<String, String> env = new java.util.HashMap<String, String>();
             // setup class path, this is kind of duplicated, ignoring
             StringBuilder cpath = new StringBuilder("${CLASSPATH}:./*");
             for (String c : conf.getStrings(
                     YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                     YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
-                cpath.append(':');
-                cpath.append(c.trim());
+                String[] arrPath = c.split(":");
+                for (String ps : arrPath) {
+                    if (ps.endsWith("*.jar") || ps.endsWith("*")) {
+                        ps = ps.substring(0, ps.lastIndexOf('*'));
+                        String prefix = ps.substring(0, ps.lastIndexOf('/'));
+                        if (ps.startsWith("$")) {
+                            String[] arr = ps.split("/", 2);
+                            if (arr.length != 2) continue;
+                            try {
+                                ps = System.getenv(arr[0].substring(1)) + '/' + arr[1];
+                            } catch (Exception e) {
+                                continue;
+                            }
+                        }
+                        File dir = new File(ps);
+                        if (dir.isDirectory()) {
+                            for (File f : dir.listFiles()) {
+                                if (f.isFile() && f.getPath().endsWith(".jar")) {
+                                    cpath.append(":");
+                                    cpath.append(prefix + '/' + f.getName());
+                                }
+                            }
+                        }
+                    } else {
+                        cpath.append(':');
+                        cpath.append(ps.trim());
+                    }
+                }
             }
-            // already use hadoop command to get class path in worker, maybe a better solution in future
-            // env.put("CLASSPATH", cpath.toString());
+            // already use hadoop command to get class path in worker, maybe a
+            // better solution in future
+            env.put("CLASSPATH", cpath.toString());
+            //LOG.info("CLASSPATH =" + cpath.toString());
             // setup LD_LIBARY_PATH path for libhdfs
             env.put("LD_LIBRARY_PATH",
                     "${LD_LIBRARY_PATH}:$HADOOP_HDFS_HOME/lib/native:$JAVA_HOME/jre/lib/amd64/server");
@@ -312,7 +337,7 @@ public class ApplicationMaster {
             }
             env.put("rabit_task_id", String.valueOf(task.taskId));
             env.put("rabit_num_trial", String.valueOf(task.attemptCounter));
             ctx.setEnvironment(env);
             synchronized (this) {
                 assert (!this.runningTasks.containsKey(container.getId()));
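The wildcard expansion in the CLASSPATH hunk above exists because the classpath is now handed to the worker as a plain environment variable: libhdfs starts a JVM from that variable, and unlike the java launcher the embedded JVM does not expand entries such as $HADOOP_COMMON_HOME/share/hadoop/common/*. A standalone, simplified version of the same expansion (illustration; the in-place code in the hunk above is the authoritative one):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    // Simplified classpath wildcard expansion: resolve a leading $ENV_VAR, then replace a
    // trailing '*' with every .jar file found in that directory.
    public class ClasspathExpand {
        public static List<String> expand(String entry) {
            List<String> out = new ArrayList<String>();
            for (String ps : entry.split(":")) {
                if (ps.endsWith("*") || ps.endsWith("*.jar")) {
                    String dirPath = ps.substring(0, ps.lastIndexOf('*'));
                    if (dirPath.startsWith("$")) {
                        String[] arr = dirPath.split("/", 2);
                        String val = System.getenv(arr[0].substring(1));
                        if (val == null || arr.length != 2) continue;
                        dirPath = val + "/" + arr[1];
                    }
                    File[] files = new File(dirPath).listFiles();
                    if (files == null) continue;   // not a directory, or unreadable
                    for (File f : files) {
                        if (f.isFile() && f.getName().endsWith(".jar")) {
                            out.add(f.getPath());
                        }
                    }
                } else {
                    out.add(ps.trim());
                }
            }
            return out;
        }

        public static void main(String[] args) {
            // Example with a typical Hadoop 2.x layout; adjust to your installation.
            for (String jar : expand("$HADOOP_HOME/share/hadoop/common/*")) {
                System.out.println(jar);
            }
        }
    }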
@@ -390,9 +415,14 @@ public class ApplicationMaster {
                 if (r == null) {
                     continue;
                 }
-                LOG.info("Task " + r.taskId + "failed on " + r.container.getId() + ". See LOG at : " +
-                        String.format("http://%s/node/containerlogs/%s/" + userName,
-                                r.container.getNodeHttpAddress(), r.container.getId()));
+                LOG.info("Task "
+                        + r.taskId
+                        + "failed on "
+                        + r.container.getId()
+                        + ". See LOG at : "
+                        + String.format("http://%s/node/containerlogs/%s/"
+                                + userName, r.container.getNodeHttpAddress(),
+                                r.container.getId()));
                 r.attemptCounter += 1;
                 r.container = null;
                 tasks.add(r);
@@ -426,22 +456,26 @@ public class ApplicationMaster {
                     finishedTasks.add(r);
                     runningTasks.remove(s.getContainerId());
                 } else {
-                    switch (exstatus) {
-                    case ContainerExitStatus.KILLED_EXCEEDED_PMEM:
-                        this.abortJob("[Rabit] Task "
-                                + r.taskId
-                                + " killed because of exceeding allocated physical memory");
-                        break;
-                    case ContainerExitStatus.KILLED_EXCEEDED_VMEM:
-                        this.abortJob("[Rabit] Task "
-                                + r.taskId
-                                + " killed because of exceeding allocated virtual memory");
-                        break;
-                    default:
-                        LOG.info("[Rabit] Task " + r.taskId
-                                + " exited with status " + exstatus);
-                        failed.add(s.getContainerId());
-                    }
+                    try {
+                        if (exstatus == ContainerExitStatus.class.getField(
+                                "KILLED_EXCEEDED_PMEM").getInt(null)) {
+                            this.abortJob("[Rabit] Task "
+                                    + r.taskId
+                                    + " killed because of exceeding allocated physical memory");
+                            continue;
+                        }
+                        if (exstatus == ContainerExitStatus.class.getField(
+                                "KILLED_EXCEEDED_VMEM").getInt(null)) {
+                            this.abortJob("[Rabit] Task "
+                                    + r.taskId
+                                    + " killed because of exceeding allocated virtual memory");
+                            continue;
+                        }
+                    } catch (Exception e) {
+                    }
+                    LOG.info("[Rabit] Task " + r.taskId + " exited with status "
+                            + exstatus);
+                    failed.add(s.getContainerId());
                 }
             }
             this.handleFailure(failed);
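Replacing the switch on ContainerExitStatus.KILLED_EXCEEDED_PMEM / KILLED_EXCEEDED_VMEM with a reflective getField lookup keeps the ApplicationMaster compiling and running against Hadoop releases where those constants are absent; if the field is missing, the exception is swallowed and the task falls through to the generic "exited with status" handling. The pattern in isolation (a sketch with a generic helper name, not code from the commit):

    // Read a public static int constant by name, falling back when the field does not
    // exist in the Hadoop version on the classpath.
    public class ExitStatusLookup {
        public static int constantOrDefault(Class<?> cls, String name, int defv) {
            try {
                return cls.getField(name).getInt(null);
            } catch (Exception e) {
                return defv;   // field absent (or not accessible) in this version
            }
        }

        public static void main(String[] args) throws Exception {
            Class<?> cls = Class.forName("org.apache.hadoop.yarn.api.records.ContainerExitStatus");
            System.out.println(constantOrDefault(cls, "KILLED_EXCEEDED_PMEM", Integer.MIN_VALUE));
        }
    }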

View File

@@ -50,6 +50,7 @@ public class Client {
      * @throws IOException
      */
     private Client() throws IOException {
+        conf.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/core-site.xml"));
         dfs = FileSystem.get(conf);
     }
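Adding core-site.xml from HADOOP_CONF_DIR to the Configuration makes the client pick up fs.defaultFS (the cluster's namenode address) even when the Hadoop config directory is not on the submitting JVM's classpath; without it, FileSystem.get(conf) falls back to the local file:/// filesystem. A minimal sketch of the same idea (assumes HADOOP_CONF_DIR is set in the environment):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Load the site configuration explicitly so FileSystem.get() talks to the cluster
    // named in core-site.xml instead of falling back to the local filesystem.
    public class HdfsFromConfDir {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String confDir = System.getenv("HADOOP_CONF_DIR");
            if (confDir != null) {
                conf.addResource(new Path(confDir + "/core-site.xml"));
            }
            FileSystem dfs = FileSystem.get(conf);
            System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
            System.out.println("working dir  = " + dfs.getWorkingDirectory());
        }
    }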