From 9b66e7edf237ada0469ea9fe6ce383cb300c1c2d Mon Sep 17 00:00:00 2001
From: tqchen
Date: Thu, 12 Mar 2015 20:57:49 -0700
Subject: [PATCH] fix hadoop

---
 rabit-learn/io/hdfs-inl.h                        |   3 +
 rabit-learn/linear/run-yarn.sh                   |   5 +-
 tracker/rabit_yarn.py                            |   2 +-
 .../hadoop/yarn/rabit/ApplicationMaster.java     | 110 ++++++++++++------
 .../org/apache/hadoop/yarn/rabit/Client.java     |   1 +
 5 files changed, 80 insertions(+), 41 deletions(-)

diff --git a/rabit-learn/io/hdfs-inl.h b/rabit-learn/io/hdfs-inl.h
index 7b4abae73..a4ec4b253 100644
--- a/rabit-learn/io/hdfs-inl.h
+++ b/rabit-learn/io/hdfs-inl.h
@@ -97,15 +97,18 @@ class HDFSSplit : public LineSplitBase {
  public:
   explicit HDFSSplit(const char *uri, unsigned rank, unsigned nsplit) {
     fs_ = hdfsConnect("default", 0);
+    utils::Check(fs_ != NULL, "error when connecting to default HDFS");
     std::vector<std::string> paths;
     LineSplitBase::SplitNames(&paths, uri, "#");
     // get the files
     std::vector<size_t> fsize;
     for (size_t i = 0; i < paths.size(); ++i) {
       hdfsFileInfo *info = hdfsGetPathInfo(fs_, paths[i].c_str());
+      utils::Check(info != NULL, "path %s does not exist", paths[i].c_str());
       if (info->mKind == 'D') {
         int nentry;
         hdfsFileInfo *files = hdfsListDirectory(fs_, info->mName, &nentry);
+        utils::Check(files != NULL, "error when ListDirectory %s", info->mName);
         for (int i = 0; i < nentry; ++i) {
           if (files[i].mKind == 'F') {
             fsize.push_back(files[i].mSize);
diff --git a/rabit-learn/linear/run-yarn.sh b/rabit-learn/linear/run-yarn.sh
index e7ba873a4..419c662b5 100755
--- a/rabit-learn/linear/run-yarn.sh
+++ b/rabit-learn/linear/run-yarn.sh
@@ -6,9 +6,10 @@ then
 fi
 
 # put the local training file to HDFS
-hadoop fs -rm -r -f $2/data
+#hadoop fs -rm -r -f $2/data
 hadoop fs -rm -r -f $2/mushroom.linear.model
-hadoop fs -mkdir $2/data
+#hadoop fs -mkdir $2/data
+hadoop fs -put ../data/agaricus.txt.train $2/data
 
 # submit to hadoop
 ../../tracker/rabit_yarn.py -n $1 --vcores 1 linear.rabit hdfs://$2/data/agaricus.txt.train model_out=hdfs://$2/mushroom.linear.model "${*:3}"
diff --git a/tracker/rabit_yarn.py b/tracker/rabit_yarn.py
index 6b471c4b3..a921b4f86 100755
--- a/tracker/rabit_yarn.py
+++ b/tracker/rabit_yarn.py
@@ -21,7 +21,7 @@ if not os.path.exists(YARN_JAR_PATH):
     subprocess.check_call(cmd, shell = True, env = os.environ)
 
 assert os.path.exists(YARN_JAR_PATH), "failed to build rabit-yarn.jar, try it manually"
 
-hadoop_binary = 'hadoop'
+hadoop_binary = None
 # code
 hadoop_home = os.getenv('HADOOP_HOME')
diff --git a/yarn/src/org/apache/hadoop/yarn/rabit/ApplicationMaster.java b/yarn/src/org/apache/hadoop/yarn/rabit/ApplicationMaster.java
index 64dec550c..9b7885c56 100644
--- a/yarn/src/org/apache/hadoop/yarn/rabit/ApplicationMaster.java
+++ b/yarn/src/org/apache/hadoop/yarn/rabit/ApplicationMaster.java
@@ -1,5 +1,6 @@
 package org.apache.hadoop.yarn.rabit;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -7,6 +8,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Collection;
 import java.util.Collections;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -161,7 +163,8 @@ public class ApplicationMaster {
         numVCores = this.getEnvInteger("rabit_cpu_vcores", true, numVCores);
         numMemoryMB = this.getEnvInteger("rabit_memory_mb", true, numMemoryMB);
         numTasks = this.getEnvInteger("rabit_world_size", true, numTasks);
-        maxNumAttempt = this.getEnvInteger("rabit_max_attempt", false, maxNumAttempt);
this.getEnvInteger("rabit_max_attempt", false, + maxNumAttempt); } /** @@ -180,7 +183,7 @@ public class ApplicationMaster { RegisterApplicationMasterResponse response = this.rmClient .registerApplicationMaster(this.appHostName, this.appTrackerPort, this.appTrackerUrl); - + boolean success = false; String diagnostics = ""; try { @@ -225,7 +228,8 @@ public class ApplicationMaster { success ? FinalApplicationStatus.SUCCEEDED : FinalApplicationStatus.FAILED, diagnostics, appTrackerUrl); - if (!success) throw new Exception("Application not successful"); + if (!success) + throw new Exception("Application not successful"); } /** @@ -270,17 +274,10 @@ public class ApplicationMaster { task.containerRequest = null; ContainerLaunchContext ctx = Records .newRecord(ContainerLaunchContext.class); - String hadoop = "hadoop"; - if (System.getenv("HADOOP_HOME") != null) { - hadoop = "${HADOOP_HOME}/bin/hadoop"; - } else if (System.getenv("HADOOP_PREFIX") != null) { - hadoop = "${HADOOP_PREFIX}/bin/hadoop"; - } - - String cmd = - // use this to setup CLASSPATH correctly for libhdfs - "CLASSPATH=${CLASSPATH}:`" + hadoop + " classpath --glob` " - + this.command + " 1>" + String cmd = + // use this to setup CLASSPATH correctly for libhdfs + // "CLASSPATH=${CLASSPATH}:`" + hadoop + " classpath --glob` " + this.command + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"; @@ -289,17 +286,45 @@ public class ApplicationMaster { ctx.setLocalResources(this.workerResources); // setup environment variables Map env = new java.util.HashMap(); - + // setup class path, this is kind of duplicated, ignoring StringBuilder cpath = new StringBuilder("${CLASSPATH}:./*"); for (String c : conf.getStrings( YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { - cpath.append(':'); - cpath.append(c.trim()); + String[] arrPath = c.split(":"); + for (String ps : arrPath) { + if (ps.endsWith("*.jar") || ps.endsWith("*")) { + ps = ps.substring(0, ps.lastIndexOf('*')); + String prefix = ps.substring(0, ps.lastIndexOf('/')); + if (ps.startsWith("$")) { + String[] arr =ps.split("/", 2); + if (arr.length != 2) continue; + try { + ps = System.getenv(arr[0].substring(1)) + '/' + arr[1]; + } catch (Exception e){ + continue; + } + } + File dir = new File(ps); + if (dir.isDirectory()) { + for (File f: dir.listFiles()) { + if (f.isFile() && f.getPath().endsWith(".jar")) { + cpath.append(":"); + cpath.append(prefix + '/' + f.getName()); + } + } + } + } else { + cpath.append(':'); + cpath.append(ps.trim()); + } + } } - // already use hadoop command to get class path in worker, maybe a better solution in future - // env.put("CLASSPATH", cpath.toString()); + // already use hadoop command to get class path in worker, maybe a + // better solution in future + env.put("CLASSPATH", cpath.toString()); + //LOG.info("CLASSPATH =" + cpath.toString()); // setup LD_LIBARY_PATH path for libhdfs env.put("LD_LIBRARY_PATH", "${LD_LIBRARY_PATH}:$HADOOP_HDFS_HOME/lib/native:$JAVA_HOME/jre/lib/amd64/server"); @@ -312,7 +337,7 @@ public class ApplicationMaster { } env.put("rabit_task_id", String.valueOf(task.taskId)); env.put("rabit_num_trial", String.valueOf(task.attemptCounter)); - + ctx.setEnvironment(env); synchronized (this) { assert (!this.runningTasks.containsKey(container.getId())); @@ -390,9 +415,14 @@ public class ApplicationMaster { if (r == null) { continue; } - LOG.info("Task " + r.taskId + "failed on " + r.container.getId() + ". 
-                    String.format("http://%s/node/containerlogs/%s/" + userName,
-                            r.container.getNodeHttpAddress(), r.container.getId()));
+            LOG.info("Task "
+                    + r.taskId
+                    + " failed on "
+                    + r.container.getId()
+                    + ". See LOG at : "
+                    + String.format("http://%s/node/containerlogs/%s/"
+                            + userName, r.container.getNodeHttpAddress(),
+                            r.container.getId()));
             r.attemptCounter += 1;
             r.container = null;
             tasks.add(r);
@@ -426,22 +456,26 @@
                 finishedTasks.add(r);
                 runningTasks.remove(s.getContainerId());
             } else {
-                switch (exstatus) {
-                case ContainerExitStatus.KILLED_EXCEEDED_PMEM:
-                    this.abortJob("[Rabit] Task "
-                            + r.taskId
-                            + " killed because of exceeding allocated physical memory");
-                    break;
-                case ContainerExitStatus.KILLED_EXCEEDED_VMEM:
-                    this.abortJob("[Rabit] Task "
-                            + r.taskId
-                            + " killed because of exceeding allocated virtual memory");
-                    break;
-                default:
-                    LOG.info("[Rabit] Task " + r.taskId
-                            + " exited with status " + exstatus);
-                    failed.add(s.getContainerId());
-                }
+                try {
+                    if (exstatus == ContainerExitStatus.class.getField(
+                            "KILLED_EXCEEDED_PMEM").getInt(null)) {
+                        this.abortJob("[Rabit] Task "
+                                + r.taskId
+                                + " killed because of exceeding allocated physical memory");
+                        continue;
+                    }
+                    if (exstatus == ContainerExitStatus.class.getField(
+                            "KILLED_EXCEEDED_VMEM").getInt(null)) {
+                        this.abortJob("[Rabit] Task "
+                                + r.taskId
+                                + " killed because of exceeding allocated virtual memory");
+                        continue;
+                    }
+                } catch (Exception e) {
+                }
+                LOG.info("[Rabit] Task " + r.taskId + " exited with status "
+                        + exstatus);
+                failed.add(s.getContainerId());
             }
         }
         this.handleFailure(failed);
diff --git a/yarn/src/org/apache/hadoop/yarn/rabit/Client.java b/yarn/src/org/apache/hadoop/yarn/rabit/Client.java
index 7edf58d08..423e11dd3 100644
--- a/yarn/src/org/apache/hadoop/yarn/rabit/Client.java
+++ b/yarn/src/org/apache/hadoop/yarn/rabit/Client.java
@@ -50,6 +50,7 @@ public class Client {
      * @throws IOException
      */
     private Client() throws IOException {
+        conf.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/core-site.xml"));
         dfs = FileSystem.get(conf);
     }
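Note on the CLASSPATH change in ApplicationMaster above: instead of shelling out to 'hadoop classpath --glob' on every worker, the application master now builds the CLASSPATH environment variable itself, expanding YARN classpath entries that end in '*' or '*.jar' into the individual jar files of that directory and resolving a leading environment variable such as $HADOOP_HOME first. The snippet below is a minimal standalone sketch of that expansion idea, not code from the patch; the class name and sample entries are illustrative, and it returns resolved local paths rather than re-using the unexpanded $VARIABLE prefix the way the patch does.

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class ClasspathExpandSketch {
    // Expand one classpath entry: wildcard entries become the concrete
    // .jar files found in the corresponding directory.
    static List<String> expand(String entry) {
        List<String> out = new ArrayList<String>();
        if (entry.endsWith("*.jar") || entry.endsWith("*")) {
            String dir = entry.substring(0, entry.lastIndexOf('*'));
            // resolve a leading $VARIABLE against the process environment
            if (dir.startsWith("$")) {
                String[] arr = dir.split("/", 2);
                String val = arr.length == 2 ? System.getenv(arr[0].substring(1)) : null;
                if (val == null) {
                    return out; // variable unset or entry malformed: skip it
                }
                dir = val + '/' + arr[1];
            }
            File d = new File(dir);
            File[] files = d.isDirectory() ? d.listFiles() : null;
            if (files != null) {
                for (File f : files) {
                    if (f.isFile() && f.getName().endsWith(".jar")) {
                        out.add(f.getPath());
                    }
                }
            }
        } else {
            out.add(entry.trim());
        }
        return out;
    }

    public static void main(String[] args) {
        // sample entries; the second one only expands if HADOOP_HOME is set
        String[] samples = { "./*", "$HADOOP_HOME/share/hadoop/common/*" };
        for (String e : samples) {
            System.out.println(e + " -> " + expand(e));
        }
    }
}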
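Note on the exit-status handling above: the switch over ContainerExitStatus constants is replaced by a reflective getField lookup, presumably so the class still compiles and runs against Hadoop releases whose ContainerExitStatus does not define KILLED_EXCEEDED_PMEM or KILLED_EXCEEDED_VMEM. Below is a small isolated sketch of that pattern; the helper class and method names are illustrative only, and it needs hadoop-yarn-api on the classpath.

import org.apache.hadoop.yarn.api.records.ContainerExitStatus;

public class ExitStatusSketch {
    // Returns true when the exit status equals the named ContainerExitStatus
    // constant, or false when that field does not exist in this Hadoop version.
    static boolean matches(int exstatus, String fieldName) {
        try {
            return exstatus == ContainerExitStatus.class.getField(fieldName)
                    .getInt(null);
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        int exstatus = ContainerExitStatus.ABORTED; // any sample status value
        System.out.println(matches(exstatus, "KILLED_EXCEEDED_PMEM"));
        System.out.println(matches(exstatus, "KILLED_EXCEEDED_VMEM"));
    }
}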