Author: sseth
Date: Fri Sep 21 18:11:35 2012
New Revision: 1388596
URL: http://svn.apache.org/viewvc?rev=1388596&view=rev
Log:
MAPREDUCE-4663. Container Launch should be independent of o.a.h.m.Task (sseth)
Added:
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java
Modified:
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Sep 21 18:11:35 2012
@@ -24,3 +24,5 @@ Branch MR-3902
MAPREDUCE-4665. Use the configured shuffle port and application ACLs (sseth)
MAPREDUCE-4664. ContainerHeartbeatHandler should be pinged on a getTask call (sseth)
+
+ MAPREDUCE-4663. Container Launch should be independent of o.a.h.m.Task (sseth)
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java Fri Sep 21 18:11:35 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -65,13 +66,11 @@ public class MapReduceChildJVM2 {
}
}
- public static void setVMEnv(Map<String, String> environment,
- Task task) {
-
- JobConf conf = task.conf;
+ public static void setVMEnv(Map<String, String> environment, JobConf conf,
+ TaskType taskType) {
// Add the env variables passed by the user
- String mapredChildEnv = getChildEnv(conf, task.isMapTask());
+ String mapredChildEnv = getChildEnv(conf, taskType == TaskType.MAP);
Apps.setEnvFromInputString(environment, mapredChildEnv);
// Set logging level in the environment.
@@ -79,7 +78,7 @@ public class MapReduceChildJVM2 {
// streaming) it will have the correct loglevel.
environment.put(
"HADOOP_ROOT_LOGGER",
- getChildLogLevel(conf, task.isMapTask()) + ",CLA");
+ getChildLogLevel(conf, taskType == TaskType.MAP) + ",CLA");
// TODO: The following is useful for instance in streaming tasks. Should be
// set in ApplicationMaster's env by the RM.
@@ -93,7 +92,7 @@ public class MapReduceChildJVM2 {
// properties.
long logSize = TaskLog.getTaskLogLength(conf);
Vector<String> logProps = new Vector<String>(4);
- setupLog4jProperties(task, logProps, logSize);
+ setupLog4jProperties(conf, taskType, logProps, logSize);
Iterator<String> it = logProps.iterator();
StringBuffer buffer = new StringBuffer();
while (it.hasNext()) {
@@ -148,21 +147,19 @@ public class MapReduceChildJVM2 {
return adminClasspath + " " + userClasspath;
}
- private static void setupLog4jProperties(Task task,
- Vector<String> vargs,
- long logSize) {
- String logLevel = getChildLogLevel(task.conf, task.isMapTask());
+ private static void setupLog4jProperties(JobConf conf, TaskType taskType,
+ Vector<String> vargs, long logSize) {
+ String logLevel = getChildLogLevel(conf, taskType == TaskType.MAP);
MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
}
public static List<String> getVMCommand(
- InetSocketAddress taskAttemptListenerAddr, Task task,
- ID jvmID) {
+ InetSocketAddress taskAttemptListenerAddr, JobConf conf, TaskType taskType,
+ ID jvmID, JobID jobID, boolean shouldProfile) {
- TaskAttemptID attemptID = task.getTaskID();
- JobConf conf = task.conf;
+ // TaskAttemptID attemptID = task.getTaskID();
- Vector<String> vargs = new Vector<String>(8);
+ Vector<String> vargs = new Vector<String>(9);
vargs.add("exec");
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
@@ -195,8 +192,9 @@ public class MapReduceChildJVM2 {
// </value>
// </property>
//
- String javaOpts = getChildJavaOpts(conf, task.isMapTask());
- javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
+ String javaOpts = getChildJavaOpts(conf, taskType == TaskType.MAP);
+ // Broken by the AM re-factor. TaskID is not JVM specific.
+ // javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
String [] javaOptsSplit = javaOpts.split(" ");
for (int i = 0; i < javaOptsSplit.length; i++) {
vargs.add(javaOptsSplit[i]);
@@ -208,24 +206,16 @@ public class MapReduceChildJVM2 {
// Setup the log4j prop
long logSize = TaskLog.getTaskLogLength(conf);
- setupLog4jProperties(task, vargs, logSize);
+ setupLog4jProperties(conf, taskType, vargs, logSize);
- if (conf.getProfileEnabled()) {
- if (conf.getProfileTaskRange(task.isMapTask()
- ).isIncluded(task.getPartition())) {
- vargs.add(
- String.format(
- conf.getProfileParams(),
- getTaskLogFile(TaskLog.LogName.PROFILE)
- )
- );
- if (task.isMapTask()) {
- vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
- }
- else {
- vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
- }
-
+ // Decision to profile needs to be made in the scheduler.
+ if (shouldProfile) {
+ vargs.add(String.format(conf.getProfileParams(),
+ getTaskLogFile(TaskLog.LogName.PROFILE)));
+ if (taskType == TaskType.MAP) {
+ vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
+ } else {
+ vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
}
}
@@ -233,8 +223,10 @@ public class MapReduceChildJVM2 {
vargs.add(YarnChild2.class.getName()); // main of Child
// pass TaskAttemptListener's address
vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress());
- vargs.add(Integer.toString(taskAttemptListenerAddr.getPort()));
- vargs.add(attemptID.toString()); // pass task identifier
+ vargs.add(Integer.toString(taskAttemptListenerAddr.getPort()));
+ // Set the job id, and task type.
+ vargs.add(jobID.toString());
+ vargs.add(taskType.toString());
// Finally add the jvmID
vargs.add(String.valueOf(jvmID.getId()));
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java Fri Sep 21 18:11:35 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.fs.LocalDirAllo
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -70,6 +69,10 @@ class YarnChild2 {
static volatile TaskAttemptID taskid = null;
+ /*
+ * Expected arguments: args[0] - host, args[1] - port, args[2] - JobID,
+ * args[3] - TaskType, args[4] - JVM integer id; the rest are log redirects etc.
+ */
public static void main(String[] args) throws Throwable {
LOG.info("XXX: Child starting");
@@ -81,20 +84,22 @@ class YarnChild2 {
int port = Integer.parseInt(args[1]);
final InetSocketAddress address =
NetUtils.createSocketAddrForHost(host, port);
- final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
- int jvmIdInt = Integer.parseInt(args[3]);
- JVMId jvmId = new JVMId(firstTaskid.getJobID(),
- firstTaskid.getTaskType() == TaskType.MAP, jvmIdInt);
+
+ final JobID jobID = JobID.forName(args[2]);
+ final TaskType taskType = TaskType.valueOf(args[3]);
+
+ final int jvmIdInt = Integer.parseInt(args[4]);
+ JVMId jvmId = new JVMId(jobID, taskType == TaskType.MAP, jvmIdInt);
// initialize metrics
DefaultMetricsSystem.initialize(
- StringUtils.camelize(firstTaskid.getTaskType().name()) +"Task");
+ StringUtils.camelize(taskType.name()) +"Task");
Token<JobTokenIdentifier> jt = loadCredentials(defaultConf, address);
// Create TaskUmbilicalProtocol as actual task owner.
UserGroupInformation taskOwner =
- UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
+ UserGroupInformation.createRemoteUser(jobID.toString());
taskOwner.addToken(jt);
final TaskUmbilicalProtocol umbilical =
taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java Fri Sep 21 18:11:35 2012
@@ -270,10 +270,12 @@ public class MRAppMaster extends Composi
addIfService(containerHeartbeatHandler);
//service to handle requests to TaskUmbilicalProtocol
- taskAttemptListener = createTaskAttemptListener(context, taskHeartbeatHandler, containerHeartbeatHandler);
+ taskAttemptListener = createTaskAttemptListener(context,
+ taskHeartbeatHandler, containerHeartbeatHandler);
addIfService(taskAttemptListener);
- containers = new AMContainerMap(containerHeartbeatHandler, taskAttemptListener, dispatcher.getEventHandler(), context);
+ containers = new AMContainerMap(containerHeartbeatHandler,
+ taskAttemptListener, context);
addIfService(containers);
dispatcher.register(AMContainerEventType.class, containers);
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java Fri Sep 21 18:11:35 2012
@@ -1,271 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.mapreduce.v2.app2.job.impl;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.mapred.MapReduceChildJVM2;
-import org.apache.hadoop.mapred.ShuffleHandler;
-import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.mapred.WrappedJvmID;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
public class TaskAttemptImplHelpers {
private static final Log LOG = LogFactory.getLog(TaskAttemptImplHelpers.class);
- private static Object commonContainerSpecLock = new Object();
- private static ContainerLaunchContext commonContainerSpec = null;
- private static final Object classpathLock = new Object();
- private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
- private static String initialClasspath = null;
-
- /**
- * Create the common {@link ContainerLaunchContext} for all attempts.
- * @param applicationACLs
- */
- private static ContainerLaunchContext createCommonContainerLaunchContext(
- Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
- Token<JobTokenIdentifier> jobToken,
- final org.apache.hadoop.mapred.JobID oldJobId,
- Credentials credentials) {
-
- // Application resources
- Map<String, LocalResource> localResources =
- new HashMap<String, LocalResource>();
-
- // Application environment
- Map<String, String> environment = new HashMap<String, String>();
-
- // Service data
- Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
-
- // Tokens
- ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{});
- try {
- FileSystem remoteFS = FileSystem.get(conf);
-
- // //////////// Set up JobJar to be localized properly on the remote NM.
- String jobJar = conf.get(MRJobConfig.JAR);
- if (jobJar != null) {
- Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS
- .getUri(), remoteFS.getWorkingDirectory());
- localResources.put(
- MRJobConfig.JOB_JAR,
- createLocalResource(remoteFS, remoteJobJar,
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
- LOG.info("The job-jar file on the remote FS is "
- + remoteJobJar.toUri().toASCIIString());
- } else {
- // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
- // mapreduce jar itself which is already on the classpath.
- LOG.info("Job jar is not present. "
- + "Not adding any jar to the list of resources.");
- }
- // //////////// End of JobJar setup
-
- // //////////// Set up JobConf to be localized properly on the remote NM.
- Path path =
- MRApps.getStagingAreaDir(conf, UserGroupInformation
- .getCurrentUser().getShortUserName());
- Path remoteJobSubmitDir =
- new Path(path, oldJobId.toString());
- Path remoteJobConfPath =
- new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
- localResources.put(
- MRJobConfig.JOB_CONF_FILE,
- createLocalResource(remoteFS, remoteJobConfPath,
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
- LOG.info("The job-conf file on the remote FS is "
- + remoteJobConfPath.toUri().toASCIIString());
- // //////////// End of JobConf setup
-
- // Setup DistributedCache
- MRApps.setupDistributedCache(conf, localResources);
-
- // Setup up task credentials buffer
- Credentials taskCredentials = new Credentials();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- LOG.info("Adding #" + credentials.numberOfTokens()
- + " tokens and #" + credentials.numberOfSecretKeys()
- + " secret keys for NM use for launching container");
- taskCredentials.addAll(credentials);
- }
-
- // LocalStorageToken is needed irrespective of whether security is enabled
- // or not.
- TokenCache.setJobToken(jobToken, taskCredentials);
-
- DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
- LOG.info("Size of containertokens_dob is "
- + taskCredentials.numberOfTokens());
- taskCredentials.writeTokenStorageToStream(containerTokens_dob);
- taskCredentialsBuffer =
- ByteBuffer.wrap(containerTokens_dob.getData(), 0,
- containerTokens_dob.getLength());
-
- // Add shuffle token
- LOG.info("Putting shuffle token in serviceData");
- serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
- ShuffleHandler.serializeServiceData(jobToken));
-
- Apps.addToEnvironment(
- environment,
- Environment.CLASSPATH.name(),
- getInitialClasspath(conf));
- } catch (IOException e) {
- throw new YarnException(e);
- }
-
- // Shell
- environment.put(
- Environment.SHELL.name(),
- conf.get(
- MRJobConfig.MAPRED_ADMIN_USER_SHELL,
- MRJobConfig.DEFAULT_SHELL)
- );
-
- // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
- Apps.addToEnvironment(
- environment,
- Environment.LD_LIBRARY_PATH.name(),
- Environment.PWD.$());
-
- // Add the env variables passed by the admin
- Apps.setEnvFromInputString(
- environment,
- conf.get(
- MRJobConfig.MAPRED_ADMIN_USER_ENV,
- MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
- );
-
- // Construct the actual Container
- // The null fields are per-container and will be constructed for each
- // container separately.
- ContainerLaunchContext container = BuilderUtils
- .newContainerLaunchContext(null, conf
- .get(MRJobConfig.USER_NAME), null, localResources,
- environment, null, serviceData, taskCredentialsBuffer,
- applicationACLs);
-
- return container;
- }
-
- static ContainerLaunchContext createContainerLaunchContext(
- Map<ApplicationAccessType, String> applicationACLs,
- ContainerId containerID, Configuration conf,
- Token<JobTokenIdentifier> jobToken, Task remoteTask,
- final org.apache.hadoop.mapred.JobID oldJobId,
- Resource assignedCapability, WrappedJvmID jvmID,
- TaskAttemptListener taskAttemptListener,
- Credentials credentials) {
-
- synchronized (commonContainerSpecLock) {
- if (commonContainerSpec == null) {
- commonContainerSpec = createCommonContainerLaunchContext(
- applicationACLs, conf, jobToken, oldJobId, credentials);
- }
- }
-
- // Fill in the fields needed per-container that are missing in the common
- // spec.
-
- // Setup environment by cloning from common env.
- Map<String, String> env = commonContainerSpec.getEnvironment();
- Map<String, String> myEnv = new HashMap<String, String>(env.size());
- myEnv.putAll(env);
- MapReduceChildJVM2.setVMEnv(myEnv, remoteTask);
-
- // Set up the launch command
- List<String> commands = MapReduceChildJVM2.getVMCommand(
- taskAttemptListener.getAddress(), remoteTask, jvmID);
-
- // Duplicate the ByteBuffers for access by multiple containers.
- Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
- for (Entry<String, ByteBuffer> entry : commonContainerSpec
- .getServiceData().entrySet()) {
- myServiceData.put(entry.getKey(), entry.getValue().duplicate());
- }
-
- // Construct the actual Container
- ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
- containerID, commonContainerSpec.getUser(), assignedCapability,
- commonContainerSpec.getLocalResources(), myEnv, commands,
- myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
- applicationACLs);
-
- return container;
- }
-
- /**
- * Create a {@link LocalResource} record with all the given parameters.
- */
- private static LocalResource createLocalResource(FileSystem fc, Path file,
- LocalResourceType type, LocalResourceVisibility visibility)
- throws IOException {
- FileStatus fstat = fc.getFileStatus(file);
- URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
- .getPath()));
- long resourceSize = fstat.getLen();
- long resourceModificationTime = fstat.getModificationTime();
-
- return BuilderUtils.newLocalResource(resourceURL, type, visibility,
- resourceSize, resourceModificationTime);
- }
-
- /**
- * Lock this on initialClasspath so that there is only one fork in the AM for
- * getting the initial class-path. TODO: We already construct
- * a parent CLC and use it for all the containers, so this should go away
- * once the mr-generated-classpath stuff is gone.
- */
- private static String getInitialClasspath(Configuration conf) throws IOException {
- synchronized (classpathLock) {
- if (initialClasspathFlag.get()) {
- return initialClasspath;
- }
- Map<String, String> env = new HashMap<String, String>();
- MRApps.setClasspath(env, conf);
- initialClasspath = env.get(Environment.CLASSPATH.name());
- initialClasspathFlag.set(true);
- return initialClasspath;
- }
- }
-
+
static String[] resolveHosts(String[] src) {
String[] result = new String[src.length];
for (int i = 0; i < src.length; i++) {
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java Fri Sep 21 18:11:35 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -568,8 +569,12 @@ public class RecoveryService extends Com
// Request container launch for new containers.
if (appContext.getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
- actualHandler.handle(new AMContainerLaunchRequestEvent(cId, event,
- null, yarnAttemptID.getTaskId().getJobId()));
+ TaskId taskId = yarnAttemptID.getTaskId();
+ AMContainerLaunchRequestEvent lrEvent = new AMContainerLaunchRequestEvent(
+ cId, taskId.getJobId(), taskId.getTaskType(), event.getJobToken(),
+ event.getCredentials(), false, new JobConf(appContext.getJob(
+ taskId.getJobId()).getConf()));
+ actualHandler.handle(lrEvent);
}
// Assign the task attempt to this container.
actualHandler.handle(new AMContainerAssignTAEvent(cId, yarnAttemptID,
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java Fri Sep 21 18:11:35 2012
@@ -39,6 +39,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -75,8 +76,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.RackResolver;
/**
@@ -98,12 +99,9 @@ public class RMContainerAllocator extend
private volatile boolean stopEventHandling;
static {
- PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
- PRIORITY_FAST_FAIL_MAP.setPriority(5);
- PRIORITY_REDUCE = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
- PRIORITY_REDUCE.setPriority(10);
- PRIORITY_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
- PRIORITY_MAP.setPriority(20);
+ PRIORITY_FAST_FAIL_MAP = BuilderUtils.newPriority(5);
+ PRIORITY_REDUCE = BuilderUtils.newPriority(10);
+ PRIORITY_MAP = BuilderUtils.newPriority(20);
}
protected final AppContext appContext;
@@ -970,10 +968,17 @@ public class RMContainerAllocator extend
// TODO Maybe: ApplicationACLs should be populated into the appContext from the RMCommunicator.
if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
- eventHandler.handle(new AMContainerLaunchRequestEvent(
- containerId, attemptToLaunchRequestMap.get(assigned
- .getAttemptId()), requestor.getApplicationAcls(),
- getJob().getID()));
+ AMSchedulerTALaunchRequestEvent tlrEvent = attemptToLaunchRequestMap
+ .get(assigned.getAttemptId());
+ JobConf jobConf = new JobConf(job.getConf());
+
+ AMContainerLaunchRequestEvent launchRequest = new AMContainerLaunchRequestEvent(
+ containerId, jobId, assigned.getAttemptId().getTaskId()
+ .getTaskType(), tlrEvent.getJobToken(),
+ tlrEvent.getCredentials(), shouldProfileTaskAttempt(
+ jobConf, tlrEvent.getRemoteTask()), jobConf);
+
+ eventHandler.handle(launchRequest);
}
eventHandler.handle(new AMContainerAssignTAEvent(containerId,
assigned.getAttemptId(), attemptToLaunchRequestMap.get(
@@ -1239,6 +1244,22 @@ public class RMContainerAllocator extend
return newReq;
}
+ /*
+ * Not very useful for a re-use scheduler.
+ */
+ protected boolean shouldProfileTaskAttempt(JobConf conf,
+ org.apache.hadoop.mapred.Task remoteTask) {
+ TaskType taskType = TypeConverter.toYarn(remoteTask.getTaskID()
+ .getTaskType());
+ if (conf.getProfileEnabled()) {
+ if (conf.getProfileTaskRange(taskType == TaskType.MAP).isIncluded(
+ remoteTask.getPartition())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private static class ContainerRequestInfo {
ContainerRequestInfo(ContainerRequest containerRequest,
AMSchedulerTALaunchRequestEvent launchRequestEvent) {
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java?rev=1388596&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java Fri Sep 21 18:11:35 2012
@@ -0,0 +1,271 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceChildJVM2;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class AMContainerHelpers {
+
+ private static final Log LOG = LogFactory.getLog(AMContainerHelpers.class);
+
+ private static Object commonContainerSpecLock = new Object();
+ private static ContainerLaunchContext commonContainerSpec = null;
+ private static final Object classpathLock = new Object();
+ private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
+ private static String initialClasspath = null;
+
+ /**
+ * Create a {@link LocalResource} record with all the given parameters.
+ */
+ private static LocalResource createLocalResource(FileSystem fc, Path file,
+ LocalResourceType type, LocalResourceVisibility visibility)
+ throws IOException {
+ FileStatus fstat = fc.getFileStatus(file);
+ URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
+ .getPath()));
+ long resourceSize = fstat.getLen();
+ long resourceModificationTime = fstat.getModificationTime();
+
+ return BuilderUtils.newLocalResource(resourceURL, type, visibility,
+ resourceSize, resourceModificationTime);
+ }
+
+ /**
+ * Lock this on initialClasspath so that there is only one fork in the AM for
+ * getting the initial class-path. TODO: We already construct a parent CLC and
+ * use it for all the containers, so this should go away once the
+ * mr-generated-classpath stuff is gone.
+ */
+ private static String getInitialClasspath(Configuration conf)
+ throws IOException {
+ synchronized (classpathLock) {
+ if (initialClasspathFlag.get()) {
+ return initialClasspath;
+ }
+ Map<String, String> env = new HashMap<String, String>();
+ MRApps.setClasspath(env, conf);
+ initialClasspath = env.get(Environment.CLASSPATH.name());
+ initialClasspathFlag.set(true);
+ return initialClasspath;
+ }
+ }
+
+ /**
+ * Create the common {@link ContainerLaunchContext} for all attempts.
+ *
+ * @param applicationACLs
+ */
+ private static ContainerLaunchContext createCommonContainerLaunchContext(
+ Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
+ Token<JobTokenIdentifier> jobToken,
+ final org.apache.hadoop.mapred.JobID oldJobId, Credentials credentials) {
+
+ // Application resources
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+ // Application environment
+ Map<String, String> environment = new HashMap<String, String>();
+
+ // Service data
+ Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+
+ // Tokens
+ ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[] {});
+ try {
+ FileSystem remoteFS = FileSystem.get(conf);
+
+ // //////////// Set up JobJar to be localized properly on the remote NM.
+ String jobJar = conf.get(MRJobConfig.JAR);
+ if (jobJar != null) {
+ Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory());
+ localResources.put(
+ MRJobConfig.JOB_JAR,
+ createLocalResource(remoteFS, remoteJobJar, LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION));
+ LOG.info("The job-jar file on the remote FS is "
+ + remoteJobJar.toUri().toASCIIString());
+ } else {
+ // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
+ // mapreduce jar itself which is already on the classpath.
+ LOG.info("Job jar is not present. "
+ + "Not adding any jar to the list of resources.");
+ }
+ // //////////// End of JobJar setup
+
+ // //////////// Set up JobConf to be localized properly on the remote NM.
+ Path path = MRApps.getStagingAreaDir(conf, UserGroupInformation
+ .getCurrentUser().getShortUserName());
+ Path remoteJobSubmitDir = new Path(path, oldJobId.toString());
+ Path remoteJobConfPath = new Path(remoteJobSubmitDir,
+ MRJobConfig.JOB_CONF_FILE);
+ localResources.put(
+ MRJobConfig.JOB_CONF_FILE,
+ createLocalResource(remoteFS, remoteJobConfPath,
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+ LOG.info("The job-conf file on the remote FS is "
+ + remoteJobConfPath.toUri().toASCIIString());
+ // //////////// End of JobConf setup
+
+ // Setup DistributedCache
+ MRApps.setupDistributedCache(conf, localResources);
+
+ // Setup up task credentials buffer
+ Credentials taskCredentials = new Credentials();
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
+ + credentials.numberOfSecretKeys()
+ + " secret keys for NM use for launching container");
+ taskCredentials.addAll(credentials);
+ }
+
+ // LocalStorageToken is needed irrespective of whether security is enabled
+ // or not.
+ TokenCache.setJobToken(jobToken, taskCredentials);
+
+ DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+ LOG.info("Size of containertokens_dob is "
+ + taskCredentials.numberOfTokens());
+ taskCredentials.writeTokenStorageToStream(containerTokens_dob);
+ taskCredentialsBuffer = ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+ containerTokens_dob.getLength());
+
+ // Add shuffle token
+ LOG.info("Putting shuffle token in serviceData");
+ serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
+ ShuffleHandler.serializeServiceData(jobToken));
+
+ Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+ getInitialClasspath(conf));
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+
+ // Shell
+ environment.put(Environment.SHELL.name(), conf.get(
+ MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));
+
+ // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
+ Apps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
+ Environment.PWD.$());
+
+ // Add the env variables passed by the admin
+ Apps.setEnvFromInputString(environment, conf.get(
+ MRJobConfig.MAPRED_ADMIN_USER_ENV,
+ MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV));
+
+ // Construct the actual Container
+ // The null fields are per-container and will be constructed for each
+ // container separately.
+ ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+ null, conf.get(MRJobConfig.USER_NAME), null, localResources,
+ environment, null, serviceData, taskCredentialsBuffer, applicationACLs);
+
+ return container;
+ }
+
+ static ContainerLaunchContext createContainerLaunchContext(
+ Map<ApplicationAccessType, String> applicationACLs,
+ ContainerId containerID, JobConf jobConf, TaskType taskType,
+ Token<JobTokenIdentifier> jobToken,
+ final org.apache.hadoop.mapred.JobID oldJobId,
+ Resource assignedCapability, WrappedJvmID jvmID,
+ TaskAttemptListener taskAttemptListener, Credentials credentials,
+ boolean shouldProfile) {
+
+ synchronized (commonContainerSpecLock) {
+ if (commonContainerSpec == null) {
+ commonContainerSpec = createCommonContainerLaunchContext(
+ applicationACLs, jobConf, jobToken, oldJobId, credentials);
+ }
+ }
+
+ // Fill in the fields needed per-container that are missing in the common
+ // spec.
+
+ // Setup environment by cloning from common env.
+ Map<String, String> env = commonContainerSpec.getEnvironment();
+ Map<String, String> myEnv = new HashMap<String, String>(env.size());
+ myEnv.putAll(env);
+ MapReduceChildJVM2.setVMEnv(myEnv, jobConf, taskType);
+
+ // Set up the launch command
+ List<String> commands = MapReduceChildJVM2.getVMCommand(
+ taskAttemptListener.getAddress(), jobConf, taskType, jvmID,
+ oldJobId, shouldProfile);
+
+ // Duplicate the ByteBuffers for access by multiple containers.
+ Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+ for (Entry<String, ByteBuffer> entry : commonContainerSpec.getServiceData()
+ .entrySet()) {
+ myServiceData.put(entry.getKey(), entry.getValue().duplicate());
+ }
+
+ // Construct the actual Container
+ ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+ containerID, commonContainerSpec.getUser(), assignedCapability,
+ commonContainerSpec.getLocalResources(), myEnv, commands,
+ myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
+ applicationACLs);
+
+ return container;
+ }
+
+}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java Fri Sep 21 18:11:35 2012
@@ -18,8 +18,6 @@
package org.apache.hadoop.mapreduce.v2.app2.rm.container;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
@@ -27,56 +25,30 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.mapred.MapReduceChildJVM2;
-import org.apache.hadoop.mapred.ShuffleHandler;
-import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.WrappedJvmID;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app2.AppContext;
import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventTerminated;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorStopRequestEvent;
import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -84,10 +56,8 @@ import org.apache.hadoop.yarn.state.Mult
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
+@SuppressWarnings("rawtypes")
public class AMContainerImpl implements AMContainer {
private static final Log LOG = LogFactory.getLog(AMContainerImpl.class);
@@ -101,13 +71,8 @@ public class AMContainerImpl implements
private final AppContext appContext;
private final ContainerHeartbeatHandler containerHeartbeatHandler;
private final TaskAttemptListener taskAttemptListener;
-
- private static Object commonContainerSpecLock = new Object();
- private static ContainerLaunchContext commonContainerSpec = null;
- private static final Object classpathLock = new Object();
- private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
- private static String initialClasspath = null;
-
+ protected final EventHandler eventHandler;
+
private final List<TaskAttemptId> completedAttempts = new LinkedList<TaskAttemptId>();
// TODO Maybe this should be pulled from the TaskAttempt.s
@@ -130,8 +95,7 @@ public class AMContainerImpl implements
private ContainerLaunchContext clc;
private WrappedJvmID jvmId;
- @SuppressWarnings("rawtypes")
- protected EventHandler eventHandler;
+
private static boolean stateMachineInited = false;
private static StateMachineFactory
@@ -194,16 +158,17 @@ public class AMContainerImpl implements
.installTopology();
}
-
- @SuppressWarnings("rawtypes")
+
+ // Attempting to use a container based purely on resources required, etc needs
+ // additional change - JvmID, YarnChild, etc depend on TaskType.
public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
- TaskAttemptListener tal, EventHandler eventHandler, AppContext appContext) {
+ TaskAttemptListener tal, AppContext appContext) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
this.container = container;
this.containerId = container.getId();
- this.eventHandler = eventHandler;
+ this.eventHandler = appContext.getEventHandler();
this.appContext = appContext;
this.containerHeartbeatHandler = chh;
this.taskAttemptListener = tal;
@@ -307,8 +272,7 @@ public class AMContainerImpl implements
this.eventHandler.handle(event);
}
- // TODO Maybe have pullTA send out an attemptId. TAL talks to the TaskAttempt
- // to fetch the actual RemoteTask.
+ // Push the TaskAttempt to the TAL, instead of the TAL pulling when a JVM asks for a TaskAttempt.
public org.apache.hadoop.mapred.Task pullTaskAttempt() {
this.writeLock.lock();
try {
@@ -333,23 +297,29 @@ public class AMContainerImpl implements
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerLaunchRequestEvent event = (AMContainerLaunchRequestEvent) cEvent;
- AMSchedulerTALaunchRequestEvent taEvent = event.getLaunchRequestEvent();
-
- // TODO LATER May be possible to forget about the clc or a part of it after
- // launch. Save AM resources.
-
- container.jvmId = new WrappedJvmID(taEvent.getRemoteTask().getJobID(), taEvent.getRemoteTask().isMapTask(), container.containerId.getId());
- container.clc = createContainerLaunchContext(
- event.getApplicationAcls(), container.getContainerId(),
- container.appContext.getJob(event.getJobId()).getConf(), taEvent.getJobToken(),
- taEvent.getRemoteTask(), TypeConverter.fromYarn(event.getJobId()),
+ JobID oldJobID = TypeConverter.fromYarn(event.getJobId());
+ container.jvmId = new WrappedJvmID(oldJobID,
+ event.getTaskTypeForContainer() == TaskType.MAP,
+ container.containerId.getId());
+
+ container.clc = AMContainerHelpers.createContainerLaunchContext(
+ container.appContext.getApplicationACLs(),
+ container.getContainerId(), event.getJobConf(),
+ event.getTaskTypeForContainer(), event.getJobToken(),
+ TypeConverter.fromYarn(event.getJobId()),
container.getContainer().getResource(), container.jvmId,
- container.taskAttemptListener, taEvent.getCredentials());
+ container.taskAttemptListener, event.getCredentials(),
+ event.shouldProfile());
- container.sendEvent(new NMCommunicatorLaunchRequestEvent(container.clc, container.container));
+ container.sendEvent(new NMCommunicatorLaunchRequestEvent(container.clc,
+ container.container));
LOG.info("Sending Launch Request for Container with id: "
+ container.clc.getContainerId());
+ // Forget about the clc to save resources. At some point, part of the clc
+ // info may need to be exposed to the scheduler to figure out whether a
+ // container can be used for a specific TaskAttempt.
+ container.clc = null;
}
}
@@ -364,7 +334,9 @@ public class AMContainerImpl implements
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
container.inError = true;
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
+ "AMScheduler Error: TaskAttempt should not be" +
+ " allocated before a launch request.");
container.sendCompletedToScheduler();
container.deAllocate();
LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId()
@@ -422,8 +394,11 @@ public class AMContainerImpl implements
protected void sendCompletedToScheduler() {
sendEvent(new AMSchedulerEventContainerCompleted(containerId));
}
-
- protected void sendTerminatedToTaskAttempt(TaskAttemptId taId) {
+
+ protected void sendTerminatedToTaskAttempt(TaskAttemptId taId, String message) {
+ if (message != null) {
+ sendEvent(new TaskAttemptDiagnosticsUpdateEvent(taId, message));
+ }
sendEvent(new TaskAttemptEventTerminated(taId));
}
@@ -462,14 +437,19 @@ public class AMContainerImpl implements
AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
if (container.pendingAttempt != null) {
container.inError = true;
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
+ String errorMessage = "AMScheduler Error: Multiple simultaneous " +
+ "taskAttempt allocations to: " + container.getContainerId();
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
+ errorMessage);
container.deAllocate();
+ LOG.warn(errorMessage);
return AMContainerState.STOPPING;
}
container.pendingAttempt = event.getTaskAttemptId();
container.remoteTaskMap.put(event.getTaskAttemptId(),
event.getRemoteTask());
- // TODO XXX: Consider registering with the TAL, instead of the TAL pulling.
+ // TODO Consider registering with the TAL, instead of the TAL pulling.
+ // Possibly after splitting TAL and ContainerListener.
return container.getState();
}
}
@@ -515,7 +495,6 @@ public class AMContainerImpl implements
}
LOG.info("XXX: Assigned task + [" + container.runningAttempt + "] to container: [" + container.getContainerId() + "]");
return AMContainerState.RUNNING;
- // TODO XXX: Make sure the TAL sends out a TA_STARTED_REMOTELY, along with the shuffle port.
} else {
return AMContainerState.IDLE;
}
@@ -531,7 +510,6 @@ public class AMContainerImpl implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
- // TODO XXX: Send diagnostics to pending task attempt. Update action in transition table.
if (container.pendingAttempt != null) {
AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
container.sendEvent(new TaskAttemptDiagnosticsUpdateEvent(
@@ -551,7 +529,10 @@ public class AMContainerImpl implements
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
if (container.pendingAttempt != null) {
- container.sendTerminatedToTaskAttempt(container.pendingAttempt);
+ String errorMessage = "Container" + container.getContainerId()
+ + " failed. Received COMPLETED event while trying to launch";
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt,errorMessage);
+ LOG.warn(errorMessage);
// TODO XXX Maybe nullify pendingAttempt.
}
container.sendCompletedToScheduler();
@@ -605,10 +586,15 @@ public class AMContainerImpl implements
AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
if (container.pendingAttempt != null) {
container.inError = true;
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
+ String errorMessage = "AMScheduler Error: Multiple simultaneous "
+ + "taskAttempt allocations to: " + container.getContainerId();
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
+ errorMessage);
+ LOG.warn(errorMessage);
container.sendStopRequestToNM();
container.deAllocate();
container.containerHeartbeatHandler.unregister(container.containerId);
+
return AMContainerState.STOPPING;
}
container.pendingAttempt = event.getTaskAttemptId();
@@ -631,7 +617,7 @@ public class AMContainerImpl implements
LOG.info("Cotnainer with id: " + container.getContainerId()
+ " Completed." + " Previous state was: " + container.getState());
if (container.pendingAttempt != null) {
- container.sendTerminatedToTaskAttempt(container.pendingAttempt);
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
}
container.sendCompletedToScheduler();
container.containerHeartbeatHandler.unregister(container.containerId);
@@ -689,7 +675,7 @@ public class AMContainerImpl implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
- container.sendTerminatedToTaskAttempt(container.runningAttempt);
+ container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
container.sendCompletedToScheduler();
container.containerHeartbeatHandler.unregister(container.containerId);
container.unregisterAttemptFromListener(container.runningAttempt);
@@ -756,7 +742,11 @@ public class AMContainerImpl implements
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
container.inError = true;
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
+ String errorMessage = "AttemptId: " + event.getTaskAttemptId()
+ + " cannot be allocated to container: " + container.getContainerId()
+ + " in STOPPING state";
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
+ errorMessage);
}
}
@@ -784,7 +774,11 @@ public class AMContainerImpl implements
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
+ String errorMessage = "AttemptId: " + event.getTaskAttemptId()
+ + " cannot be allocated to container: " + container.getContainerId()
+ + " in COMPLETED state";
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
+ errorMessage);
}
}
@@ -797,14 +791,15 @@ public class AMContainerImpl implements
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ // XXX: Would some of these events not already have gone out when entering the STOPPING state? Fix errorMessages.
if (container.pendingAttempt != null) {
- container.sendTerminatedToTaskAttempt(container.pendingAttempt);
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
}
if (container.runningAttempt != null) {
- container.sendTerminatedToTaskAttempt(container.runningAttempt);
+ container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
}
if (container.interruptedEvent != null) {
- container.sendTerminatedToTaskAttempt(container.interruptedEvent);
+ container.sendTerminatedToTaskAttempt(container.interruptedEvent, null);
}
container.sendCompletedToScheduler();
}
@@ -876,7 +871,10 @@ public class AMContainerImpl implements
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
container.inError = true;
- container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
+ String errorMessage = "AttemptId: " + event.getTaskAttemptId()
+ + " cannot be allocated to container: " + container.getContainerId()
+ + " in RUNNING state";
+ container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), errorMessage);
container.sendStopRequestToNM();
container.deAllocate();
container.unregisterAttemptFromListener(container.runningAttempt);
@@ -948,217 +946,5 @@ public class AMContainerImpl implements
// TODO Create a generic ERROR state. Container tries informing relevant components in this case.
- /**
- * Create a {@link LocalResource} record with all the given parameters.
- */
- private static LocalResource createLocalResource(FileSystem fc, Path file,
- LocalResourceType type, LocalResourceVisibility visibility)
- throws IOException {
- FileStatus fstat = fc.getFileStatus(file);
- URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
- .getPath()));
- long resourceSize = fstat.getLen();
- long resourceModificationTime = fstat.getModificationTime();
-
- return BuilderUtils.newLocalResource(resourceURL, type, visibility,
- resourceSize, resourceModificationTime);
- }
-
- /**
- * Lock this on initialClasspath so that there is only one fork in the AM for
- * getting the initial class-path. TODO: We already construct
- * a parent CLC and use it for all the containers, so this should go away
- * once the mr-generated-classpath stuff is gone.
- */
- private static String getInitialClasspath(Configuration conf) throws IOException {
- synchronized (classpathLock) {
- if (initialClasspathFlag.get()) {
- return initialClasspath;
- }
- Map<String, String> env = new HashMap<String, String>();
- MRApps.setClasspath(env, conf);
- initialClasspath = env.get(Environment.CLASSPATH.name());
- initialClasspathFlag.set(true);
- return initialClasspath;
- }
- }
-
- /**
- * Create the common {@link ContainerLaunchContext} for all attempts.
- * @param applicationACLs
- */
- private static ContainerLaunchContext createCommonContainerLaunchContext(
- Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
- Token<JobTokenIdentifier> jobToken,
- final org.apache.hadoop.mapred.JobID oldJobId,
- Credentials credentials) {
-
- // Application resources
- Map<String, LocalResource> localResources =
- new HashMap<String, LocalResource>();
-
- // Application environment
- Map<String, String> environment = new HashMap<String, String>();
-
- // Service data
- Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
-
- // Tokens
- ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{});
- try {
- FileSystem remoteFS = FileSystem.get(conf);
-
- // //////////// Set up JobJar to be localized properly on the remote NM.
- String jobJar = conf.get(MRJobConfig.JAR);
- if (jobJar != null) {
- Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS
- .getUri(), remoteFS.getWorkingDirectory());
- localResources.put(
- MRJobConfig.JOB_JAR,
- createLocalResource(remoteFS, remoteJobJar,
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
- LOG.info("The job-jar file on the remote FS is "
- + remoteJobJar.toUri().toASCIIString());
- } else {
- // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
- // mapreduce jar itself which is already on the classpath.
- LOG.info("Job jar is not present. "
- + "Not adding any jar to the list of resources.");
- }
- // //////////// End of JobJar setup
-
- // //////////// Set up JobConf to be localized properly on the remote NM.
- Path path =
- MRApps.getStagingAreaDir(conf, UserGroupInformation
- .getCurrentUser().getShortUserName());
- Path remoteJobSubmitDir =
- new Path(path, oldJobId.toString());
- Path remoteJobConfPath =
- new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
- localResources.put(
- MRJobConfig.JOB_CONF_FILE,
- createLocalResource(remoteFS, remoteJobConfPath,
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
- LOG.info("The job-conf file on the remote FS is "
- + remoteJobConfPath.toUri().toASCIIString());
- // //////////// End of JobConf setup
-
- // Setup DistributedCache
- MRApps.setupDistributedCache(conf, localResources);
-
- // Setup up task credentials buffer
- Credentials taskCredentials = new Credentials();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- LOG.info("Adding #" + credentials.numberOfTokens()
- + " tokens and #" + credentials.numberOfSecretKeys()
- + " secret keys for NM use for launching container");
- taskCredentials.addAll(credentials);
- }
-
- // LocalStorageToken is needed irrespective of whether security is enabled
- // or not.
- TokenCache.setJobToken(jobToken, taskCredentials);
-
- DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
- LOG.info("Size of containertokens_dob is "
- + taskCredentials.numberOfTokens());
- taskCredentials.writeTokenStorageToStream(containerTokens_dob);
- taskCredentialsBuffer =
- ByteBuffer.wrap(containerTokens_dob.getData(), 0,
- containerTokens_dob.getLength());
-
- // Add shuffle token
- LOG.info("Putting shuffle token in serviceData");
- serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
- ShuffleHandler.serializeServiceData(jobToken));
-
- Apps.addToEnvironment(
- environment,
- Environment.CLASSPATH.name(),
- getInitialClasspath(conf));
- } catch (IOException e) {
- throw new YarnException(e);
- }
-
- // Shell
- environment.put(
- Environment.SHELL.name(),
- conf.get(
- MRJobConfig.MAPRED_ADMIN_USER_SHELL,
- MRJobConfig.DEFAULT_SHELL)
- );
-
- // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
- Apps.addToEnvironment(
- environment,
- Environment.LD_LIBRARY_PATH.name(),
- Environment.PWD.$());
-
- // Add the env variables passed by the admin
- Apps.setEnvFromInputString(
- environment,
- conf.get(
- MRJobConfig.MAPRED_ADMIN_USER_ENV,
- MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
- );
-
- // Construct the actual Container
- // The null fields are per-container and will be constructed for each
- // container separately.
- ContainerLaunchContext container = BuilderUtils
- .newContainerLaunchContext(null, conf
- .get(MRJobConfig.USER_NAME), null, localResources,
- environment, null, serviceData, taskCredentialsBuffer,
- applicationACLs);
-
- return container;
- }
-
- static ContainerLaunchContext createContainerLaunchContext(
- Map<ApplicationAccessType, String> applicationACLs,
- ContainerId containerID, Configuration conf,
- Token<JobTokenIdentifier> jobToken, Task remoteTask,
- final org.apache.hadoop.mapred.JobID oldJobId,
- Resource assignedCapability, WrappedJvmID jvmID,
- TaskAttemptListener taskAttemptListener,
- Credentials credentials) {
-
- synchronized (commonContainerSpecLock) {
- if (commonContainerSpec == null) {
- commonContainerSpec = createCommonContainerLaunchContext(
- applicationACLs, conf, jobToken, oldJobId, credentials);
- }
- }
- // Fill in the fields needed per-container that are missing in the common
- // spec.
-
- // Setup environment by cloning from common env.
- Map<String, String> env = commonContainerSpec.getEnvironment();
- Map<String, String> myEnv = new HashMap<String, String>(env.size());
- myEnv.putAll(env);
- MapReduceChildJVM2.setVMEnv(myEnv, remoteTask);
-
- // Set up the launch command
- List<String> commands = MapReduceChildJVM2.getVMCommand(
- taskAttemptListener.getAddress(), remoteTask, jvmID);
-
- // Duplicate the ByteBuffers for access by multiple containers.
- Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
- for (Entry<String, ByteBuffer> entry : commonContainerSpec
- .getServiceData().entrySet()) {
- myServiceData.put(entry.getKey(), entry.getValue().duplicate());
- }
-
- // Construct the actual Container
- ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
- containerID, commonContainerSpec.getUser(), assignedCapability,
- commonContainerSpec.getLocalResources(), myEnv, commands,
- myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
- applicationACLs);
-
- return container;
- }
-
}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java Fri Sep 21 18:11:35 2012
@@ -1,37 +1,73 @@
-package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-import java.util.Map;
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ContainerId;
public class AMContainerLaunchRequestEvent extends AMContainerEvent {
- // Temporarily sending in the event from the task.
-// private final ContainerLaunchContext clc;
- private final AMSchedulerTALaunchRequestEvent event;
- private final Map<ApplicationAccessType, String> applicationAcls;
private final JobId jobId;
-
- public AMContainerLaunchRequestEvent(ContainerId containerId, AMSchedulerTALaunchRequestEvent event, Map<ApplicationAccessType, String> applicationAcls, JobId jobId) {
+ private final TaskType taskTypeForContainer;
+ private final Token<JobTokenIdentifier> jobToken;
+ private final Credentials credentials;
+ private final boolean shouldProfile;
+ private final JobConf jobConf;
+
+ public AMContainerLaunchRequestEvent(ContainerId containerId, JobId jobId,
+ TaskType taskType, Token<JobTokenIdentifier> jobToken,
+ Credentials credentials, boolean shouldProfile, JobConf jobConf) {
super(containerId, AMContainerEventType.C_START_REQUEST);
- this.event = event;
- this.applicationAcls = applicationAcls;
this.jobId = jobId;
- }
-
- // TODO XXX: Temporary.
- public AMSchedulerTALaunchRequestEvent getLaunchRequestEvent() {
- return event;
- }
-
- public Map<ApplicationAccessType, String> getApplicationAcls() {
- return this.applicationAcls;
+ this.taskTypeForContainer = taskType;
+ this.jobToken = jobToken;
+ this.credentials = credentials;
+ this.shouldProfile = shouldProfile;
+ this.jobConf = jobConf;
}
public JobId getJobId() {
return this.jobId;
}
+
+ public TaskType getTaskTypeForContainer() {
+ return this.taskTypeForContainer;
+ }
+
+ public Token<JobTokenIdentifier> getJobToken() {
+ return this.jobToken;
+ }
+
+ public Credentials getCredentials() {
+ return this.credentials;
+ }
+
+ public boolean shouldProfile() {
+ return this.shouldProfile;
+ }
+
+ public JobConf getJobConf() {
+ return this.jobConf;
+ }
}
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java Fri Sep 21 18:11:35 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.mapreduce.v2.app2.rm.container;
import java.util.Collection;
@@ -13,25 +31,21 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;
-public class AMContainerMap extends AbstractService
- implements EventHandler<AMContainerEvent> {
+public class AMContainerMap extends AbstractService implements
+ EventHandler<AMContainerEvent> {
private static final Log LOG = LogFactory.getLog(AMContainerMap.class);
-
+
private final ContainerHeartbeatHandler chh;
private final TaskAttemptListener tal;
- @SuppressWarnings("rawtypes")
- private final EventHandler eventHandler;
private final AppContext context;
- private final ConcurrentHashMap<ContainerId, AMContainer>containerMap;
+ private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
- @SuppressWarnings("rawtypes")
public AMContainerMap(ContainerHeartbeatHandler chh, TaskAttemptListener tal,
- EventHandler eventHandler, AppContext context) {
+ AppContext context) {
super("AMContainerMaps");
this.chh = chh;
this.tal = tal;
- this.eventHandler = eventHandler;
this.context = context;
this.containerMap = new ConcurrentHashMap<ContainerId, AMContainer>();
}
@@ -43,16 +57,15 @@ public class AMContainerMap extends Abst
}
public void addContainerIfNew(Container container) {
- AMContainer amc = new AMContainerImpl(container, chh, tal, eventHandler,
- context);
+ AMContainer amc = new AMContainerImpl(container, chh, tal, context);
containerMap.putIfAbsent(container.getId(), amc);
}
-
+
public AMContainer get(ContainerId containerId) {
return containerMap.get(containerId);
}
-
- public Collection<AMContainer>values() {
+
+ public Collection<AMContainer> values() {
return containerMap.values();
}
}
\ No newline at end of file
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java Fri Sep 21 18:11:35 2012
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;
@@ -637,8 +638,11 @@ public class MRApp extends MRAppMaster {
if (getContext().getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
LOG.info("XXX: Sending launch request for container: " + cId
+ " for taskAttemptId: " + lEvent.getAttemptID());
- getContext().getEventHandler().handle(
- new AMContainerLaunchRequestEvent(cId, lEvent, appAcls, jobId));
+ AMContainerLaunchRequestEvent lrEvent = new AMContainerLaunchRequestEvent(
+ cId, jobId, lEvent.getAttemptID().getTaskId().getTaskType(),
+ lEvent.getJobToken(), lEvent.getCredentials(), false,
+ new JobConf(getContext().getJob(jobId).getConf()));
+ getContext().getEventHandler().handle(lrEvent);
}
LOG.info("XXX: Assigning attempt [" + lEvent.getAttemptID()
+ "] to Container [" + cId + "]");
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java Fri Sep 21 18:11:35 2012
@@ -970,7 +970,7 @@ public class TestRMContainerAllocator {
AMContainerMap amContainerMap = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
- eventHandler, appContext);
+ appContext);
amContainerMap.init(conf);
amContainerMap.start();
when(appContext.getAllContainers()).thenReturn(amContainerMap);
|