Merge commit '68c2aaa7fe8c1f4688cef2ace67642e85fd1c9d2'

This commit is contained in:
tqchen 2015-03-27 11:09:38 -07:00
commit 135d461c40
2 changed files with 34 additions and 12 deletions

View File

@ -15,6 +15,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
/** /**
@ -66,6 +68,10 @@ public class ApplicationMaster {
// username // username
private String userName = ""; private String userName = "";
// user credentials
private Credentials credentials = null;
// security tokens
private ByteBuffer securityTokens = null;
// application tracker hostname // application tracker hostname
private String appHostName = ""; private String appHostName = "";
// tracker URL to do // tracker URL to do
@ -99,8 +105,12 @@ public class ApplicationMaster {
private ApplicationMaster() throws IOException { private ApplicationMaster() throws IOException {
dfs = FileSystem.get(conf); dfs = FileSystem.get(conf);
userName = UserGroupInformation.getCurrentUser().getShortUserName();
credentials = UserGroupInformation.getCurrentUser().getCredentials();
DataOutputBuffer buffer = new DataOutputBuffer();
this.credentials.writeTokenStorageToStream(buffer);
this.securityTokens = ByteBuffer.wrap(buffer.getData());
} }
/** /**
* get integer argument from environment variable * get integer argument from environment variable
* *
@ -132,7 +142,7 @@ public class ApplicationMaster {
* @param args * @param args
*/ */
private void initArgs(String args[]) throws IOException { private void initArgs(String args[]) throws IOException {
LOG.info("Invoke initArgs"); LOG.info("Start AM as user=" + this.userName);
// get user name // get user name
userName = UserGroupInformation.getCurrentUser().getShortUserName(); userName = UserGroupInformation.getCurrentUser().getShortUserName();
// cached maps // cached maps
@ -183,7 +193,7 @@ public class ApplicationMaster {
RegisterApplicationMasterResponse response = this.rmClient RegisterApplicationMasterResponse response = this.rmClient
.registerApplicationMaster(this.appHostName, .registerApplicationMaster(this.appHostName,
this.appTrackerPort, this.appTrackerUrl); this.appTrackerPort, this.appTrackerUrl);
boolean success = false; boolean success = false;
String diagnostics = ""; String diagnostics = "";
try { try {
@ -274,13 +284,6 @@ public class ApplicationMaster {
task.containerRequest = null; task.containerRequest = null;
ContainerLaunchContext ctx = Records ContainerLaunchContext ctx = Records
.newRecord(ContainerLaunchContext.class); .newRecord(ContainerLaunchContext.class);
String hadoop = "hadoop";
if (System.getenv("HADOOP_HOME") != null) {
hadoop = "${HADOOP_HOME}/bin/hadoop";
} else if (System.getenv("HADOOP_PREFIX") != null) {
hadoop = "${HADOOP_PREFIX}/bin/hadoop";
}
String cmd = String cmd =
// use this to setup CLASSPATH correctly for libhdfs // use this to setup CLASSPATH correctly for libhdfs
this.command + " 1>" this.command + " 1>"
@ -288,6 +291,7 @@ public class ApplicationMaster {
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ "/stderr"; + "/stderr";
ctx.setCommands(Collections.singletonList(cmd)); ctx.setCommands(Collections.singletonList(cmd));
ctx.setTokens(this.securityTokens);
LOG.info(workerResources); LOG.info(workerResources);
ctx.setLocalResources(this.workerResources); ctx.setLocalResources(this.workerResources);
// setup environment variables // setup environment variables

View File

@ -1,5 +1,6 @@
package org.apache.hadoop.yarn.rabit; package org.apache.hadoop.yarn.rabit;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
@ -9,7 +10,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -47,6 +50,8 @@ public class Client {
private String tempdir = "/tmp"; private String tempdir = "/tmp";
// user name // user name
private String userName = ""; private String userName = "";
// user credentials
private Credentials credentials = null;
// job name // job name
private String jobName = ""; private String jobName = "";
// queue // queue
@ -60,10 +65,22 @@ public class Client {
conf.addResource(new Path(System.getenv("HADOOP_CONF_DIR") +"/hdfs-site.xml")); conf.addResource(new Path(System.getenv("HADOOP_CONF_DIR") +"/hdfs-site.xml"));
dfs = FileSystem.get(conf); dfs = FileSystem.get(conf);
userName = UserGroupInformation.getCurrentUser().getShortUserName(); userName = UserGroupInformation.getCurrentUser().getShortUserName();
credentials = UserGroupInformation.getCurrentUser().getCredentials();
} }
/** /**
* ge * setup security token given current user
* @return the ByeBuffer containing the security tokens
* @throws IOException
*/
private ByteBuffer setupTokens() throws IOException {
DataOutputBuffer buffer = new DataOutputBuffer();
this.credentials.writeTokenStorageToStream(buffer);
return ByteBuffer.wrap(buffer.getData());
}
/**
* setup all the cached files
* *
* @param fmaps * @param fmaps
* the file maps * the file maps
@ -194,9 +211,10 @@ public class Client {
.newRecord(ContainerLaunchContext.class); .newRecord(ContainerLaunchContext.class);
ApplicationSubmissionContext appContext = app ApplicationSubmissionContext appContext = app
.getApplicationSubmissionContext(); .getApplicationSubmissionContext();
// Submit application // Submit application
ApplicationId appId = appContext.getApplicationId(); ApplicationId appId = appContext.getApplicationId();
// setup security token
amContainer.setTokens(this.setupTokens());
// setup cache-files and environment variables // setup cache-files and environment variables
amContainer.setLocalResources(this.setupCacheFiles(appId)); amContainer.setLocalResources(this.setupCacheFiles(appId));
amContainer.setEnvironment(this.getEnvironment()); amContainer.setEnvironment(this.getEnvironment());