hadoop-mapreduce-commits mailing list archives

From ss...@apache.org
Subject svn commit: r1376283 [2/22] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-app2/ hadoop-mapreduce-client-app2/src/ hadoop-mapreduce-client-app2/src/main/ hadoop-mapreduce-client-app2/sr...
Date Wed, 22 Aug 2012 22:11:48 GMT
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,253 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapreduce.ID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Apps;
+
+@SuppressWarnings("deprecation")
+public class MapReduceChildJVM2 {
+
+  private static String getTaskLogFile(LogName filter) {
+    return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + 
+        filter.toString();
+  }
+
+  private static String getChildEnv(JobConf jobConf, boolean isMap) {
+    if (isMap) {
+      return jobConf.get(JobConf.MAPRED_MAP_TASK_ENV,
+          jobConf.get(JobConf.MAPRED_TASK_ENV));
+    }
+    return jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV,
+        jobConf.get(JobConf.MAPRED_TASK_ENV));
+  }
+
+  private static String getChildLogLevel(JobConf conf, boolean isMap) {
+    if (isMap) {
+      return conf.get(
+          MRJobConfig.MAP_LOG_LEVEL, 
+          JobConf.DEFAULT_LOG_LEVEL.toString()
+          );
+    } else {
+      return conf.get(
+          MRJobConfig.REDUCE_LOG_LEVEL, 
+          JobConf.DEFAULT_LOG_LEVEL.toString()
+          );
+    }
+  }
+  
+  public static void setVMEnv(Map<String, String> environment,
+      Task task) {
+
+    JobConf conf = task.conf;
+
+    // Add the env variables passed by the user
+    String mapredChildEnv = getChildEnv(conf, task.isMapTask());
+    Apps.setEnvFromInputString(environment, mapredChildEnv);
+
+    // Set logging level in the environment.
+    // This is so that, if the child forks another "bin/hadoop" (common in
+    // streaming) it will have the correct loglevel.
+    environment.put(
+        "HADOOP_ROOT_LOGGER", 
+        getChildLogLevel(conf, task.isMapTask()) + ",CLA"); 
+
+    // TODO: The following is useful for instance in streaming tasks. Should be
+    // set in ApplicationMaster's env by the RM.
+    String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
+    if (hadoopClientOpts == null) {
+      hadoopClientOpts = "";
+    } else {
+      hadoopClientOpts = hadoopClientOpts + " ";
+    }
+    // FIXME: this may not be needed, given that we already set the
+    // corresponding java properties.
+    long logSize = TaskLog.getTaskLogLength(conf);
+    Vector<String> logProps = new Vector<String>(4);
+    setupLog4jProperties(task, logProps, logSize);
+    Iterator<String> it = logProps.iterator();
+    StringBuffer buffer = new StringBuffer();
+    while (it.hasNext()) {
+      buffer.append(" " + it.next());
+    }
+    hadoopClientOpts = hadoopClientOpts + buffer.toString();
+    environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
+
+    // Add stdout/stderr env
+    environment.put(
+        MRJobConfig.STDOUT_LOGFILE_ENV,
+        getTaskLogFile(TaskLog.LogName.STDOUT)
+        );
+    environment.put(
+        MRJobConfig.STDERR_LOGFILE_ENV,
+        getTaskLogFile(TaskLog.LogName.STDERR)
+        );
+    environment.put(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV,
+        conf.get(MRJobConfig.APPLICATION_ATTEMPT_ID).toString());
+  }
+
+  private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {
+    String userOpts = "";
+    String adminOpts = "";
+    if (isMapTask) {
+      userOpts =
+          jobConf.get(
+              JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
+              jobConf.get(
+                  JobConf.MAPRED_TASK_JAVA_OPTS,
+                  JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
+          );
+      adminOpts =
+          jobConf.get(
+              MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
+              MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
+    } else {
+      userOpts =
+          jobConf.get(
+              JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS,
+              jobConf.get(
+                  JobConf.MAPRED_TASK_JAVA_OPTS,
+                  JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
+              );
+      adminOpts =
+          jobConf.get(
+              MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
+              MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
+    }
+
+    // Add admin opts first so they can be overridden by user opts.
+    return adminOpts + " " + userOpts;
+  }
+
+  private static void setupLog4jProperties(Task task,
+      Vector<String> vargs,
+      long logSize) {
+    String logLevel = getChildLogLevel(task.conf, task.isMapTask()); 
+    MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
+  }
+
+  public static List<String> getVMCommand(
+      InetSocketAddress taskAttemptListenerAddr, Task task, 
+      ID jvmID) {
+
+    TaskAttemptID attemptID = task.getTaskID();
+    JobConf conf = task.conf;
+
+    Vector<String> vargs = new Vector<String>(8);
+
+    vargs.add("exec");
+    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+
+    // Add child (task) java-vm options.
+    //
+    // The following symbols, if present in the
+    // mapred.{map|reduce}.child.java.opts value, are replaced:
+    // + @taskid@ is interpolated with the value of the TaskID.
+    // Other occurrences of @ will not be altered.
+    //
+    // Example with multiple arguments and substitutions, showing
+    // JVM GC logging and the start of a passwordless JVM JMX agent so that
+    // one can connect with jconsole and the likes to watch child memory,
+    // threads and get thread dumps.
+    //
+    //  <property>
+    //    <name>mapred.map.child.java.opts</name>
+    //    <value>-Xmx512M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
+    //           -Dcom.sun.management.jmxremote.authenticate=false \
+    //           -Dcom.sun.management.jmxremote.ssl=false \
+    //    </value>
+    //  </property>
+    //
+    //  <property>
+    //    <name>mapred.reduce.child.java.opts</name>
+    //    <value>-Xmx1024M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
+    //           -Dcom.sun.management.jmxremote.authenticate=false \
+    //           -Dcom.sun.management.jmxremote.ssl=false \
+    //    </value>
+    //  </property>
+    //
+    String javaOpts = getChildJavaOpts(conf, task.isMapTask());
+    javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
+    String [] javaOptsSplit = javaOpts.split(" ");
+    for (int i = 0; i < javaOptsSplit.length; i++) {
+      vargs.add(javaOptsSplit[i]);
+    }
+
+    Path childTmpDir = new Path(Environment.PWD.$(),
+        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
+    vargs.add("-Djava.io.tmpdir=" + childTmpDir);
+
+    // Setup the log4j prop
+    long logSize = TaskLog.getTaskLogLength(conf);
+    setupLog4jProperties(task, vargs, logSize);
+
+    if (conf.getProfileEnabled()) {
+      if (conf.getProfileTaskRange(task.isMapTask())
+          .isIncluded(task.getPartition())) {
+        vargs.add(
+            String.format(
+                conf.getProfileParams(), 
+                getTaskLogFile(TaskLog.LogName.PROFILE)
+                )
+            );
+        if (task.isMapTask()) {
+          vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
+        }
+        else {
+          vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
+        }
+        
+      }
+    }
+
+    // Add main class and its arguments 
+    vargs.add(YarnChild2.class.getName());  // main of Child
+    // pass TaskAttemptListener's address
+    vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress()); 
+    vargs.add(Integer.toString(taskAttemptListenerAddr.getPort())); 
+    vargs.add(attemptID.toString());                      // pass task identifier
+
+    // Finally add the jvmID
+    vargs.add(String.valueOf(jvmID.getId()));
+    vargs.add("1>" + getTaskLogFile(TaskLog.LogName.STDOUT));
+    vargs.add("2>" + getTaskLogFile(TaskLog.LogName.STDERR));
+
+    // Final command
+    StringBuilder mergedCommand = new StringBuilder();
+    for (CharSequence str : vargs) {
+      mergedCommand.append(str).append(" ");
+    }
+    Vector<String> vargsFinal = new Vector<String>(1);
+    vargsFinal.add(mergedCommand.toString());
+    return vargsFinal;
+  }
+}
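For reference, the @taskid@ interpolation described in getVMCommand() above amounts to a plain string replacement applied before the opts are split on single spaces. A minimal sketch (the attempt-ID value is made up for illustration):

    String javaOpts = "-Xmx512M -verbose:gc -Xloggc:/tmp/@taskid@.gc";
    String attemptId = "attempt_1345673923021_0001_m_000005_0";
    javaOpts = javaOpts.replace("@taskid@", attemptId);
    // -> "-Xmx512M -verbose:gc -Xloggc:/tmp/attempt_1345673923021_0001_m_000005_0.gc"
    String[] vmArgs = javaOpts.split(" "); // one JVM argument per token

Note that splitting on a single space means consecutive spaces in the configured opts would yield empty tokens in the argument vector.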

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl2.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl2.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl2.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,69 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+@SuppressWarnings("rawtypes")
+public class MapTaskAttemptImpl2 extends TaskAttemptImpl {
+
+  private final TaskSplitMetaInfo splitInfo;
+
+  public MapTaskAttemptImpl2(TaskId taskId, int attempt, 
+      EventHandler eventHandler, Path jobFile, 
+      int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
+      TaskAttemptListener taskAttemptListener, 
+      OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
+      Credentials credentials, Clock clock, TaskHeartbeatHandler thh,
+      AppContext appContext) {
+    super(taskId, attempt, eventHandler, 
+        taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
+        committer, jobToken, credentials, clock, thh, appContext);
+    this.splitInfo = splitInfo;
+  }
+
+  @Override
+  public Task createRemoteTask() {
+    // Job file name is set in TaskAttempt, so pass an empty string here.
+    MapTask mapTask =
+      new MapTask("", TypeConverter.fromYarn(getID()), partition,
+          splitInfo.getSplitIndex(), 1); // YARN doesn't have the concept of slots per task; set it to 1.
+    mapTask.setUser(conf.get(MRJobConfig.USER_NAME));
+    mapTask.setConf(conf);
+    return mapTask;
+  }
+
+}
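createRemoteTask() above bridges the v2 records API and the classic mapred types via TypeConverter. A minimal round-trip sketch (the attempt string is made up; fromYarn() returning the classic type is inferred from its use in TaskAttemptListenerImpl2 below):

    TaskAttemptID classicId =
        TaskAttemptID.forName("attempt_1345673923021_0001_m_000000_0");
    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnId =
        TypeConverter.toYarn(classicId);
    TaskAttemptID roundTrip = TypeConverter.fromYarn(yarnId); // back to the classic type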

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl2.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl2.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl2.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,65 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+@SuppressWarnings("rawtypes")
+public class ReduceTaskAttemptImpl2 extends TaskAttemptImpl {
+
+  private final int numMapTasks;
+
+  public ReduceTaskAttemptImpl2(TaskId id, int attempt,
+      EventHandler eventHandler, Path jobFile, int partition,
+      int numMapTasks, JobConf conf,
+      TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+      Token<JobTokenIdentifier> jobToken,
+      Credentials credentials, Clock clock,
+      TaskHeartbeatHandler thh, AppContext appContext) {
+    super(id, attempt, eventHandler, taskAttemptListener, jobFile, partition,
+        conf, new String[] {}, committer, jobToken, credentials, clock, thh,
+        appContext);
+    this.numMapTasks = numMapTasks;
+  }
+
+  @Override
+  public Task createRemoteTask() {
+    // Job file name is set in TaskAttempt, so pass an empty string here.
+    ReduceTask reduceTask =
+      new ReduceTask("", TypeConverter.fromYarn(getID()), partition,
+          numMapTasks, 1); // YARN doesn't have the concept of slots per task; set it to 1.
+    reduceTask.setUser(conf.get(MRJobConfig.USER_NAME));
+    reduceTask.setConf(conf);
+    return reduceTask;
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,561 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerImpl;
+import org.apache.hadoop.mapreduce.v2.app2.security.authorize.MRAMPolicyProvider;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.service.CompositeService;
+
+/**
+ * This class is responsible for talking to the task umbilical.
+ * It also converts all the old data structures
+ * to YARN data structures.
+ * 
+ * This class HAS to be in this package to access package private 
+ * methods/classes.
+ */
+@SuppressWarnings("unchecked")
+public class TaskAttemptListenerImpl2 extends CompositeService 
+    implements TaskUmbilicalProtocol, TaskAttemptListener {
+
+  // TODO XXX: Ideally containerId registration and unregistration should be taken care of by the Container.
+  // .... TaskAttemptId registration and unregistration by the TaskAttempt. Can this be split into a 
+  // ContainerListener + TaskAttemptListener ?
+  
+  // TODO XXX. Re-look at big chunks. Possibly redo bits.
+  // ..launchedJvm map etc.
+  // ..Sending back errors for unknown tasks.
+  
+  private static final JvmTask TASK_FOR_INVALID_JVM = new JvmTask(null, true);
+  private static final JvmTask UNASSIGNED_TASK = new JvmTask(null, false);
+
+  private static final Log LOG = LogFactory.getLog(TaskAttemptListenerImpl2.class);
+
+  private final AppContext context;
+  
+  protected final TaskHeartbeatHandler taskHeartbeatHandler;
+  protected final ContainerHeartbeatHandler containerHeartbeatHandler;
+  private final JobTokenSecretManager jobTokenSecretManager;
+  private InetSocketAddress address;
+  private Server server;
+  
+  // TODO XXX: Use this to figure out whether an incoming ping is valid.
+  private ConcurrentMap<TaskAttemptID, WrappedJvmID>
+    jvmIDToActiveAttemptMap
+      = new ConcurrentHashMap<TaskAttemptID, WrappedJvmID>();
+  // jvmIdToContainerIdMap also serves to check whether the container is still running.
+  private ConcurrentMap<WrappedJvmID, ContainerId> jvmIDToContainerIdMap = new ConcurrentHashMap<WrappedJvmID, ContainerId>();
+//  private Set<WrappedJvmID> launchedJVMs = Collections
+//      .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); 
+  
+  
+  
+  public TaskAttemptListenerImpl2(AppContext context, TaskHeartbeatHandler thh,
+      ContainerHeartbeatHandler chh, JobTokenSecretManager jobTokenSecretManager) {
+    super(TaskAttemptListenerImpl2.class.getName());
+    this.context = context;
+    this.jobTokenSecretManager = jobTokenSecretManager;
+    this.taskHeartbeatHandler = thh;
+    this.containerHeartbeatHandler = chh;
+  }
+
+  @Override
+  public void start() {
+    LOG.info("XXX: Starting TAL2");
+    startRpcServer();
+    super.start();
+  }
+
+  protected void startRpcServer() {
+    Configuration conf = getConfig();
+    try {
+      server =
+          RPC.getServer(TaskUmbilicalProtocol.class, this, "0.0.0.0", 0, 
+              conf.getInt(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT, 
+                  MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT),
+              false, conf, jobTokenSecretManager);
+      
+      // Enable service authorization?
+      if (conf.getBoolean(
+          CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
+          false)) {
+        refreshServiceAcls(conf, new MRAMPolicyProvider());
+      }
+
+      server.start();
+      this.address = NetUtils.getConnectAddress(server);
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  void refreshServiceAcls(Configuration configuration, 
+      PolicyProvider policyProvider) {
+    this.server.refreshServiceAcl(configuration, policyProvider);
+  }
+
+  @Override
+  public void stop() {
+    stopRpcServer();
+    super.stop();
+  }
+
+  protected void stopRpcServer() {
+    server.stop();
+  }
+
+  @Override
+  public InetSocketAddress getAddress() {
+    return address;
+  }
+
+  private void pingContainerHeartbeatHandler(TaskAttemptID attemptID) {
+    containerHeartbeatHandler.pinged(
+        jvmIDToContainerIdMap.get(jvmIDToActiveAttemptMap.get(attemptID)));
+  }
+  
+  /**
+   * Child checking whether it can commit.
+   * 
+   * <br/>
+   * Commit is a two-phased protocol. First the attempt informs the
+   * ApplicationMaster that it is
+   * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
+   * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)}. This is
+   * a legacy from the centralized commit protocol handling by the JobTracker.
+   */
+  @Override
+  public boolean canCommit(TaskAttemptID taskAttemptID) throws IOException {
+    LOG.info("Commit go/no-go request from " + taskAttemptID.toString());
+    // An attempt is asking if it can commit its output. This can be decided
+    // only by the task which is managing the multiple attempts. So redirect the
+    // request there.
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+        TypeConverter.toYarn(taskAttemptID);
+
+    taskHeartbeatHandler.progressing(attemptID);
+    pingContainerHeartbeatHandler(taskAttemptID);
+
+    Job job = context.getJob(attemptID.getTaskId().getJobId());
+    Task task = job.getTask(attemptID.getTaskId());
+    return task.canCommit(attemptID);
+  }
+
+  /**
+   * TaskAttempt is reporting that it is in commit_pending state and is waiting
+   * for the commit response.
+   * 
+   * <br/>
+   * Commit is a two-phased protocol. First the attempt informs the
+   * ApplicationMaster that it is
+   * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
+   * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)}. This is
+   * a legacy from the centralized commit protocol handling by the JobTracker.
+   */
+  @Override
+  public void commitPending(TaskAttemptID taskAttemptID, TaskStatus taskStatus)
+          throws IOException, InterruptedException {
+    LOG.info("Commit-pending state update from " + taskAttemptID.toString());
+    // An attempt is reporting that it is in commit-pending state. This can be
+    // acted upon only by the task which is managing the multiple attempts. So
+    // redirect the request there.
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+        TypeConverter.toYarn(taskAttemptID);
+
+    taskHeartbeatHandler.progressing(attemptID);
+    pingContainerHeartbeatHandler(taskAttemptID);
+    //Ignorable TaskStatus? - since a task will send a LastStatusUpdate
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(attemptID, 
+            TaskAttemptEventType.TA_COMMIT_PENDING));
+  }
+
+  @Override
+  public void done(TaskAttemptID taskAttemptID) throws IOException {
+    LOG.info("Done acknowledgement from " + taskAttemptID.toString());
+
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+        TypeConverter.toYarn(taskAttemptID);
+
+    taskHeartbeatHandler.progressing(attemptID);
+    pingContainerHeartbeatHandler(taskAttemptID);
+
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
+  }
+
+  @Override
+  public void fatalError(TaskAttemptID taskAttemptID, String msg)
+      throws IOException {
+    // This happens only in Child and in the Task.
+    LOG.fatal("Task: " + taskAttemptID + " - exited : " + msg);
+    reportDiagnosticInfo(taskAttemptID, "Error: " + msg);
+
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+        TypeConverter.toYarn(taskAttemptID);
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILED));
+  }
+
+  @Override
+  public void fsError(TaskAttemptID taskAttemptID, String message)
+      throws IOException {
+    // This happens only in Child.
+    LOG.fatal("Task: " + taskAttemptID + " - failed due to FSError: "
+        + message);
+    reportDiagnosticInfo(taskAttemptID, "FSError: " + message);
+
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+        TypeConverter.toYarn(taskAttemptID);
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILED));
+  }
+
+  @Override
+  public void shuffleError(TaskAttemptID taskAttemptID, String message) throws IOException {
+    // TODO: This isn't really used in any MR code. Ask for removal.    
+  }
+
+  @Override
+  public MapTaskCompletionEventsUpdate getMapCompletionEvents(
+      JobID jobIdentifier, int fromEventId, int maxEvents,
+      TaskAttemptID taskAttemptID) throws IOException {
+    LOG.info("MapCompletionEvents request from " + taskAttemptID.toString()
+        + ". fromEventID " + fromEventId + " maxEvents " + maxEvents);
+
+    // TODO: shouldReset is never used. See TT. Ask for Removal.
+    boolean shouldReset = false;
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+      TypeConverter.toYarn(taskAttemptID);
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[] events =
+        context.getJob(attemptID.getTaskId().getJobId()).getTaskAttemptCompletionEvents(
+            fromEventId, maxEvents);
+
+    taskHeartbeatHandler.progressing(attemptID);
+    pingContainerHeartbeatHandler(taskAttemptID);
+
+    // filter the events to return only map completion events in old format
+    List<TaskCompletionEvent> mapEvents = new ArrayList<TaskCompletionEvent>();
+    for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent event : events) {
+      if (TaskType.MAP.equals(event.getAttemptId().getTaskId().getTaskType())) {
+        mapEvents.add(TypeConverter.fromYarn(event));
+      }
+    }
+    
+    return new MapTaskCompletionEventsUpdate(
+        mapEvents.toArray(new TaskCompletionEvent[0]), shouldReset);
+  }
+
+  @Override
+  public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
+    LOG.info("Ping from " + taskAttemptID.toString());
+    taskHeartbeatHandler.pinged(TypeConverter.toYarn(taskAttemptID));
+    pingContainerHeartbeatHandler(taskAttemptID);
+    return true;
+  }
+
+  @Override
+  public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo)
+      throws IOException {
+    LOG.info("Diagnostics report from " + taskAttemptID.toString() + ": "
+        + diagnosticInfo);
+
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+      TypeConverter.toYarn(taskAttemptID);
+    taskHeartbeatHandler.progressing(attemptID);
+    pingContainerHeartbeatHandler(taskAttemptID);
+
+    // This is mainly used for cases where we want to propagate exception traces
+    // of tasks that fail.
+
+    // This call exists as a hadoop mapreduce legacy wherein all changes in
+    // counters/progress/phase/output-size are reported through statusUpdate()
+    // call but not diagnosticInformation.
+    context.getEventHandler().handle(
+        new TaskAttemptDiagnosticsUpdateEvent(attemptID, diagnosticInfo));
+  }
+
+  @Override
+  public boolean statusUpdate(TaskAttemptID taskAttemptID,
+      TaskStatus taskStatus) throws IOException, InterruptedException {
+    LOG.info("Status update from " + taskAttemptID.toString());
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
+        TypeConverter.toYarn(taskAttemptID);
+    taskHeartbeatHandler.progressing(yarnAttemptID);
+    pingContainerHeartbeatHandler(taskAttemptID);
+    TaskAttemptStatus taskAttemptStatus =
+        new TaskAttemptStatus();
+    taskAttemptStatus.id = yarnAttemptID;
+    // Task sends the updated progress to the TT.
+    taskAttemptStatus.progress = taskStatus.getProgress();
+    LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
+        + taskStatus.getProgress());
+    // Task sends the updated state-string to the TT.
+    taskAttemptStatus.stateString = taskStatus.getStateString();
+    // Set the output-size when map-task finishes. Set by the task itself.
+    taskAttemptStatus.outputSize = taskStatus.getOutputSize();
+    // Task sends the updated phase to the TT.
+    taskAttemptStatus.phase = TypeConverter.toYarn(taskStatus.getPhase());
+    // Counters are updated by the task. Convert counters into new format as
+    // that is the primary storage format inside the AM to avoid multiple
+    // conversions and unnecessary heap usage.
+    taskAttemptStatus.counters = new org.apache.hadoop.mapreduce.Counters(
+      taskStatus.getCounters());
+
+    // Map Finish time set by the task (map only)
+    if (taskStatus.getIsMap() && taskStatus.getMapFinishTime() != 0) {
+      taskAttemptStatus.mapFinishTime = taskStatus.getMapFinishTime();
+    }
+
+    // Shuffle Finish time set by the task (reduce only).
+    if (!taskStatus.getIsMap() && taskStatus.getShuffleFinishTime() != 0) {
+      taskAttemptStatus.shuffleFinishTime = taskStatus.getShuffleFinishTime();
+    }
+
+    // Sort finish time set by the task (reduce only).
+    if (!taskStatus.getIsMap() && taskStatus.getSortFinishTime() != 0) {
+      taskAttemptStatus.sortFinishTime = taskStatus.getSortFinishTime();
+    }
+
+    // Not Setting the task state. Used by speculation - will be set in TaskAttemptImpl
+    //taskAttemptStatus.taskState =  TypeConverter.toYarn(taskStatus.getRunState());
+    
+    //set the fetch failures
+    if (taskStatus.getFetchFailedMaps() != null 
+        && taskStatus.getFetchFailedMaps().size() > 0) {
+      taskAttemptStatus.fetchFailedMaps = 
+        new ArrayList<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId>();
+      for (TaskAttemptID failedMapId : taskStatus.getFetchFailedMaps()) {
+        taskAttemptStatus.fetchFailedMaps.add(
+            TypeConverter.toYarn(failedMapId));
+      }
+    }
+
+    // Task sends the information about the nextRecordRange to the TT.
+
+//    TODO: The following are not needed here, but needed to be set somewhere inside AppMaster.
+//    taskStatus.getRunState(); // Set by the TT/JT. Transform into a state TODO
+//    taskStatus.getStartTime(); // Used to be set by the TaskTracker. This should be set by getTask().
+//    taskStatus.getFinishTime(); // Used to be set by TT/JT. Should be set when task finishes
+//    // This was used by TT to do counter updates only once every minute. So this
+//    // isn't ever changed by the Task itself.
+//    taskStatus.getIncludeCounters();
+
+    context.getEventHandler().handle(
+        new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
+            taskAttemptStatus));
+    return true;
+  }
+
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return TaskUmbilicalProtocol.versionID;
+  }
+
+  @Override
+  public void reportNextRecordRange(TaskAttemptID taskAttemptID, Range range)
+      throws IOException {
+    // This is used when the feature of skipping records is enabled.
+
+    // This call exists as a hadoop mapreduce legacy wherein all changes in
+    // counters/progress/phase/output-size are reported through statusUpdate()
+    // call but not the next record range information.
+    throw new IOException("Not yet implemented.");
+  }
+
+  @Override
+  public JvmTask getTask(JvmContext jvmContext) throws IOException {
+
+    // A rough imitation of code from TaskTracker.
+
+    JVMId jvmId = jvmContext.jvmId;
+    LOG.info("ZZZ: JVM with ID : " + jvmId + " asked for a task");
+    
+    JvmTask jvmTask = null;
+    // TODO: Is it an authorized container to get a task? Otherwise return null.
+
+    // TODO: Child.java's firstTaskID isn't really firstTaskID. Ask for update
+    // to jobId and task-type.
+
+    WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap,
+        jvmId.getId());
+
+    ContainerId containerId = jvmIDToContainerIdMap.get(wJvmID);
+    if (containerId == null) {
+      LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
+      jvmTask = TASK_FOR_INVALID_JVM;
+    } else {
+      org.apache.hadoop.mapred.Task task = pullTaskAttempt(containerId);
+      if (task == null) {
+        LOG.info("No task currently assigned to JVM with ID: " + jvmId);
+        jvmTask = null;
+      } else {
+        TaskAttemptId yTaskAttemptId = TypeConverter.toYarn(task.getTaskID());
+        // TODO XXX: Generate this event properly - proper params etc.
+        // TODO XXX: Fix the hardcoded port.
+        context.getEventHandler().handle(new TaskAttemptRemoteStartEvent(
+            yTaskAttemptId, containerId, null, 8080));
+        LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+        registerTaskAttempt(yTaskAttemptId, wJvmID);
+        jvmTask = new JvmTask(task, false);
+      }
+    }
+    return jvmTask;
+    
+//    
+//    // Try to look up the task. We remove it directly as we don't give
+//    // multiple tasks to a JVM
+//    if (!jvmIDToActiveAttemptMap.containsKey(wJvmID)) {
+//      LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
+//      jvmTask = TASK_FOR_INVALID_JVM;
+//    } else {
+//      if (!launchedJVMs.contains(wJvmID)) {
+//        jvmTask = null;
+//        LOG.info("JVM with ID: " + jvmId
+//            + " asking for task before AM launch registered. Given null task");
+//      } else {
+//        // remove the task as it is no more needed and free up the memory.
+//        // Also we have already told the JVM to process a task, so it is no
+//        // longer pending, and further request should ask it to exit.
+//        org.apache.hadoop.mapred.Task task =
+//            jvmIDToActiveAttemptMap.remove(wJvmID);
+//        launchedJVMs.remove(wJvmID);
+//        LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+//        jvmTask = new JvmTask(task, false);
+//      }
+//    }
+//    return jvmTask;
+  }
+
+  @Override
+  public void registerRunningJvm(WrappedJvmID jvmID, ContainerId containerId) {
+    LOG.info("XXX: JvmRegistration: " + jvmID + ", ContaienrId: " + containerId);
+    jvmIDToContainerIdMap.putIfAbsent(jvmID, containerId);
+  }
+  
+  @Override
+  public void unregisterRunningJvm(WrappedJvmID jvmID) {
+    LOG.info("TOREMOVE: Unregistering jvmId: " + jvmID);
+    if (jvmIDToContainerIdMap.remove(jvmID) == null) {
+      LOG.warn("Attempt to unregister unknwon jvmtoContainerMap: " + jvmID);
+    }
+  }
+  
+  public void registerTaskAttempt(TaskAttemptId attemptId, WrappedJvmID jvmId) {
+    jvmIDToActiveAttemptMap.put(TypeConverter.fromYarn(attemptId), jvmId);
+  }
+  
+  // Unregister called by the Container. Registration happens when TAL asks
+  // the container for a task.
+  @Override
+  public void unregisterTaskAttempt(TaskAttemptId attemptId) {
+    jvmIDToActiveAttemptMap.remove(TypeConverter.fromYarn(attemptId));
+  }
+
+  public org.apache.hadoop.mapred.Task pullTaskAttempt(ContainerId containerId) {
+    // TODO XXX: pullTaskAttempt as part of the interface.
+    AMContainerImpl container = (AMContainerImpl) context
+        .getContainer(containerId);
+    return container.pullTaskAttempt();
+  }
+
+//  @Override
+//  public void registerPendingTask(
+//      org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
+//    // Create the mapping so that it is easy to look up
+//    // when the jvm comes back to ask for Task.
+//
+//    // A JVM not present in this map is an illegal task/JVM.
+//    jvmIDToActiveAttemptMap.put(jvmID, task);
+//  }
+//
+//  @Override
+//  public void registerLaunchedTask(
+//      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
+//      WrappedJvmID jvmId) {
+//    // The AM considers the task to be launched (Has asked the NM to launch it)
+//    // The JVM will only be given a task after this registration.
+//    launchedJVMs.add(jvmId);
+//
+//    taskHeartbeatHandler.register(attemptID);
+//  }
+//
+//  @Override
+//  public void unregister(
+//      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
+//      WrappedJvmID jvmID) {
+//
+//    // Unregistration also comes from the same TaskAttempt which does the
+//    // registration. Events are ordered at TaskAttempt, so unregistration will
+//    // always come after registration.
+//
+//    // Remove from launchedJVMs before jvmIDToActiveAttemptMap to avoid
+//    // synchronization issue with getTask(). getTask should be checking
+//    // jvmIDToActiveAttemptMap before it checks launchedJVMs.
+// 
+//    // remove the mappings if not already removed
+//    launchedJVMs.remove(jvmID);
+//    jvmIDToActiveAttemptMap.remove(jvmID);
+//
+//    //unregister this attempt
+//    taskHeartbeatHandler.unregister(attemptID);
+//  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(this, 
+        protocol, clientVersion, clientMethodsHash);
+  }
+}
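From the child's side, the two-phase commit protocol documented on commitPending()/canCommit() above reduces to "announce, then poll". A hedged sketch, not the actual Task code; the helper name and polling interval are assumptions:

    // Illustrative child-side commit handshake (interval is made up).
    void commitOutput(TaskUmbilicalProtocol umbilical, TaskAttemptID id,
        TaskStatus status) throws IOException, InterruptedException {
      umbilical.commitPending(id, status); // phase 1: announce commit_pending
      while (!umbilical.canCommit(id)) {   // phase 2: poll for the go-ahead
        Thread.sleep(1000);                // polling interval is illustrative
      }
      // ... proceed to actually commit the task output ...
    }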

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/WrappedJvmID.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+/**
+ * A simple wrapper for increasing the visibility.
+ */
+public class WrappedJvmID extends JVMId {
+
+  public WrappedJvmID(JobID jobID, boolean mapTask, int nextInt) {
+    super(jobID, mapTask, nextInt);
+  }
+
+}
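JVMId itself has package-private visibility in org.apache.hadoop.mapred (the reason for this wrapper, per the class comment), so the public subclass lets app-master code construct and key on JVM IDs. A sketch of the lifecycle against TaskAttemptListenerImpl2 above, with made-up IDs and with listener/containerId assumed to be in scope:

    WrappedJvmID jvmId = new WrappedJvmID(
        JobID.forName("job_1345673923021_0001"), true /* map task */, 3);
    listener.registerRunningJvm(jvmId, containerId); // container launched
    // ... getTask() later hands out a task and registers the attempt ...
    listener.unregisterRunningJvm(jvmId);            // container exited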

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+// Workaround for PeriodicStatsAccumulator being package-private.
+public class WrappedPeriodicStatsAccumulator {
+
+  private PeriodicStatsAccumulator real;
+
+  public WrappedPeriodicStatsAccumulator(PeriodicStatsAccumulator real) {
+    this.real = real;
+  }
+  
+  public void extend(double newProgress, int newValue) {
+    real.extend(newProgress, newValue);
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+// Workaround for ProgressSplitsBlock being package-private.
+public class WrappedProgressSplitsBlock extends ProgressSplitsBlock {
+  private WrappedPeriodicStatsAccumulator wrappedProgressWallclockTime;
+  private WrappedPeriodicStatsAccumulator wrappedProgressCPUTime;
+  private WrappedPeriodicStatsAccumulator wrappedProgressVirtualMemoryKbytes;
+  private WrappedPeriodicStatsAccumulator wrappedProgressPhysicalMemoryKbytes;
+
+  public WrappedProgressSplitsBlock(int numberSplits) {
+    super(numberSplits);
+  }
+
+  public int[][] burst() {
+    return super.burst();
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressWallclockTime() {
+    if (wrappedProgressWallclockTime == null) {
+      wrappedProgressWallclockTime = new WrappedPeriodicStatsAccumulator(
+          progressWallclockTime);
+    }
+    return wrappedProgressWallclockTime;
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressCPUTime() {
+    if (wrappedProgressCPUTime == null) {
+      wrappedProgressCPUTime = new WrappedPeriodicStatsAccumulator(
+          progressCPUTime);
+    }
+    return wrappedProgressCPUTime;
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressVirtualMemoryKbytes() {
+    if (wrappedProgressVirtualMemoryKbytes == null) {
+      wrappedProgressVirtualMemoryKbytes = new WrappedPeriodicStatsAccumulator(
+          progressVirtualMemoryKbytes);
+    }
+    return wrappedProgressVirtualMemoryKbytes;
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressPhysicalMemoryKbytes() {
+    if (wrappedProgressPhysicalMemoryKbytes == null) {
+      wrappedProgressPhysicalMemoryKbytes = new WrappedPeriodicStatsAccumulator(
+          progressPhysicalMemoryKbytes);
+    }
+    return wrappedProgressPhysicalMemoryKbytes;
+  }
+}
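A hedged usage sketch of the two wrappers above; the split count and sample readings are made-up values:

    WrappedProgressSplitsBlock splits = new WrappedProgressSplitsBlock(12);
    // Record one reading per tracked metric at 40% task progress.
    splits.getProgressWallclockTime().extend(0.4, 5300);
    splits.getProgressCPUTime().extend(0.4, 2100);
    splits.getProgressVirtualMemoryKbytes().extend(0.4, 524288);
    splits.getProgressPhysicalMemoryKbytes().extend(0.4, 262144);
    int[][] burst = splits.burst(); // one row per metric, one column per split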

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnChild2.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,394 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.log4j.LogManager;
+
+/**
+ * The main() for MapReduce task processes.
+ */
+class YarnChild2 {
+
+  private static final Log LOG = LogFactory.getLog(YarnChild2.class);
+
+  static volatile TaskAttemptID taskid = null;
+
+  public static void main(String[] args) throws Throwable {
+    LOG.info("XXX: Child starting");
+
+    final JobConf defaultConf = new JobConf();
+    defaultConf.addResource(MRJobConfig.JOB_CONF_FILE);
+    UserGroupInformation.setConfiguration(defaultConf);
+
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    final InetSocketAddress address =
+        NetUtils.createSocketAddrForHost(host, port);
+    final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
+    int jvmIdInt = Integer.parseInt(args[3]);
+    JVMId jvmId = new JVMId(firstTaskid.getJobID(),
+        firstTaskid.getTaskType() == TaskType.MAP, jvmIdInt);
+
+    // initialize metrics
+    DefaultMetricsSystem.initialize(
+        StringUtils.camelize(firstTaskid.getTaskType().name()) +"Task");
+
+    Token<JobTokenIdentifier> jt = loadCredentials(defaultConf, address);
+
+    // Create TaskUmbilicalProtocol as actual task owner.
+    UserGroupInformation taskOwner =
+      UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
+    taskOwner.addToken(jt);
+    final TaskUmbilicalProtocol umbilical =
+      taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
+      @Override
+      public TaskUmbilicalProtocol run() throws Exception {
+        return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+            TaskUmbilicalProtocol.versionID, address, defaultConf);
+      }
+    });
+
+    // report non-pid to application master
+    JvmContext context = new JvmContext(jvmId, "-1000");
+    LOG.debug("PID: " + System.getenv().get("JVM_PID"));
+    Task task = null;
+    UserGroupInformation childUGI = null;
+
+    try {
+      while (true) {
+        LOG.info("Polling for next task");
+        int idleLoopCount = 0;
+        JvmTask myTask = null;
+        // poll for new task
+        for (int idle = 0; null == myTask; ++idle) {
+//          long sleepTimeMilliSecs = Math.min(idle * 500, 1500);
+          // XXX: Figure out sleep time.
+          long sleepTimeMilliSecs = 20;
+          LOG.info("Sleeping for " + sleepTimeMilliSecs
+              + "ms before retrying again. Got null now.");
+          MILLISECONDS.sleep(sleepTimeMilliSecs);
+          myTask = umbilical.getTask(context);
+        }
+        if (myTask.shouldDie()) {
+          return;
+        }
+
+        task = myTask.getTask();
+        YarnChild2.taskid = task.getTaskID();
+
+        // Create the job-conf and set credentials
+        final JobConf job =
+            configureTask(task, defaultConf.getCredentials(), jt);
+
+        // Initiate Java VM metrics
+        JvmMetrics.initSingleton(jvmId.toString(), job.getSessionId());
+        childUGI = UserGroupInformation.createRemoteUser(System
+            .getenv(ApplicationConstants.Environment.USER.toString()));
+        // Add tokens to new user so that it may execute its task correctly.
+        for (Token<?> token : UserGroupInformation.getCurrentUser().getTokens()) {
+          childUGI.addToken(token);
+        }
+
+        // Create a final reference to the task for the doAs block
+        final Task taskFinal = task;
+        childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws Exception {
+            // use job-specified working directory
+            FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
+            taskFinal.run(job, umbilical); // run the task
+            return null;
+          }
+        });
+        LOG.info("XXX: _______Done executing one task_________");
+      }
+    } catch (FSError e) {
+      LOG.fatal("FSError from child", e);
+      umbilical.fsError(taskid, e.getMessage());
+    } catch (Exception exception) {
+      LOG.warn("Exception running child : "
+          + StringUtils.stringifyException(exception));
+      try {
+        if (task != null) {
+          // do cleanup for the task
+          if (childUGI == null) { // no need to go into the doAs block
+            task.taskCleanup(umbilical);
+          } else {
+            final Task taskFinal = task;
+            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                taskFinal.taskCleanup(umbilical);
+                return null;
+              }
+            });
+          }
+        }
+      } catch (Exception e) {
+        LOG.info("Exception cleaning up: " + StringUtils.stringifyException(e));
+      }
+      // Report back any failures, for diagnostic purposes
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      exception.printStackTrace(new PrintStream(baos));
+      if (taskid != null) {
+        umbilical.fatalError(taskid, baos.toString());
+      }
+    } catch (Throwable throwable) {
+      LOG.fatal("Error running child : "
+    	        + StringUtils.stringifyException(throwable));
+      if (taskid != null) {
+        Throwable tCause = throwable.getCause();
+        String cause = tCause == null
+                                 ? throwable.getMessage()
+                                 : StringUtils.stringifyException(tCause);
+        umbilical.fatalError(taskid, cause);
+      }
+    } finally {
+      RPC.stopProxy(umbilical);
+      DefaultMetricsSystem.shutdown();
+      // Shutting down log4j of the child-vm...
+      // This assumes that on return from Task.run()
+      // there is no more logging done.
+      LogManager.shutdown();
+    }
+  }
+
+  private static Token<JobTokenIdentifier> loadCredentials(JobConf conf,
+      InetSocketAddress address) throws IOException {
+    // Load the token cache storage.
+    String tokenFileLocation =
+        System.getenv(ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME);
+    String jobTokenFile =
+        new Path(tokenFileLocation).makeQualified(FileSystem.getLocal(conf))
+            .toUri().getPath();
+    Credentials credentials =
+      TokenCache.loadTokens(jobTokenFile, conf);
+    LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() +
+        "; from file=" + jobTokenFile);
+    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
+    SecurityUtil.setTokenService(jt, address);
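+    // Bind the job token to the AM's umbilical address so the RPC layer can
+    // select it when connecting back to that address.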
+    UserGroupInformation current = UserGroupInformation.getCurrentUser();
+    current.addToken(jt);
+    for (Token<? extends TokenIdentifier> tok : credentials.getAllTokens()) {
+      current.addToken(tok);
+    }
+    // Set the credentials
+    conf.setCredentials(credentials);
+    return jt;
+  }
+
+  /**
+   * Configure the mapred-local dirs. This config is used by the task to find
+   * its output directory.
+   * @throws IOException 
+   */
+  private static void configureLocalDirs(Task task, JobConf job) throws IOException {
+    String[] localSysDirs = StringUtils.getTrimmedStrings(
+        System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
+    job.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
+    LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR));
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
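+    // The allocator picks among the dirs configured under MRConfig.LOCAL_DIR
+    // (skipping full or bad volumes), so "work" resolves to <local-dir>/work
+    // on whichever volume is chosen.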
+    Path workDir = null;
+    // First, try to find the JOB_LOCAL_DIR on this host.
+    try {
+      workDir = lDirAlloc.getLocalPathToRead("work", job);
+    } catch (DiskErrorException e) {
+      // DiskErrorException means dir not found. If not found, it will
+      // be created below.
+    }
+    if (workDir == null) {
+      // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
+      workDir = lDirAlloc.getLocalPathForWrite("work", job);
+      FileSystem lfs = FileSystem.getLocal(job).getRaw();
+      boolean madeDir = false;
+      try {
+        madeDir = lfs.mkdirs(workDir);
+      } catch (FileAlreadyExistsException e) {
+        // Since all tasks will be running in their own JVM, the race condition
+        // exists where multiple tasks could be trying to create this directory
+        // at the same time. If this task loses the race, it's okay because
+        // the directory already exists.
+        madeDir = true;
+        workDir = lDirAlloc.getLocalPathToRead("work", job);
+      }
+      if (!madeDir) {
+        throw new IOException("Mkdirs failed to create "
+            + workDir.toString());
+      }
+    }
+    job.set(MRJobConfig.JOB_LOCAL_DIR, workDir.toString());
+  }
+
+  private static JobConf configureTask(Task task, Credentials credentials,
+      Token<JobTokenIdentifier> jt) throws IOException {
+    final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
+    job.setCredentials(credentials);
+    
+    String appAttemptIdEnv = System
+        .getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
+    LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
+    // Set it in the conf, so that it can be used by the OutputCommitter.
+    job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer
+        .parseInt(appAttemptIdEnv));
+
+    // set tcp nodelay
+    job.setBoolean("ipc.client.tcpnodelay", true);
+    job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
+        YarnOutputFiles.class, MapOutputFile.class);
+    // set the jobTokenFile into task
+    task.setJobTokenSecret(
+        JobTokenSecretManager.createSecretKey(jt.getPassword()));
+
+    // setup the child's MRConfig.LOCAL_DIR.
+    configureLocalDirs(task, job);
+
+    // setup the child's attempt directories
+    // Do the task-type specific localization
+    task.localizeConfiguration(job);
+
+    // Set up the DistributedCache related configs
+    setupDistributedCacheConfig(job);
+
+    // Overwrite the localized task jobconf which is linked to in the current
+    // work-dir.
+    Path localTaskFile = new Path(MRJobConfig.JOB_CONF_FILE);
+    writeLocalJobFile(localTaskFile, job);
+    task.setJobFile(localTaskFile.toString());
+    task.setConf(job);
+    return job;
+  }
+
+  /**
+   * Set up the DistributedCache related configs to make
+   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
+   * and
+   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
+   * work.
+   * @param job the job configuration to update
+   * @throws IOException
+   */
+  private static void setupDistributedCacheConfig(final JobConf job)
+      throws IOException {
+
+    // All symlinks are created in the current working directory ($PWD).
+    String localWorkDir = System.getenv("PWD");
+
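+    // A cache URI may carry a fragment naming its symlink: for example
+    // (illustrative URI) "hdfs://nn:8020/libs/a.jar#lib.jar" is linked as
+    // $PWD/lib.jar, while a fragment-less URI uses its last path component.
+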
+    // Update the configuration object with localized archives.
+    URI[] cacheArchives = DistributedCache.getCacheArchives(job);
+    if (cacheArchives != null) {
+      List<String> localArchives = new ArrayList<String>();
+      for (int i = 0; i < cacheArchives.length; ++i) {
+        URI u = cacheArchives[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localArchives.isEmpty()) {
+        job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
+            .arrayToString(localArchives.toArray(new String[localArchives
+                .size()])));
+      }
+    }
+
+    // Update the configuration object with localized files.
+    URI[] cacheFiles = DistributedCache.getCacheFiles(job);
+    if (cacheFiles != null) {
+      List<String> localFiles = new ArrayList<String>();
+      for (int i = 0; i < cacheFiles.length; ++i) {
+        URI u = cacheFiles[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localFiles.isEmpty()) {
+        job.set(MRJobConfig.CACHE_LOCALFILES,
+            StringUtils.arrayToString(localFiles
+                .toArray(new String[localFiles.size()])));
+      }
+    }
+  }
+
+  private static final FsPermission urw_gr =
+      FsPermission.createImmutable((short) 0640); // rw for user, r for group.
+
+  /**
+   * Write the task specific job-configuration file.
+   * @throws IOException
+   */
+  private static void writeLocalJobFile(Path jobFile, JobConf conf)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(jobFile, false);
+    OutputStream out = null;
+    try {
+      out = FileSystem.create(localFs, jobFile, urw_gr);
+      conf.writeXml(out);
+    } finally {
+      IOUtils.cleanup(LOG, out);
+    }
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,238 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. These methods are
+ * called from the child (task) process.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class YarnOutputFiles extends MapOutputFile {
+
+  private JobConf conf;
+
+  private static final String JOB_OUTPUT_DIR = "output";
+  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
+      + ".index";
+
+  public YarnOutputFiles() {
+  }
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private LocalDirAllocator lDirAlloc = 
+    new LocalDirAllocator(MRConfig.LOCAL_DIR);
+
+  private Path getAttemptOutputDir() {
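+    // Relative path, e.g. "output/attempt_1345678901234_0001_m_000000_0"
+    // (illustrative id); callers resolve it through lDirAlloc.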
+    return new Path(JOB_OUTPUT_DIR, conf.get(JobContext.TASK_ATTEMPT_ID));
+  }
+  
+  /**
+   * Return the path to the local map output file created earlier.
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFile() throws IOException {
+    Path attemptOutput =
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFileForWrite(long size) throws IOException {
+    Path attemptOutput = 
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(JobContext.TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFile() throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING +
+                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING +
+                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(JobContext.TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING +
+                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   * 
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   * 
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   * 
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException 
+   */
+  public Path getInputFile(int mapId) throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Create a local reduce input file name.
+   * 
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
+      long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        REDUCE_INPUT_FILE_FORMAT_STRING,
+        getAttemptOutputDir().toString(), mapId.getId()),
+        size, conf);
+  }
+
+  /** Removes all of the files related to a task. */
+  public void removeAll() throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,45 @@
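+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+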
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class ContainerHeartbeatHandler extends
+    HeartbeatHandlerBase<ContainerId> {
+
+  public ContainerHeartbeatHandler(AppContext context, 
+      int numThreads) {
+    super(context, numThreads, "ContainerHeartbeatHandler");
+  }
+
+  @Override
+  protected int getConfiguredTimeout(Configuration conf) {
+    // TODO Maybe define separate timeouts for Containers and tasks.
+    return conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000);
+  }
+
+  @Override
+  protected int getConfiguredTimeoutCheckInterval(Configuration conf) {
+    return conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
+  }
+
+  @Override
+  public boolean hasTimedOut(ReportTime report, long currentTime) {
+    // A non-positive timeOut disables the timeout check.
+    return (timeOut > 0) && (currentTime > report.getLastPing() + timeOut);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void handleTimeOut(ContainerId containerId) {
+    eventHandler.handle(new AMContainerEvent(containerId,
+        AMContainerEventType.C_TIMED_OUT));
+  }
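+
+  /* Usage sketch (illustrative; the wiring is not shown in this file): the AM
+   * registers a container when it launches, records pings as heartbeats
+   * arrive, and unregisters it on completion:
+   *
+   *   ContainerHeartbeatHandler chh =
+   *       new ContainerHeartbeatHandler(appContext, 1);
+   *   chh.init(conf);
+   *   chh.start();
+   *   chh.register(containerId);    // container launched
+   *   chh.pinged(containerId);      // heartbeat received
+   *   chh.unregister(containerId);  // container finished
+   *   chh.stop();
+   */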
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,150 @@
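+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+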
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
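+/**
+ * Tracks liveness of entities keyed by T (e.g. containers or task attempts):
+ * callers register(), pinged()/progressing(), and unregister() entries, while
+ * a checker thread invokes handleTimeOut() for entries whose last report is
+ * older than the configured timeout.
+ */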
+public abstract class HeartbeatHandlerBase<T> extends AbstractService {
+
+  protected int timeOut = 5 * 60 * 1000; // 5 minutes.
+  protected int timeOutCheckInterval = 30 * 1000; // 30 seconds.
+  protected Thread timeOutCheckerThread;
+  private final String name;
+  
+  @SuppressWarnings("rawtypes")
+  protected final EventHandler eventHandler;
+  protected final Clock clock;
+  protected final AppContext appContext;
+  
+  private ConcurrentMap<T, ReportTime> runningMap;
+  private volatile boolean stopped;
+  
+  public HeartbeatHandlerBase(AppContext appContext, int numThreads, String name) {
+    super(name);
+    this.name = name;
+    this.eventHandler = appContext.getEventHandler();
+    this.clock = appContext.getClock();
+    this.appContext = appContext;
+    this.runningMap = new ConcurrentHashMap<T, HeartbeatHandlerBase.ReportTime>();
+  }
+  
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    // TODO XXX: TaskTimeout / ContainerTimeOut
+    timeOut = getConfiguredTimeout(conf);
+    timeOutCheckInterval = getConfiguredTimeoutCheckInterval(conf);
+  }
+
+  @Override
+  public void start() {
+    timeOutCheckerThread = new Thread(createPingChecker());
+    timeOutCheckerThread.setName(name + " PingChecker");
+    timeOutCheckerThread.start();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+    timeOutCheckerThread.interrupt();
+    super.stop();
+  }
+  
+  protected Runnable createPingChecker() {
+    return new PingChecker();
+  }
+  protected abstract int getConfiguredTimeout(Configuration conf);
+  protected abstract int getConfiguredTimeoutCheckInterval(Configuration conf);
+  
+  public void progressing(T id) {
+    ReportTime time = runningMap.get(id);
+    if (time != null) {
+      time.setLastProgress(clock.getTime());
+    }
+  }
+  
+  public void pinged(T id) {
+    ReportTime time = runningMap.get(id);
+    if (time != null) {
+      time.setLastPing(clock.getTime());
+    }
+  }
+  
+  public void register(T id) {
+    runningMap.put(id, new ReportTime(clock.getTime()));
+  }
+  
+  public void unregister(T id) {
+    runningMap.remove(id);
+  }
+  
+  
+  
+  protected static class ReportTime {
+    private long lastPing;
+    private long lastProgress;
+    
+    public ReportTime(long time) {
+      setLastProgress(time);
+    }
+    
+    public synchronized void setLastPing(long time) {
+      lastPing = time;
+    }
+    
+    public synchronized void setLastProgress(long time) {
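+      // A progress report also counts as a ping, since progress implies liveness.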
+      lastProgress = time;
+      lastPing = time;
+    }
+    
+    public synchronized long getLastPing() {
+      return lastPing;
+    }
+    
+    public synchronized long getLastProgress() {
+      return lastProgress;
+    }
+  }
+  
+  public abstract boolean hasTimedOut(ReportTime report, long currentTime);
+  
+  public abstract void handleTimeOut(T t);
+  
+  private class PingChecker implements Runnable {
+
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        Iterator<Map.Entry<T, ReportTime>> iterator =
+            runningMap.entrySet().iterator();
+
+        // Avoid recomputing the current time for every entry in the loop.
+        long currentTime = clock.getTime();
+
+        while (iterator.hasNext()) {
+          Map.Entry<T, ReportTime> entry = iterator.next();
+          if (hasTimedOut(entry.getValue(), currentTime)) {
+            // Timed out: remove the entry and send out a timeout event.
+            iterator.remove();
+            handleTimeOut(entry.getKey());
+          }
+        }
+        try {
+          Thread.sleep(timeOutCheckInterval);
+        } catch (InterruptedException e) {
+          break;
+        }
+      }
+    }
+  }
+  
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
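+/**
+ * Pairs a {@link HistoryEvent} with its {@link JobId} so the event can be
+ * routed through the AM dispatcher, e.g. (illustrative call site)
+ * {@code handler.handle(new JobHistoryEvent(jobId, historyEvent))}.
+ */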
+public class JobHistoryEvent extends AbstractEvent<EventType> {
+
+  private final JobId jobID;
+  private final HistoryEvent historyEvent;
+
+  public JobHistoryEvent(JobId jobID, HistoryEvent historyEvent) {
+    super(historyEvent.getEventType());
+    this.jobID = jobID;
+    this.historyEvent = historyEvent;
+  }
+
+  public JobId getJobID() {
+    return jobID;
+  }
+
+  public HistoryEvent getHistoryEvent() {
+    return historyEvent;
+  }
+}


