hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1388596 - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/a...
Date Fri, 21 Sep 2012 18:11:36 GMT
Author: sseth
Date: Fri Sep 21 18:11:35 2012
New Revision: 1388596

URL: http://svn.apache.org/viewvc?rev=1388596&view=rev
Log:
MAPREDUCE-4663. Container Launch should be independent of o.a.h.m.Task (sseth)

Added:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java
Modified:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Sep 21 18:11:35 2012
@@ -24,3 +24,5 @@ Branch MR-3902
   MAPREDUCE-4665. Use the configured shuffle port and application ACLs (sseth)
 
   MAPREDUCE-4664. ContainerHeartbeatHandler should be pinged on a getTask call (sseth)
+
+  MAPREDUCE-4663. Container Launch should be independent of o.a.h.m.Task (sseth)

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java Fri Sep 21 18:11:35 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapreduce.ID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -65,13 +66,11 @@ public class MapReduceChildJVM2 {
     }
   }
   
-  public static void setVMEnv(Map<String, String> environment,
-      Task task) {
-
-    JobConf conf = task.conf;
+  public static void setVMEnv(Map<String, String> environment, JobConf conf,
+      TaskType taskType) {
 
     // Add the env variables passed by the user
-    String mapredChildEnv = getChildEnv(conf, task.isMapTask());
+    String mapredChildEnv = getChildEnv(conf, taskType == TaskType.MAP);
     Apps.setEnvFromInputString(environment, mapredChildEnv);
 
     // Set logging level in the environment.
@@ -79,7 +78,7 @@ public class MapReduceChildJVM2 {
     // streaming) it will have the correct loglevel.
     environment.put(
         "HADOOP_ROOT_LOGGER", 
-        getChildLogLevel(conf, task.isMapTask()) + ",CLA"); 
+        getChildLogLevel(conf, taskType == TaskType.MAP) + ",CLA"); 
 
     // TODO: The following is useful for instance in streaming tasks. Should be
     // set in ApplicationMaster's env by the RM.
@@ -93,7 +92,7 @@ public class MapReduceChildJVM2 {
     // properties.
     long logSize = TaskLog.getTaskLogLength(conf);
     Vector<String> logProps = new Vector<String>(4);
-    setupLog4jProperties(task, logProps, logSize);
+    setupLog4jProperties(conf, taskType, logProps, logSize);
     Iterator<String> it = logProps.iterator();
     StringBuffer buffer = new StringBuffer();
     while (it.hasNext()) {
@@ -148,21 +147,19 @@ public class MapReduceChildJVM2 {
     return adminClasspath + " " + userClasspath;
   }
 
-  private static void setupLog4jProperties(Task task,
-      Vector<String> vargs,
-      long logSize) {
-    String logLevel = getChildLogLevel(task.conf, task.isMapTask()); 
+  private static void setupLog4jProperties(JobConf conf, TaskType taskType,
+      Vector<String> vargs, long logSize) {
+    String logLevel = getChildLogLevel(conf, taskType == TaskType.MAP);
     MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
   }
 
   public static List<String> getVMCommand(
-      InetSocketAddress taskAttemptListenerAddr, Task task, 
-      ID jvmID) {
+      InetSocketAddress taskAttemptListenerAddr, JobConf conf, TaskType taskType, 
+      ID jvmID, JobID jobID, boolean shouldProfile) {
 
-    TaskAttemptID attemptID = task.getTaskID();
-    JobConf conf = task.conf;
+    // TaskAttemptID attemptID = task.getTaskID();
 
-    Vector<String> vargs = new Vector<String>(8);
+    Vector<String> vargs = new Vector<String>(9);
 
     vargs.add("exec");
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
@@ -195,8 +192,9 @@ public class MapReduceChildJVM2 {
     //    </value>
     //  </property>
     //
-    String javaOpts = getChildJavaOpts(conf, task.isMapTask());
-    javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
+    String javaOpts = getChildJavaOpts(conf, taskType == TaskType.MAP);
+    // Broken by the AM re-factor. TaskID is not JVM specific.
+    // javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
     String [] javaOptsSplit = javaOpts.split(" ");
     for (int i = 0; i < javaOptsSplit.length; i++) {
       vargs.add(javaOptsSplit[i]);
@@ -208,24 +206,16 @@ public class MapReduceChildJVM2 {
 
     // Setup the log4j prop
     long logSize = TaskLog.getTaskLogLength(conf);
-    setupLog4jProperties(task, vargs, logSize);
+    setupLog4jProperties(conf, taskType, vargs, logSize);
 
-    if (conf.getProfileEnabled()) {
-      if (conf.getProfileTaskRange(task.isMapTask()
-                                   ).isIncluded(task.getPartition())) {
-        vargs.add(
-            String.format(
-                conf.getProfileParams(), 
-                getTaskLogFile(TaskLog.LogName.PROFILE)
-                )
-            );
-        if (task.isMapTask()) {
-          vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
-        }
-        else {
-          vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
-        }
-        
+    // Decision to profile needs to be made in the scheduler.
+    if (shouldProfile) {
+      vargs.add(String.format(conf.getProfileParams(),
+          getTaskLogFile(TaskLog.LogName.PROFILE)));
+      if (taskType == TaskType.MAP) {
+        vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
+      } else {
+        vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
       }
     }
 
@@ -233,8 +223,10 @@ public class MapReduceChildJVM2 {
     vargs.add(YarnChild2.class.getName());  // main of Child
     // pass TaskAttemptListener's address
     vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress()); 
-    vargs.add(Integer.toString(taskAttemptListenerAddr.getPort())); 
-    vargs.add(attemptID.toString());                      // pass task identifier
+    vargs.add(Integer.toString(taskAttemptListenerAddr.getPort()));
+    // Set the job id, and task type.
+    vargs.add(jobID.toString());
+    vargs.add(taskType.toString());
 
     // Finally add the jvmID
     vargs.add(String.valueOf(jvmID.getId()));

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java Fri Sep 21 18:11:35 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.fs.LocalDirAllo
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -70,6 +69,10 @@ class YarnChild2 {
 
   static volatile TaskAttemptID taskid = null;
 
+  /*
+   * Expected arguments args[0] - host, args[1] - port, args[2] - JobId, args[3]
+   * - TaskType, args[4] - jvm integer id, rest are log redirects etc.
+   */
   public static void main(String[] args) throws Throwable {
     LOG.info("XXX: Child starting");
 
@@ -81,20 +84,22 @@ class YarnChild2 {
     int port = Integer.parseInt(args[1]);
     final InetSocketAddress address =
         NetUtils.createSocketAddrForHost(host, port);
-    final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
-    int jvmIdInt = Integer.parseInt(args[3]);
-    JVMId jvmId = new JVMId(firstTaskid.getJobID(),
-        firstTaskid.getTaskType() == TaskType.MAP, jvmIdInt);
+
+    final JobID jobID = JobID.forName(args[2]);
+    final TaskType taskType = TaskType.valueOf(args[3]);
+    
+    final int jvmIdInt = Integer.parseInt(args[4]);
+    JVMId jvmId = new JVMId(jobID, taskType == TaskType.MAP, jvmIdInt);
 
     // initialize metrics
     DefaultMetricsSystem.initialize(
-        StringUtils.camelize(firstTaskid.getTaskType().name()) +"Task");
+        StringUtils.camelize(taskType.name()) +"Task");
 
     Token<JobTokenIdentifier> jt = loadCredentials(defaultConf, address);
 
     // Create TaskUmbilicalProtocol as actual task owner.
     UserGroupInformation taskOwner =
-      UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
+      UserGroupInformation.createRemoteUser(jobID.toString());
     taskOwner.addToken(jt);
     final TaskUmbilicalProtocol umbilical =
       taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java Fri Sep 21 18:11:35 2012
@@ -270,10 +270,12 @@ public class MRAppMaster extends Composi
     addIfService(containerHeartbeatHandler);
     
     //service to handle requests to TaskUmbilicalProtocol
-    taskAttemptListener = createTaskAttemptListener(context, taskHeartbeatHandler, containerHeartbeatHandler);
+    taskAttemptListener = createTaskAttemptListener(context,
+        taskHeartbeatHandler, containerHeartbeatHandler);
     addIfService(taskAttemptListener);
 
-    containers = new AMContainerMap(containerHeartbeatHandler, taskAttemptListener, dispatcher.getEventHandler(), context);
+    containers = new AMContainerMap(containerHeartbeatHandler,
+        taskAttemptListener, context);
     addIfService(containers);
     dispatcher.register(AMContainerEventType.class, containers);
     

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImplHelpers.java Fri Sep 21 18:11:35 2012
@@ -1,271 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.mapreduce.v2.app2.job.impl;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.mapred.MapReduceChildJVM2;
-import org.apache.hadoop.mapred.ShuffleHandler;
-import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.mapred.WrappedJvmID;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class TaskAttemptImplHelpers {
 
   private static final Log LOG = LogFactory.getLog(TaskAttemptImplHelpers.class);
-  private static Object commonContainerSpecLock = new Object();
-  private static ContainerLaunchContext commonContainerSpec = null;
-  private static final Object classpathLock = new Object();
-  private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
-  private static String initialClasspath = null;
-  
-  /**
-   * Create the common {@link ContainerLaunchContext} for all attempts.
-   * @param applicationACLs 
-   */
-  private static ContainerLaunchContext createCommonContainerLaunchContext(
-      Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
-      Token<JobTokenIdentifier> jobToken,
-      final org.apache.hadoop.mapred.JobID oldJobId,
-      Credentials credentials) {
-
-    // Application resources
-    Map<String, LocalResource> localResources = 
-        new HashMap<String, LocalResource>();
-    
-    // Application environment
-    Map<String, String> environment = new HashMap<String, String>();
-
-    // Service data
-    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
-
-    // Tokens
-    ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{});
-    try {
-      FileSystem remoteFS = FileSystem.get(conf);
-
-      // //////////// Set up JobJar to be localized properly on the remote NM.
-      String jobJar = conf.get(MRJobConfig.JAR);
-      if (jobJar != null) {
-        Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS
-            .getUri(), remoteFS.getWorkingDirectory());
-        localResources.put(
-            MRJobConfig.JOB_JAR,
-            createLocalResource(remoteFS, remoteJobJar,
-                LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
-        LOG.info("The job-jar file on the remote FS is "
-            + remoteJobJar.toUri().toASCIIString());
-      } else {
-        // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
-        // mapreduce jar itself which is already on the classpath.
-        LOG.info("Job jar is not present. "
-            + "Not adding any jar to the list of resources.");
-      }
-      // //////////// End of JobJar setup
-
-      // //////////// Set up JobConf to be localized properly on the remote NM.
-      Path path =
-          MRApps.getStagingAreaDir(conf, UserGroupInformation
-              .getCurrentUser().getShortUserName());
-      Path remoteJobSubmitDir =
-          new Path(path, oldJobId.toString());
-      Path remoteJobConfPath = 
-          new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
-      localResources.put(
-          MRJobConfig.JOB_CONF_FILE,
-          createLocalResource(remoteFS, remoteJobConfPath,
-              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
-      LOG.info("The job-conf file on the remote FS is "
-          + remoteJobConfPath.toUri().toASCIIString());
-      // //////////// End of JobConf setup
-
-      // Setup DistributedCache
-      MRApps.setupDistributedCache(conf, localResources);
-
-      // Setup up task credentials buffer
-      Credentials taskCredentials = new Credentials();
-
-      if (UserGroupInformation.isSecurityEnabled()) {
-        LOG.info("Adding #" + credentials.numberOfTokens()
-            + " tokens and #" + credentials.numberOfSecretKeys()
-            + " secret keys for NM use for launching container");
-        taskCredentials.addAll(credentials);
-      }
-
-      // LocalStorageToken is needed irrespective of whether security is enabled
-      // or not.
-      TokenCache.setJobToken(jobToken, taskCredentials);
-
-      DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
-      LOG.info("Size of containertokens_dob is "
-          + taskCredentials.numberOfTokens());
-      taskCredentials.writeTokenStorageToStream(containerTokens_dob);
-      taskCredentialsBuffer =
-          ByteBuffer.wrap(containerTokens_dob.getData(), 0,
-              containerTokens_dob.getLength());
-
-      // Add shuffle token
-      LOG.info("Putting shuffle token in serviceData");
-      serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
-          ShuffleHandler.serializeServiceData(jobToken));
-
-      Apps.addToEnvironment(
-          environment,  
-          Environment.CLASSPATH.name(), 
-          getInitialClasspath(conf));
-    } catch (IOException e) {
-      throw new YarnException(e);
-    }
-
-    // Shell
-    environment.put(
-        Environment.SHELL.name(), 
-        conf.get(
-            MRJobConfig.MAPRED_ADMIN_USER_SHELL, 
-            MRJobConfig.DEFAULT_SHELL)
-            );
-
-    // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
-    Apps.addToEnvironment(
-        environment, 
-        Environment.LD_LIBRARY_PATH.name(), 
-        Environment.PWD.$());
-
-    // Add the env variables passed by the admin
-    Apps.setEnvFromInputString(
-        environment, 
-        conf.get(
-            MRJobConfig.MAPRED_ADMIN_USER_ENV, 
-            MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
-        );
-
-    // Construct the actual Container
-    // The null fields are per-container and will be constructed for each
-    // container separately.
-    ContainerLaunchContext container = BuilderUtils
-        .newContainerLaunchContext(null, conf
-            .get(MRJobConfig.USER_NAME), null, localResources,
-            environment, null, serviceData, taskCredentialsBuffer,
-            applicationACLs);
-
-    return container;
-  }
-
-  static ContainerLaunchContext createContainerLaunchContext(
-      Map<ApplicationAccessType, String> applicationACLs,
-      ContainerId containerID, Configuration conf,
-      Token<JobTokenIdentifier> jobToken, Task remoteTask,
-      final org.apache.hadoop.mapred.JobID oldJobId,
-      Resource assignedCapability, WrappedJvmID jvmID,
-      TaskAttemptListener taskAttemptListener,
-      Credentials credentials) {
-
-    synchronized (commonContainerSpecLock) {
-      if (commonContainerSpec == null) {
-        commonContainerSpec = createCommonContainerLaunchContext(
-            applicationACLs, conf, jobToken, oldJobId, credentials);
-      }
-    }
-
-    // Fill in the fields needed per-container that are missing in the common
-    // spec.
-
-    // Setup environment by cloning from common env.
-    Map<String, String> env = commonContainerSpec.getEnvironment();
-    Map<String, String> myEnv = new HashMap<String, String>(env.size());
-    myEnv.putAll(env);
-    MapReduceChildJVM2.setVMEnv(myEnv, remoteTask);
-
-    // Set up the launch command
-    List<String> commands = MapReduceChildJVM2.getVMCommand(
-        taskAttemptListener.getAddress(), remoteTask, jvmID);
-
-    // Duplicate the ByteBuffers for access by multiple containers.
-    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
-    for (Entry<String, ByteBuffer> entry : commonContainerSpec
-                .getServiceData().entrySet()) {
-      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
-    }
-
-    // Construct the actual Container
-    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
-        containerID, commonContainerSpec.getUser(), assignedCapability,
-        commonContainerSpec.getLocalResources(), myEnv, commands,
-        myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
-        applicationACLs);
-
-    return container;
-  }
-  
-  /**
-   * Create a {@link LocalResource} record with all the given parameters.
-   */
-  private static LocalResource createLocalResource(FileSystem fc, Path file,
-      LocalResourceType type, LocalResourceVisibility visibility)
-      throws IOException {
-    FileStatus fstat = fc.getFileStatus(file);
-    URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
-        .getPath()));
-    long resourceSize = fstat.getLen();
-    long resourceModificationTime = fstat.getModificationTime();
-
-    return BuilderUtils.newLocalResource(resourceURL, type, visibility,
-        resourceSize, resourceModificationTime);
-  }
-  
-  /**
-   * Lock this on initialClasspath so that there is only one fork in the AM for
-   * getting the initial class-path. TODO: We already construct
-   * a parent CLC and use it for all the containers, so this should go away
-   * once the mr-generated-classpath stuff is gone.
-   */
-  private static String getInitialClasspath(Configuration conf) throws IOException {
-    synchronized (classpathLock) {
-      if (initialClasspathFlag.get()) {
-        return initialClasspath;
-      }
-      Map<String, String> env = new HashMap<String, String>();
-      MRApps.setClasspath(env, conf);
-      initialClasspath = env.get(Environment.CLASSPATH.name());
-      initialClasspathFlag.set(true);
-      return initialClasspath;
-    }
-  }
-  
+   
   static String[] resolveHosts(String[] src) {
     String[] result = new String[src.length];
     for (int i = 0; i < src.length; i++) {

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java Fri Sep 21 18:11:35 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -568,8 +569,12 @@ public class RecoveryService extends Com
       
       // Request container launch for new containers.
       if (appContext.getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
-        actualHandler.handle(new AMContainerLaunchRequestEvent(cId, event,
-            null, yarnAttemptID.getTaskId().getJobId()));
+        TaskId taskId = yarnAttemptID.getTaskId();
+        AMContainerLaunchRequestEvent lrEvent = new AMContainerLaunchRequestEvent(
+            cId, taskId.getJobId(), taskId.getTaskType(), event.getJobToken(),
+            event.getCredentials(), false, new JobConf(appContext.getJob(
+                taskId.getJobId()).getConf()));
+        actualHandler.handle(lrEvent);
       }
      // Assign the task attempt to this container.
       actualHandler.handle(new AMContainerAssignTAEvent(cId, yarnAttemptID,

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java Fri Sep 21 18:11:35 2012
@@ -39,6 +39,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -75,8 +76,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
 
 /**
@@ -98,12 +99,9 @@ public class RMContainerAllocator extend
   private volatile boolean stopEventHandling;
 
   static {
-    PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
-    PRIORITY_FAST_FAIL_MAP.setPriority(5);
-    PRIORITY_REDUCE = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
-    PRIORITY_REDUCE.setPriority(10);
-    PRIORITY_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
-    PRIORITY_MAP.setPriority(20);
+    PRIORITY_FAST_FAIL_MAP = BuilderUtils.newPriority(5);
+    PRIORITY_REDUCE = BuilderUtils.newPriority(10);
+    PRIORITY_MAP = BuilderUtils.newPriority(20);
   }
   
   protected final AppContext appContext;
@@ -970,10 +968,17 @@ public class RMContainerAllocator extend
               // TODO Maybe: ApplicationACLs should be populated into the appContext from the RMCommunicator.
 
               if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
-                eventHandler.handle(new AMContainerLaunchRequestEvent(
-                    containerId, attemptToLaunchRequestMap.get(assigned
-                        .getAttemptId()), requestor.getApplicationAcls(),
-                    getJob().getID()));
+                AMSchedulerTALaunchRequestEvent tlrEvent = attemptToLaunchRequestMap
+                    .get(assigned.getAttemptId());
+                JobConf jobConf = new JobConf(job.getConf());
+
+                AMContainerLaunchRequestEvent launchRequest = new AMContainerLaunchRequestEvent(
+                    containerId, jobId, assigned.getAttemptId().getTaskId()
+                        .getTaskType(), tlrEvent.getJobToken(),
+                    tlrEvent.getCredentials(), shouldProfileTaskAttempt(
+                        jobConf, tlrEvent.getRemoteTask()), jobConf);
+
+                eventHandler.handle(launchRequest);
               }
               eventHandler.handle(new AMContainerAssignTAEvent(containerId,
                   assigned.getAttemptId(), attemptToLaunchRequestMap.get(
@@ -1239,6 +1244,22 @@ public class RMContainerAllocator extend
     return newReq;
   }
 
+  /*
+   * Not very useful for a re-use scheduler.
+   */
+  protected boolean shouldProfileTaskAttempt(JobConf conf,
+      org.apache.hadoop.mapred.Task remoteTask) {
+    TaskType taskType = TypeConverter.toYarn(remoteTask.getTaskID()
+        .getTaskType());
+    if (conf.getProfileEnabled()) {
+      if (conf.getProfileTaskRange(taskType == TaskType.MAP).isIncluded(
+          remoteTask.getPartition())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   private static class ContainerRequestInfo {
     ContainerRequestInfo(ContainerRequest containerRequest,
         AMSchedulerTALaunchRequestEvent launchRequestEvent) {

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java?rev=1388596&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerHelpers.java Fri Sep 21 18:11:35 2012
@@ -0,0 +1,271 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceChildJVM2;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class AMContainerHelpers {
+
+  private static final Log LOG = LogFactory.getLog(AMContainerHelpers.class);
+
+  private static Object commonContainerSpecLock = new Object();
+  private static ContainerLaunchContext commonContainerSpec = null;
+  private static final Object classpathLock = new Object();
+  private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
+  private static String initialClasspath = null;
+
+  /**
+   * Create a {@link LocalResource} record with all the given parameters.
+   */
+  private static LocalResource createLocalResource(FileSystem fc, Path file,
+      LocalResourceType type, LocalResourceVisibility visibility)
+      throws IOException {
+    FileStatus fstat = fc.getFileStatus(file);
+    URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
+        .getPath()));
+    long resourceSize = fstat.getLen();
+    long resourceModificationTime = fstat.getModificationTime();
+
+    return BuilderUtils.newLocalResource(resourceURL, type, visibility,
+        resourceSize, resourceModificationTime);
+  }
+
+  /**
+   * Lock this on initialClasspath so that there is only one fork in the AM for
+   * getting the initial class-path. TODO: We already construct a parent CLC and
+   * use it for all the containers, so this should go away once the
+   * mr-generated-classpath stuff is gone.
+   */
+  private static String getInitialClasspath(Configuration conf)
+      throws IOException {
+    synchronized (classpathLock) {
+      if (initialClasspathFlag.get()) {
+        return initialClasspath;
+      }
+      Map<String, String> env = new HashMap<String, String>();
+      MRApps.setClasspath(env, conf);
+      initialClasspath = env.get(Environment.CLASSPATH.name());
+      initialClasspathFlag.set(true);
+      return initialClasspath;
+    }
+  }
+
+  /**
+   * Create the common {@link ContainerLaunchContext} for all attempts.
+   * 
+   * @param applicationACLs
+   */
+  private static ContainerLaunchContext createCommonContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
+      Token<JobTokenIdentifier> jobToken,
+      final org.apache.hadoop.mapred.JobID oldJobId, Credentials credentials) {
+
+    // Application resources
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+    // Application environment
+    Map<String, String> environment = new HashMap<String, String>();
+
+    // Service data
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+
+    // Tokens
+    ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[] {});
+    try {
+      FileSystem remoteFS = FileSystem.get(conf);
+
+      // //////////// Set up JobJar to be localized properly on the remote NM.
+      String jobJar = conf.get(MRJobConfig.JAR);
+      if (jobJar != null) {
+        Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS.getUri(),
+            remoteFS.getWorkingDirectory());
+        localResources.put(
+            MRJobConfig.JOB_JAR,
+            createLocalResource(remoteFS, remoteJobJar, LocalResourceType.FILE,
+                LocalResourceVisibility.APPLICATION));
+        LOG.info("The job-jar file on the remote FS is "
+            + remoteJobJar.toUri().toASCIIString());
+      } else {
+        // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
+        // mapreduce jar itself which is already on the classpath.
+        LOG.info("Job jar is not present. "
+            + "Not adding any jar to the list of resources.");
+      }
+      // //////////// End of JobJar setup
+
+      // //////////// Set up JobConf to be localized properly on the remote NM.
+      Path path = MRApps.getStagingAreaDir(conf, UserGroupInformation
+          .getCurrentUser().getShortUserName());
+      Path remoteJobSubmitDir = new Path(path, oldJobId.toString());
+      Path remoteJobConfPath = new Path(remoteJobSubmitDir,
+          MRJobConfig.JOB_CONF_FILE);
+      localResources.put(
+          MRJobConfig.JOB_CONF_FILE,
+          createLocalResource(remoteFS, remoteJobConfPath,
+              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+      LOG.info("The job-conf file on the remote FS is "
+          + remoteJobConfPath.toUri().toASCIIString());
+      // //////////// End of JobConf setup
+
+      // Setup DistributedCache
+      MRApps.setupDistributedCache(conf, localResources);
+
+      // Setup up task credentials buffer
+      Credentials taskCredentials = new Credentials();
+
+      if (UserGroupInformation.isSecurityEnabled()) {
+        LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
+            + credentials.numberOfSecretKeys()
+            + " secret keys for NM use for launching container");
+        taskCredentials.addAll(credentials);
+      }
+
+      // LocalStorageToken is needed irrespective of whether security is enabled
+      // or not.
+      TokenCache.setJobToken(jobToken, taskCredentials);
+
+      DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+      LOG.info("Size of containertokens_dob is "
+          + taskCredentials.numberOfTokens());
+      taskCredentials.writeTokenStorageToStream(containerTokens_dob);
+      taskCredentialsBuffer = ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+          containerTokens_dob.getLength());
+
+      // Add shuffle token
+      LOG.info("Putting shuffle token in serviceData");
+      serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
+          ShuffleHandler.serializeServiceData(jobToken));
+
+      Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+          getInitialClasspath(conf));
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    // Shell
+    environment.put(Environment.SHELL.name(), conf.get(
+        MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));
+
+    // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
+    Apps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
+        Environment.PWD.$());
+
+    // Add the env variables passed by the admin
+    Apps.setEnvFromInputString(environment, conf.get(
+        MRJobConfig.MAPRED_ADMIN_USER_ENV,
+        MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV));
+
+    // Construct the actual Container
+    // The null fields are per-container and will be constructed for each
+    // container separately.
+    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+        null, conf.get(MRJobConfig.USER_NAME), null, localResources,
+        environment, null, serviceData, taskCredentialsBuffer, applicationACLs);
+
+    return container;
+  }
+
+  static ContainerLaunchContext createContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs,
+      ContainerId containerID, JobConf jobConf, TaskType taskType,
+      Token<JobTokenIdentifier> jobToken,
+      final org.apache.hadoop.mapred.JobID oldJobId,
+      Resource assignedCapability, WrappedJvmID jvmID,
+      TaskAttemptListener taskAttemptListener, Credentials credentials,
+      boolean shouldProfile) {
+
+    synchronized (commonContainerSpecLock) {
+      if (commonContainerSpec == null) {
+        commonContainerSpec = createCommonContainerLaunchContext(
+            applicationACLs, jobConf, jobToken, oldJobId, credentials);
+      }
+    }
+
+    // Fill in the fields needed per-container that are missing in the common
+    // spec.
+
+    // Setup environment by cloning from common env.
+    Map<String, String> env = commonContainerSpec.getEnvironment();
+    Map<String, String> myEnv = new HashMap<String, String>(env.size());
+    myEnv.putAll(env);
+    MapReduceChildJVM2.setVMEnv(myEnv, jobConf, taskType);
+
+    // Set up the launch command
+    List<String> commands = MapReduceChildJVM2.getVMCommand(
+        taskAttemptListener.getAddress(), jobConf, taskType, jvmID,
+        oldJobId, shouldProfile);
+
+    // Duplicate the ByteBuffers for access by multiple containers.
+    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+    for (Entry<String, ByteBuffer> entry : commonContainerSpec.getServiceData()
+        .entrySet()) {
+      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
+    }
+
+    // Construct the actual Container
+    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+        containerID, commonContainerSpec.getUser(), assignedCapability,
+        commonContainerSpec.getLocalResources(), myEnv, commands,
+        myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
+        applicationACLs);
+
+    return container;
+  }
+
+}

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java Fri Sep 21 18:11:35 2012
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.app2.rm.container;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -27,56 +25,30 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.mapred.MapReduceChildJVM2;
-import org.apache.hadoop.mapred.ShuffleHandler;
-import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.WrappedJvmID;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
 import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventTerminated;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorStopRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -84,10 +56,8 @@ import org.apache.hadoop.yarn.state.Mult
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 
+@SuppressWarnings("rawtypes")
 public class AMContainerImpl implements AMContainer {
 
   private static final Log LOG = LogFactory.getLog(AMContainerImpl.class);
@@ -101,13 +71,8 @@ public class AMContainerImpl implements 
   private final AppContext appContext;
   private final ContainerHeartbeatHandler containerHeartbeatHandler;
   private final TaskAttemptListener taskAttemptListener;
-  
-  private static Object commonContainerSpecLock = new Object();
-  private static ContainerLaunchContext commonContainerSpec = null;
-  private static final Object classpathLock = new Object();
-  private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
-  private static String initialClasspath = null;
-  
+  protected final EventHandler eventHandler;
+
   private final List<TaskAttemptId> completedAttempts = new LinkedList<TaskAttemptId>();
 
  // TODO Maybe this should be pulled from the TaskAttempts.
@@ -130,8 +95,7 @@ public class AMContainerImpl implements 
   
   private ContainerLaunchContext clc;
   private WrappedJvmID jvmId;
-  @SuppressWarnings("rawtypes")
-  protected EventHandler eventHandler;
+  
 
   private static boolean stateMachineInited = false;
   private static StateMachineFactory
@@ -194,16 +158,17 @@ public class AMContainerImpl implements 
  
         .installTopology();
   }
-  
-  @SuppressWarnings("rawtypes")
+
+  // Attempting to use a container based purely on resources required, etc needs
+  // additional change - JvmID, YarnChild, etc depend on TaskType.
   public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
-      TaskAttemptListener tal, EventHandler eventHandler, AppContext appContext) {
+      TaskAttemptListener tal, AppContext appContext) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
     this.container = container;
     this.containerId = container.getId();
-    this.eventHandler = eventHandler;
+    this.eventHandler = appContext.getEventHandler();
     this.appContext = appContext;
     this.containerHeartbeatHandler = chh;
     this.taskAttemptListener = tal;
@@ -307,8 +272,7 @@ public class AMContainerImpl implements 
     this.eventHandler.handle(event);
   }
 
-  // TODO Maybe have pullTA send out an attemptId. TAL talks to the TaskAttempt
-  // to fetch the actual RemoteTask.
+  // Push the TaskAttempt to the TAL, instead of the TAL pulling when a JVM asks for a TaskAttempt.
   public org.apache.hadoop.mapred.Task pullTaskAttempt() {
     this.writeLock.lock();
     try {
@@ -333,23 +297,29 @@ public class AMContainerImpl implements 
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerLaunchRequestEvent event = (AMContainerLaunchRequestEvent) cEvent;
-      AMSchedulerTALaunchRequestEvent taEvent = event.getLaunchRequestEvent();
-      
-      // TODO LATER May be possible to forget about the clc or a part of it after
-      // launch. Save AM resources.
-      
-      container.jvmId = new WrappedJvmID(taEvent.getRemoteTask().getJobID(), taEvent.getRemoteTask().isMapTask(), container.containerId.getId());
 
-      container.clc = createContainerLaunchContext(
-          event.getApplicationAcls(), container.getContainerId(),
-          container.appContext.getJob(event.getJobId()).getConf(), taEvent.getJobToken(),
-          taEvent.getRemoteTask(), TypeConverter.fromYarn(event.getJobId()),
+      JobID oldJobID = TypeConverter.fromYarn(event.getJobId());
+      container.jvmId = new WrappedJvmID(oldJobID,
+          event.getTaskTypeForContainer() == TaskType.MAP,
+          container.containerId.getId());
+      
+      container.clc = AMContainerHelpers.createContainerLaunchContext(
+          container.appContext.getApplicationACLs(),
+          container.getContainerId(), event.getJobConf(),
+          event.getTaskTypeForContainer(), event.getJobToken(),
+          TypeConverter.fromYarn(event.getJobId()),
           container.getContainer().getResource(), container.jvmId,
-          container.taskAttemptListener, taEvent.getCredentials());
+          container.taskAttemptListener, event.getCredentials(),
+          event.shouldProfile());
       
-      container.sendEvent(new NMCommunicatorLaunchRequestEvent(container.clc, container.container));
+      container.sendEvent(new NMCommunicatorLaunchRequestEvent(container.clc,
+          container.container));
       LOG.info("Sending Launch Request for Container with id: "
           + container.clc.getContainerId());
+      // Forget about the clc to save resources. At some point, part of the clc
+      // info may need to be exposed to the scheduler to figure out whether a 
+      // container can be used for a specific TaskAttempt.
+      container.clc = null;
     }
   }
   
@@ -364,7 +334,9 @@ public class AMContainerImpl implements 
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
       container.inError = true;
-      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
+      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
+          "AMScheduler Error: TaskAttempt should not be" +
+          " allocated before a launch request.");
       container.sendCompletedToScheduler();
       container.deAllocate();
       LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId()
@@ -422,8 +394,11 @@ public class AMContainerImpl implements 
   protected void sendCompletedToScheduler() {
     sendEvent(new AMSchedulerEventContainerCompleted(containerId));
   }
-  
-  protected void sendTerminatedToTaskAttempt(TaskAttemptId taId) {
+
+  protected void sendTerminatedToTaskAttempt(TaskAttemptId taId, String message) {
+    if (message != null) {
+      sendEvent(new TaskAttemptDiagnosticsUpdateEvent(taId, message));
+    }
     sendEvent(new TaskAttemptEventTerminated(taId));
   }
 
@@ -462,14 +437,19 @@ public class AMContainerImpl implements 
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
       if (container.pendingAttempt != null) {
         container.inError = true;
-        container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
+        String errorMessage = "AMScheduler Error: Multiple simultaneous " +
+        		"taskAttempt allocations to: " + container.getContainerId();
+        container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
+            errorMessage);
         container.deAllocate();
+        LOG.warn(errorMessage);
         return AMContainerState.STOPPING;
       }
       container.pendingAttempt = event.getTaskAttemptId();
       container.remoteTaskMap.put(event.getTaskAttemptId(),
           event.getRemoteTask());
-      // TODO XXX: Consider registering with the TAL, instead of the TAL pulling.
+      // TODO Consider registering with the TAL, instead of the TAL pulling.
+      // Possibly after splitting TAL and ContainerListener.
       return container.getState();
     }
   }
@@ -515,7 +495,6 @@ public class AMContainerImpl implements 
         }
         LOG.info("XXX: Assigned task + [" + container.runningAttempt + "] to container: [" + container.getContainerId() + "]");
         return AMContainerState.RUNNING;
-        // TODO XXX: Make sure the TAL sends out a TA_STARTED_REMOTELY, along with the shuffle port.
       } else {
         return AMContainerState.IDLE;
       }
@@ -531,7 +510,6 @@ public class AMContainerImpl implements 
       SingleArcTransition<AMContainerImpl, AMContainerEvent> {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
-      // TODO XXX: Send diagnostics to pending task attempt. Update action in transition table.
       if (container.pendingAttempt != null) {
         AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
         container.sendEvent(new TaskAttemptDiagnosticsUpdateEvent(
@@ -551,7 +529,10 @@ public class AMContainerImpl implements 
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       if (container.pendingAttempt != null) {
-        container.sendTerminatedToTaskAttempt(container.pendingAttempt);
+        String errorMessage = "Container" + container.getContainerId()
+            + " failed. Received COMPLETED event while trying to launch";
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt,errorMessage);
+        LOG.warn(errorMessage);    
         // TODO XXX Maybe nullify pendingAttempt.
       }
       container.sendCompletedToScheduler();
@@ -605,10 +586,15 @@ public class AMContainerImpl implements 
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
       if (container.pendingAttempt != null) {
         container.inError = true;
-        container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
+        String errorMessage = "AMScheduler Error: Multiple simultaneous "
+            + "taskAttempt allocations to: " + container.getContainerId();
+        container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
+            errorMessage);
+        LOG.warn(errorMessage);
         container.sendStopRequestToNM();
         container.deAllocate();
         container.containerHeartbeatHandler.unregister(container.containerId);
+        
         return AMContainerState.STOPPING;
       }
       container.pendingAttempt = event.getTaskAttemptId();
@@ -631,7 +617,7 @@ public class AMContainerImpl implements 
       LOG.info("Cotnainer with id: " + container.getContainerId()
           + " Completed." + " Previous state was: " + container.getState());
       if (container.pendingAttempt != null) {
-        container.sendTerminatedToTaskAttempt(container.pendingAttempt);
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
       }
       container.sendCompletedToScheduler();
       container.containerHeartbeatHandler.unregister(container.containerId);
@@ -689,7 +675,7 @@ public class AMContainerImpl implements 
       SingleArcTransition<AMContainerImpl, AMContainerEvent> {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
-      container.sendTerminatedToTaskAttempt(container.runningAttempt);
+      container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
       container.sendCompletedToScheduler();
       container.containerHeartbeatHandler.unregister(container.containerId);
       container.unregisterAttemptFromListener(container.runningAttempt);
@@ -756,7 +742,11 @@ public class AMContainerImpl implements 
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
       container.inError = true;
-      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
+      String errorMessage = "AttemptId: " + event.getTaskAttemptId()
+          + " cannot be allocated to container: " + container.getContainerId()
+          + " in STOPPING state";
+      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
+          errorMessage);
     }
   }
 
@@ -784,7 +774,11 @@ public class AMContainerImpl implements 
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
-      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
+      String errorMessage = "AttemptId: " + event.getTaskAttemptId()
+          + " cannot be allocated to container: " + container.getContainerId()
+          + " in COMPLETED state";
+      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
+          errorMessage);
     }
   }
   
@@ -797,14 +791,15 @@ public class AMContainerImpl implements 
       SingleArcTransition<AMContainerImpl, AMContainerEvent> {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+      // XXX: Would some of these events not have gone out when entering the STOPPING state. Fix errorMessages
       if (container.pendingAttempt != null) {
-        container.sendTerminatedToTaskAttempt(container.pendingAttempt);
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt, null);
       }
       if (container.runningAttempt != null) {
-        container.sendTerminatedToTaskAttempt(container.runningAttempt);
+        container.sendTerminatedToTaskAttempt(container.runningAttempt, null);
       }
       if (container.interruptedEvent != null) {
-        container.sendTerminatedToTaskAttempt(container.interruptedEvent);
+        container.sendTerminatedToTaskAttempt(container.interruptedEvent, null);
       }
       container.sendCompletedToScheduler();
     }
@@ -876,7 +871,10 @@ public class AMContainerImpl implements 
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
       container.inError = true;
-      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
+      String errorMessage = "AttemptId: " + event.getTaskAttemptId()
+          + " cannot be allocated to container: " + container.getContainerId()
+          + " in RUNNING state";
+      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(), errorMessage);
       container.sendStopRequestToNM();
       container.deAllocate();
       container.unregisterAttemptFromListener(container.runningAttempt);
@@ -948,217 +946,5 @@ public class AMContainerImpl implements 
 
   // TODO Create a generic ERROR state. Container tries informing relevant components in this case.
   
-  /**
-   * Create a {@link LocalResource} record with all the given parameters.
-   */
-  private static LocalResource createLocalResource(FileSystem fc, Path file,
-      LocalResourceType type, LocalResourceVisibility visibility)
-      throws IOException {
-    FileStatus fstat = fc.getFileStatus(file);
-    URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
-        .getPath()));
-    long resourceSize = fstat.getLen();
-    long resourceModificationTime = fstat.getModificationTime();
-
-    return BuilderUtils.newLocalResource(resourceURL, type, visibility,
-        resourceSize, resourceModificationTime);
-  }
-
-  /**
-   * Lock this on initialClasspath so that there is only one fork in the AM for
-   * getting the initial class-path. TODO: We already construct
-   * a parent CLC and use it for all the containers, so this should go away
-   * once the mr-generated-classpath stuff is gone.
-   */
-  private static String getInitialClasspath(Configuration conf) throws IOException {
-    synchronized (classpathLock) {
-      if (initialClasspathFlag.get()) {
-        return initialClasspath;
-      }
-      Map<String, String> env = new HashMap<String, String>();
-      MRApps.setClasspath(env, conf);
-      initialClasspath = env.get(Environment.CLASSPATH.name());
-      initialClasspathFlag.set(true);
-      return initialClasspath;
-    }
-  }
-  
-  /**
-   * Create the common {@link ContainerLaunchContext} for all attempts.
-   * @param applicationACLs 
-   */
-  private static ContainerLaunchContext createCommonContainerLaunchContext(
-      Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
-      Token<JobTokenIdentifier> jobToken,
-      final org.apache.hadoop.mapred.JobID oldJobId,
-      Credentials credentials) {
-
-    // Application resources
-    Map<String, LocalResource> localResources = 
-        new HashMap<String, LocalResource>();
-    
-    // Application environment
-    Map<String, String> environment = new HashMap<String, String>();
-
-    // Service data
-    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
-
-    // Tokens
-    ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{});
-    try {
-      FileSystem remoteFS = FileSystem.get(conf);
-
-      // //////////// Set up JobJar to be localized properly on the remote NM.
-      String jobJar = conf.get(MRJobConfig.JAR);
-      if (jobJar != null) {
-        Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS
-            .getUri(), remoteFS.getWorkingDirectory());
-        localResources.put(
-            MRJobConfig.JOB_JAR,
-            createLocalResource(remoteFS, remoteJobJar,
-                LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
-        LOG.info("The job-jar file on the remote FS is "
-            + remoteJobJar.toUri().toASCIIString());
-      } else {
-        // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
-        // mapreduce jar itself which is already on the classpath.
-        LOG.info("Job jar is not present. "
-            + "Not adding any jar to the list of resources.");
-      }
-      // //////////// End of JobJar setup
-
-      // //////////// Set up JobConf to be localized properly on the remote NM.
-      Path path =
-          MRApps.getStagingAreaDir(conf, UserGroupInformation
-              .getCurrentUser().getShortUserName());
-      Path remoteJobSubmitDir =
-          new Path(path, oldJobId.toString());
-      Path remoteJobConfPath = 
-          new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
-      localResources.put(
-          MRJobConfig.JOB_CONF_FILE,
-          createLocalResource(remoteFS, remoteJobConfPath,
-              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
-      LOG.info("The job-conf file on the remote FS is "
-          + remoteJobConfPath.toUri().toASCIIString());
-      // //////////// End of JobConf setup
-
-      // Setup DistributedCache
-      MRApps.setupDistributedCache(conf, localResources);
-
-      // Setup up task credentials buffer
-      Credentials taskCredentials = new Credentials();
-
-      if (UserGroupInformation.isSecurityEnabled()) {
-        LOG.info("Adding #" + credentials.numberOfTokens()
-            + " tokens and #" + credentials.numberOfSecretKeys()
-            + " secret keys for NM use for launching container");
-        taskCredentials.addAll(credentials);
-      }
-
-      // LocalStorageToken is needed irrespective of whether security is enabled
-      // or not.
-      TokenCache.setJobToken(jobToken, taskCredentials);
-
-      DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
-      LOG.info("Size of containertokens_dob is "
-          + taskCredentials.numberOfTokens());
-      taskCredentials.writeTokenStorageToStream(containerTokens_dob);
-      taskCredentialsBuffer =
-          ByteBuffer.wrap(containerTokens_dob.getData(), 0,
-              containerTokens_dob.getLength());
-
-      // Add shuffle token
-      LOG.info("Putting shuffle token in serviceData");
-      serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
-          ShuffleHandler.serializeServiceData(jobToken));
-
-      Apps.addToEnvironment(
-          environment,  
-          Environment.CLASSPATH.name(), 
-          getInitialClasspath(conf));
-    } catch (IOException e) {
-      throw new YarnException(e);
-    }
-
-    // Shell
-    environment.put(
-        Environment.SHELL.name(), 
-        conf.get(
-            MRJobConfig.MAPRED_ADMIN_USER_SHELL, 
-            MRJobConfig.DEFAULT_SHELL)
-            );
-
-    // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
-    Apps.addToEnvironment(
-        environment, 
-        Environment.LD_LIBRARY_PATH.name(), 
-        Environment.PWD.$());
-
-    // Add the env variables passed by the admin
-    Apps.setEnvFromInputString(
-        environment, 
-        conf.get(
-            MRJobConfig.MAPRED_ADMIN_USER_ENV, 
-            MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
-        );
-
-    // Construct the actual Container
-    // The null fields are per-container and will be constructed for each
-    // container separately.
-    ContainerLaunchContext container = BuilderUtils
-        .newContainerLaunchContext(null, conf
-            .get(MRJobConfig.USER_NAME), null, localResources,
-            environment, null, serviceData, taskCredentialsBuffer,
-            applicationACLs);
-
-    return container;
-  }
-
-  static ContainerLaunchContext createContainerLaunchContext(
-      Map<ApplicationAccessType, String> applicationACLs,
-      ContainerId containerID, Configuration conf,
-      Token<JobTokenIdentifier> jobToken, Task remoteTask,
-      final org.apache.hadoop.mapred.JobID oldJobId,
-      Resource assignedCapability, WrappedJvmID jvmID,
-      TaskAttemptListener taskAttemptListener,
-      Credentials credentials) {
-
-    synchronized (commonContainerSpecLock) {
-      if (commonContainerSpec == null) {
-        commonContainerSpec = createCommonContainerLaunchContext(
-            applicationACLs, conf, jobToken, oldJobId, credentials);
-      }
-    }
 
-    // Fill in the fields needed per-container that are missing in the common
-    // spec.
-
-    // Setup environment by cloning from common env.
-    Map<String, String> env = commonContainerSpec.getEnvironment();
-    Map<String, String> myEnv = new HashMap<String, String>(env.size());
-    myEnv.putAll(env);
-    MapReduceChildJVM2.setVMEnv(myEnv, remoteTask);
-
-    // Set up the launch command
-    List<String> commands = MapReduceChildJVM2.getVMCommand(
-        taskAttemptListener.getAddress(), remoteTask, jvmID);
-
-    // Duplicate the ByteBuffers for access by multiple containers.
-    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
-    for (Entry<String, ByteBuffer> entry : commonContainerSpec
-                .getServiceData().entrySet()) {
-      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
-    }
-
-    // Construct the actual Container
-    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
-        containerID, commonContainerSpec.getUser(), assignedCapability,
-        commonContainerSpec.getLocalResources(), myEnv, commands,
-        myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
-        applicationACLs);
-
-    return container;
-  }
-  
 }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerLaunchRequestEvent.java Fri Sep 21 18:11:35 2012
@@ -1,37 +1,73 @@
-package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import java.util.Map;
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
 
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 
 public class AMContainerLaunchRequestEvent extends AMContainerEvent {
 
-  // Temporarily sending in the event from the task.
-//  private final ContainerLaunchContext clc;
-  private final AMSchedulerTALaunchRequestEvent event;
-  private final Map<ApplicationAccessType, String> applicationAcls;
   private final JobId jobId;
-  
-  public AMContainerLaunchRequestEvent(ContainerId containerId, AMSchedulerTALaunchRequestEvent event, Map<ApplicationAccessType, String> applicationAcls, JobId jobId) {
+  private final TaskType taskTypeForContainer;
+  private final Token<JobTokenIdentifier> jobToken;
+  private final Credentials credentials;
+  private final boolean shouldProfile;
+  private final JobConf jobConf;
+
+  public AMContainerLaunchRequestEvent(ContainerId containerId, JobId jobId,
+      TaskType taskType, Token<JobTokenIdentifier> jobToken,
+      Credentials credentials, boolean shouldProfile, JobConf jobConf) {
     super(containerId, AMContainerEventType.C_START_REQUEST);
-    this.event = event;
-    this.applicationAcls = applicationAcls;
     this.jobId = jobId;
-  }
-  
-  // TODO XXX: Temporary. 
-  public AMSchedulerTALaunchRequestEvent getLaunchRequestEvent() {
-    return event;
-  }
-  
-  public Map<ApplicationAccessType, String> getApplicationAcls() {
-    return this.applicationAcls;
+    this.taskTypeForContainer = taskType;
+    this.jobToken = jobToken;
+    this.credentials = credentials;
+    this.shouldProfile = shouldProfile;
+    this.jobConf = jobConf;
   }
 
   public JobId getJobId() {
     return this.jobId;
   }
+
+  public TaskType getTaskTypeForContainer() {
+    return this.taskTypeForContainer;
+  }
+
+  public Token<JobTokenIdentifier> getJobToken() {
+    return this.jobToken;
+  }
+
+  public Credentials getCredentials() {
+    return this.credentials;
+  }
+
+  public boolean shouldProfile() {
+    return this.shouldProfile;
+  }
+
+  public JobConf getJobConf() {
+    return this.jobConf;
+  }
 }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java Fri Sep 21 18:11:35 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.mapreduce.v2.app2.rm.container;
 
 import java.util.Collection;
@@ -13,25 +31,21 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
 
-public class AMContainerMap extends AbstractService
-    implements EventHandler<AMContainerEvent> {
+public class AMContainerMap extends AbstractService implements
+    EventHandler<AMContainerEvent> {
 
   private static final Log LOG = LogFactory.getLog(AMContainerMap.class);
-  
+
   private final ContainerHeartbeatHandler chh;
   private final TaskAttemptListener tal;
-  @SuppressWarnings("rawtypes")
-  private final EventHandler eventHandler;
   private final AppContext context;
-  private final ConcurrentHashMap<ContainerId, AMContainer>containerMap;
+  private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
 
-  @SuppressWarnings("rawtypes")
   public AMContainerMap(ContainerHeartbeatHandler chh, TaskAttemptListener tal,
-      EventHandler eventHandler, AppContext context) {
+      AppContext context) {
     super("AMContainerMaps");
     this.chh = chh;
     this.tal = tal;
-    this.eventHandler = eventHandler;
     this.context = context;
     this.containerMap = new ConcurrentHashMap<ContainerId, AMContainer>();
   }
@@ -43,16 +57,15 @@ public class AMContainerMap extends Abst
   }
 
   public void addContainerIfNew(Container container) {
-    AMContainer amc = new AMContainerImpl(container, chh, tal, eventHandler,
-        context);
+    AMContainer amc = new AMContainerImpl(container, chh, tal, context);
     containerMap.putIfAbsent(container.getId(), amc);
   }
-  
+
   public AMContainer get(ContainerId containerId) {
     return containerMap.get(containerId);
   }
-  
-  public Collection<AMContainer>values() {
+
+  public Collection<AMContainer> values() {
     return containerMap.values();
   }
 }
\ No newline at end of file

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java Fri Sep 21 18:11:35 2012
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobID;
@@ -637,8 +638,11 @@ public class MRApp extends MRAppMaster {
         if (getContext().getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
           LOG.info("XXX: Sending launch request for container: " + cId
               + " for taskAttemptId: " + lEvent.getAttemptID());
-          getContext().getEventHandler().handle(
-              new AMContainerLaunchRequestEvent(cId, lEvent, appAcls, jobId));
+          AMContainerLaunchRequestEvent lrEvent = new AMContainerLaunchRequestEvent(
+              cId, jobId, lEvent.getAttemptID().getTaskId().getTaskType(),
+              lEvent.getJobToken(), lEvent.getCredentials(), false,
+              new JobConf(getContext().getJob(jobId).getConf()));
+          getContext().getEventHandler().handle(lrEvent);
         }
         LOG.info("XXX: Assigning attempt [" + lEvent.getAttemptID()
             + "] to Container [" + cId + "]");

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java?rev=1388596&r1=1388595&r2=1388596&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java Fri Sep 21 18:11:35 2012
@@ -970,7 +970,7 @@ public class TestRMContainerAllocator {
 
     AMContainerMap amContainerMap = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
-        eventHandler, appContext);
+        appContext);
     amContainerMap.init(conf);
     amContainerMap.start();
     when(appContext.getAllContainers()).thenReturn(amContainerMap);



Mime
View raw message