hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1388591 - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/...
Date Fri, 21 Sep 2012 18:07:24 GMT
Author: sseth
Date: Fri Sep 21 18:07:22 2012
New Revision: 1388591

URL: http://svn.apache.org/viewvc?rev=1388591&view=rev
Log:
MAPREDUCE-4618. Re-wire LocalContainerAllocator/UberAM (sseth)

Added:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerAllocator.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java
Removed:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerAllocator.java
Modified:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptRemoteStartEvent.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Sep 21 18:07:22 2012
@@ -18,3 +18,5 @@ Branch MR-3902
   MAPREDUCE-4626. Fix and re-enable RMContainerAllocator unit tests (sseth)
 
   MAPREDUCE-4617. Re-wire AM Recovery (sseth)
+
+  MAPREDUCE-4618. Re-wire LocalContainerAllocator/UberAM (sseth)

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Sep 21 18:07:22 2012
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapreduce.v2.app.local;
 
 import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -62,7 +61,6 @@ public class LocalContainerAllocator ext
 
   @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
-  private AtomicInteger containerCount = new AtomicInteger();
   private long retryInterval;
   private long retrystartTime;
   private String nmHost;

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerAllocator.java?rev=1388591&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerAllocator.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/LocalContainerAllocator.java Fri Sep 21 18:07:22 2012
@@ -0,0 +1,556 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventTerminated;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTASucceededEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicator;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Allocates containers locally. Doesn't allocate a real container;
+ * instead sends an allocated event for all requests.
+ */
+// TODO (Post-3902): Maybe implement this via an InterceptingHandler - like Recovery.
+public class LocalContainerAllocator extends AbstractService
+    implements ContainerAllocator {
+
+  private static final Log LOG =
+      LogFactory.getLog(LocalContainerAllocator.class);
+  // The process's current working directory; its contents are snapshotted at
+  // construction time and restored (by deletion of extras) between sub-tasks.
+  private static final File curDir = new File(".");
+
+  // Handler obtained from the AppContext; receives job/task-attempt events.
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
+  // Node built from the nmHost/nmPort constructor arguments.
+  private final NodeId nodeId;
+  private final int nmHttpPort;
+  // Container id supplied to the constructor; reused for every launched attempt.
+  private final ContainerId amContainerId;
+  // Passed to MapTask.run()/ReduceTask.run() and used for error reporting.
+  private final TaskUmbilicalProtocol umbilical;
+  // FileContext for curDir; remains null if the local filesystem is reported
+  // as unsupported, in which case relocalize() cannot delete anything.
+  private FileContext curFC = null;
+  // Entries present in curDir when this object was constructed.
+  private final HashSet<File> localizedFiles;
+  private final JobId jobId;
+  private final AppContext appContext;
+  // NOTE(review): the trailing 'n' in the name looks like a typo.
+  private final TaskAttemptListener taskAttemptListenern;
+  // Source of the application ACLs sent with TaskAttemptRemoteStartEvent.
+  private final RMCommunicator rmCommunicator;
+  
+  // Filled by handle(); drained by eventHandlingThread.
+  private BlockingQueue<AMSchedulerEvent> eventQueue =
+      new LinkedBlockingQueue<AMSchedulerEvent>();
+  // Created and started in start(); interrupted in stop().
+  private Thread eventHandlingThread;
+  // Checked by the event loop in start() as an exit condition.
+  private boolean stopEventHandling = false;
+
+  /**
+   * Creates an allocator that runs task attempts inside the current JVM
+   * instead of requesting real containers.
+   *
+   * @param appContext application context; supplies the event handler and is
+   *          used later for job, container and node lookups
+   * @param jobId id of the job whose task attempts are run
+   * @param nmHost host name combined with {@code nmPort} into the NodeId
+   * @param nmPort port combined with {@code nmHost} into the NodeId
+   * @param nmHttpPort HTTP port used when building the container's
+   *          host:port string
+   * @param cId container id reused for every launched task attempt
+   * @param taskUmbilical umbilical handed to each map/reduce task's run()
+   * @param taskAttemptListener listener with which JVM ids and task attempts
+   *          are registered
+   * @param rmComm source of the application ACLs sent with the remote-start
+   *          event
+   * @throws IllegalStateException if the contents of the current directory
+   *           cannot be listed
+   */
+  public LocalContainerAllocator(AppContext appContext, JobId jobId,
+      String nmHost, int nmPort, int nmHttpPort, ContainerId cId,
+      TaskUmbilicalProtocol taskUmbilical,
+      TaskAttemptListener taskAttemptListener, RMCommunicator rmComm) {
+    super(LocalContainerAllocator.class.getSimpleName());
+    this.appContext = appContext;
+    this.eventHandler = appContext.getEventHandler();
+    this.nodeId = BuilderUtils.newNodeId(nmHost, nmPort);
+    this.nmHttpPort = nmHttpPort;
+    this.amContainerId = cId;
+    this.umbilical = taskUmbilical;
+    this.jobId = jobId;
+    this.taskAttemptListenern = taskAttemptListener;
+    this.rmCommunicator = rmComm;
+    // umbilical:  MRAppMaster creates (taskAttemptListener), passes to us
+    // (TODO/FIXME:  pointless to use RPC to talk to self; should create
+    // LocalTaskAttemptListener or similar:  implement umbilical protocol
+    // but skip RPC stuff)
+
+    try {
+      curFC = FileContext.getFileContext(curDir.toURI());
+    } catch (UnsupportedFileSystemException ufse) {
+      // Keep going with curFC == null, but record the cause for diagnosis.
+      LOG.error("Local filesystem " + curDir.toURI().toString()
+                + " is unsupported?? (should never happen)", ufse);
+    }
+
+    // Save list of files/dirs that are supposed to be present so can delete
+    // any extras created by one task before starting subsequent task.  Note
+    // that there's no protection against deleted or renamed localization;
+    // users who do that get what they deserve (and will have to disable
+    // uberization in order to run correctly).
+    File[] curLocalFiles = curDir.listFiles();
+    if (curLocalFiles == null) {
+      // listFiles() returns null on an I/O error or when the path is not a
+      // directory.  Fail with a clear message rather than a bare NPE; an
+      // empty snapshot would make relocalize() treat every file as an extra.
+      throw new IllegalStateException("Unable to list contents of "
+          + curDir.getAbsolutePath());
+    }
+    localizedFiles = new HashSet<File>(curLocalFiles.length);
+    for (File localized : curLocalFiles) {
+      localizedFiles.add(localized);
+    }
+
+    // Relocalization note/future FIXME (per chrisdo, 20110315):  At moment,
+    // full localization info is in AppSubmissionContext passed from client to
+    // RM and then to NM for AM-container launch:  no difference between AM-
+    // localization and MapTask- or ReduceTask-localization, so can assume all
+    // OK.  Longer-term, will need to override uber-AM container-localization
+    // request ("needed resources") with union of regular-AM-resources + task-
+    // resources (and, if maps and reduces ever differ, then union of all three
+    // types), OR will need localizer service/API that uber-AM can request
+    // after running (e.g., "localizeForTask()" or "localizeForMapTask()").
+  }
+
+  /**
+   * Starts the thread that drains {@code eventQueue} and dispatches each
+   * event through {@link #handleEvent(AMSchedulerEvent)}, then starts the
+   * service itself.
+   *
+   * The loop ends when the thread is interrupted or the stop flag is set.
+   * If handling an event throws, the error is logged, a
+   * {@code JobEventType.INTERNAL_ERROR} job event is sent, and the thread
+   * exits.
+   */
+  @Override
+  public void start() {
+    this.eventHandlingThread = new Thread() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void run() {
+
+        AMSchedulerEvent event;
+
+        while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+          try {
+            // Blocks until an event is queued by handle().
+            event = LocalContainerAllocator.this.eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.error("Returning, interrupted : " + e);
+            return;
+          }
+
+          try {
+            handleEvent(event);
+          } catch (Throwable t) {
+            LOG.error("Error in handling event type " + event.getType()
+                + " to the LocalContainerAllocator", t);
+            // Kill the AM.
+            eventHandler.handle(new JobEvent(jobId, JobEventType.INTERNAL_ERROR));
+            return;
+          }
+        }
+      }
+    };
+    eventHandlingThread.start();
+    super.start();
+  }
+
+  /**
+   * Stops the event-handling thread, if one was started, and then stops the
+   * service.
+   */
+  @Override
+  public void stop() {
+    // Set the exit flag before interrupting so the event loop also ends if
+    // code running on that thread clears the interrupt status.
+    // NOTE(review): stopEventHandling is not volatile; consider making it so.
+    stopEventHandling = true;
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    super.stop();
+  }
+
+  /**
+   * Queues a scheduler event for the event-handling thread.
+   *
+   * @param event the event to enqueue
+   * @throws YarnException if the calling thread is interrupted while
+   *           enqueueing; the thread's interrupt status is restored first
+   */
+  @Override
+  public void handle(AMSchedulerEvent event) {
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers further up can still see it.
+      Thread.currentThread().interrupt();
+      throw new YarnException(e);  // FIXME? YarnException is "for runtime exceptions only"
+    }
+  }
+  
+  // Alternate Options
+  // - Fake containerIds for each and every container. (AMContainer uses JvmID instead ?)
+  // - Since single threaded, works very well with the current flow. Normal flow via containers.
+  // - Change container to maintain a list of pending tasks.
+
+  // Currently, AMContainer and AMNode are short-circuited. Scheduler sends
+  // events directly to the task, which sends events back to the scheduler.
+
+  // Set once the number of finished map sub-tasks equals the job's map count,
+  // or when the job has no maps; reduce sub-tasks are rejected until then.
+  private boolean doneWithMaps = false;
+  // Count of map sub-tasks whose run() has returned normally.
+  private int finishedSubMaps = 0;
+  
+  /**
+   * Handles a launch request by running the task attempt in this JVM.
+   *
+   * Registers the container and node with the AppContext, registers a JVM id
+   * and the attempt with the task attempt listener, updates job counters,
+   * sends a TaskAttemptRemoteStartEvent, and then calls runSubtask(), which
+   * blocks until the sub-task finishes.
+   *
+   * A RuntimeException from the sub-task increments NUM_FAILED_UBERTASKS and
+   * sends a TaskAttemptEventTerminated for the attempt. An IOException is
+   * logged as fatal and the JVM exits with status -1.
+   *
+   * @param event the launch request carrying the attempt and its remote task
+   */
+  @SuppressWarnings("unchecked")
+  void handleTaLaunchRequest(AMSchedulerTALaunchRequestEvent event) {
+    
+    LOG.info("Processing the event: " + event);
+    // Every attempt is associated with the same container id (amContainerId).
+    Container container = BuilderUtils.newContainer(amContainerId, nodeId,
+        this.nodeId.getHost() + ":" + nmHttpPort,
+        Records.newRecord(Resource.class), Records.newRecord(Priority.class),
+        null);
+
+    appContext.getAllContainers().addContainerIfNew(container);
+    appContext.getAllNodes().nodeSeen(nodeId);
+    
+    // Register a JVMId, so that the TAL can handle pings etc. 
+    WrappedJvmID jvmId = new WrappedJvmID(TypeConverter.fromYarn(jobId), event
+        .getAttemptID().getTaskId().getTaskType() == TaskType.MAP,
+        amContainerId.getId());
+    taskAttemptListenern.registerRunningJvm(jvmId, amContainerId);
+    
+
+    if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
+      JobCounterUpdateEvent jce = new JobCounterUpdateEvent(event
+          .getAttemptID().getTaskId().getJobId());
+      // TODO Setting OTHER_LOCAL_MAP for now.
+      jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+      eventHandler.handle(jce);
+    }
+
+    // NOTE(review): 'event' already has this static type, so the cast is
+    // redundant.
+    AMSchedulerTALaunchRequestEvent launchEv = (AMSchedulerTALaunchRequestEvent) event;
+    TaskAttemptId attemptID = launchEv.getTaskAttempt().getID();
+
+    Job job = appContext.getJob(jobId);
+    int numMapTasks = job.getTotalMaps();
+    int numReduceTasks = job.getTotalReduces();
+
+    // YARN (tracking) Task:
+    org.apache.hadoop.mapreduce.v2.app2.job.Task ytask = job.getTask(attemptID
+        .getTaskId());
+    // classic mapred Task:
+    org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
+
+    // There is no port number because we are not really talking to a task
+    // tracker. The shuffle is just done through local files. So the
+    // port number is set to -1 in this case.
+
+    appContext.getEventHandler().handle(
+        new TaskAttemptRemoteStartEvent(attemptID, amContainerId,
+            rmCommunicator.getApplicationAcls(), -1));
+
+    // A job without maps is immediately "done with maps", so reduce
+    // sub-tasks pass the sanity check in runSubtask().
+    if (numMapTasks == 0) {
+      doneWithMaps = true;
+    }
+
+    try {
+      // NOTE: no braces here; this outer 'if' guards only the nested 'if'
+      // statement that follows.
+      if (remoteTask.isMapOrReduce())
+        if (attemptID.getTaskId().getTaskType() == TaskType.MAP
+            || attemptID.getTaskId().getTaskType() == TaskType.REDUCE) {
+          JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID
+              .getTaskId().getJobId());
+          jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
+          if (remoteTask.isMapTask()) {
+            jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
+          } else {
+            jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
+          }
+          appContext.getEventHandler().handle(jce);
+        }
+      
+      // Register the sub-task with TaskAttemptListener.
+      taskAttemptListenern.registerTaskAttempt(event.getAttemptID(), jvmId);
+      // Blocking: returns only when the sub-task has finished or failed.
+      runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
+          (numReduceTasks > 0));
+    } catch (RuntimeException re) {
+      JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID
+          .getTaskId().getJobId());
+      jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
+      appContext.getEventHandler().handle(jce);
+      // this is our signal that the subtask failed in some way, so
+      // simulate a failed JVM/container and send a terminated event to
+      // the task attempt.
+      // NOTE(review): the resulting task-attempt state transitions are not
+      // visible from this class - confirm against the state machine.
+
+      // CLEANUP event generated.
+      appContext.getEventHandler().handle(
+          new TaskAttemptEventTerminated(attemptID));
+
+    } catch (IOException ioe) {
+      // if umbilical itself barfs (in error-handler of runSubMap()),
+      // we're pretty much hosed, so do what YarnChild main() does
+      // (i.e., exit clumsily--but can never happen, so no worries!)
+      LOG.fatal("oopsie...  this can never happen: "
+          + StringUtils.stringifyException(ioe));
+      System.exit(-1);
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  public void handleTaStopRequest(AMSchedulerTAStopRequestEvent sEvent) {
+    // Implies a failed or killed task.
+    // This will trigger a CLEANUP event. UberAM is supposed to fail if there's
+    // event a single failed attempt. Hence the CLEANUP is OK (otherwise delay
+    // cleanup till end of job). TODO Enforce job failure on single task attempt
+    // failure.
+    appContext.getEventHandler().handle(
+        new TaskAttemptEventTerminated(sEvent.getAttemptID()));
+    taskAttemptListenern.unregisterTaskAttempt(sEvent.getAttemptID());
+  }
+
+  @SuppressWarnings("unchecked")
+  public void handleTaSucceededRequest(AMSchedulerTASucceededEvent sEvent) {
+    // Successful taskAttempt.
+    // Same CLEANUP comment as handleTaStopRequest
+    appContext.getEventHandler().handle(
+        new TaskAttemptEventTerminated(sEvent.getAttemptID()));
+    taskAttemptListenern.unregisterTaskAttempt(sEvent.getAttemptID());
+  }
+
+  /**
+   * Dispatches a scheduler event to the matching handler based on its type.
+   * Launch, stop and succeeded events are handled; any other type is logged
+   * as a warning and otherwise ignored.
+   *
+   * @param sEvent the event to dispatch
+   */
+  public void handleEvent(AMSchedulerEvent sEvent) {
+    LOG.info("Processing the event " + sEvent.toString());
+    switch (sEvent.getType()) {
+    case S_TA_LAUNCH_REQUEST:
+      // Runs the task attempt synchronously on the calling thread.
+      handleTaLaunchRequest((AMSchedulerTALaunchRequestEvent) sEvent);
+      break;
+    case S_TA_STOP_REQUEST: // Effectively means a failure.
+      handleTaStopRequest((AMSchedulerTAStopRequestEvent) sEvent);
+      break;
+    case S_TA_SUCCEEDED:
+      handleTaSucceededRequest((AMSchedulerTASucceededEvent) sEvent);
+      break;
+    default:
+      LOG.warn("Invalid event type for LocalContainerAllocator: "
+          + sEvent.getType() + ", Event: " + sEvent);
+    }
+  }
+  
+  private class SubtaskRunner implements Runnable {
+    SubtaskRunner() {
+    }
+
+    @Override
+    public void run() {
+    }
+
+    /**
+     * Runs one map or reduce sub-task to completion on the calling thread.
+     *
+     * A per-task JobConf is derived from the service configuration, the
+     * task is run against the umbilical and, for maps, the output is
+     * optionally renamed for the reducer and the working directory is
+     * cleaned up afterwards.
+     *
+     * @param task the classic mapred task to run (a MapTask or ReduceTask)
+     * @param taskType type of the task; anything other than MAP is treated
+     *          as a reduce
+     * @param attemptID YARN id of the attempt being run
+     * @param numMapTasks total number of maps in the job; used to decide
+     *          when all maps are done
+     * @param renameOutputs whether to rename the map output so a reducer
+     *          can read it
+     * @throws RuntimeException if the sub-task fails in any way; the
+     *           original failure is attached as the cause
+     * @throws IOException if reporting the failure over the umbilical fails
+     */
+    private void runSubtask(org.apache.hadoop.mapred.Task task,
+                            final TaskType taskType,
+                            TaskAttemptId attemptID,
+                            final int numMapTasks,
+                            boolean renameOutputs)
+    throws RuntimeException, IOException {
+      org.apache.hadoop.mapred.TaskAttemptID classicAttemptID =
+          TypeConverter.fromYarn(attemptID);
+
+      try {
+        JobConf conf = new JobConf(getConfig());
+        conf.set(JobContext.TASK_ID, task.getTaskID().toString());
+        conf.set(JobContext.TASK_ATTEMPT_ID, classicAttemptID.toString());
+        conf.setBoolean(JobContext.TASK_ISMAP, (taskType == TaskType.MAP));
+        conf.setInt(JobContext.TASK_PARTITION, task.getPartition());
+        conf.set(JobContext.ID, task.getJobID().toString());
+
+        // Use the AM's local dir env to generate the intermediate step
+        // output files
+        String[] localSysDirs = StringUtils.getTrimmedStrings(
+            System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
+        conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
+        LOG.info(MRConfig.LOCAL_DIR + " for uber task: "
+            + conf.get(MRConfig.LOCAL_DIR));
+
+        // mark this as an uberized subtask so it can set task counter
+        // (longer-term/FIXME:  could redefine as job counter and send
+        // "JobCounterEvent" to JobImpl on [successful] completion of subtask;
+        // will need new Job state-machine transition and JobImpl jobCounters
+        // map to handle)
+        conf.setBoolean("mapreduce.task.uberized", true);
+
+        // META-FIXME: do we want the extra sanity-checking (doneWithMaps,
+        // etc.), or just assume/hope the state machine(s) and uber-AM work
+        // as expected?
+        if (taskType == TaskType.MAP) {
+          if (doneWithMaps) {
+            LOG.error("CONTAINER_REMOTE_LAUNCH contains a map task ("
+                      + attemptID + "), but should be finished with maps");
+            throw new RuntimeException();
+          }
+
+          MapTask map = (MapTask)task;
+          map.setConf(conf);
+
+          map.run(conf, umbilical);
+
+          if (renameOutputs) {
+            renameMapOutputForReduce(conf, attemptID, map.getMapOutputFile());
+          }
+          relocalize();
+
+          if (++finishedSubMaps == numMapTasks) {
+            doneWithMaps = true;
+          }
+
+        } else /* TaskType.REDUCE */ {
+
+          if (!doneWithMaps) {
+            // check if event-queue empty?  whole idea of counting maps vs.
+            // checking event queue is a tad wacky...but could enforce ordering
+            // (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?):
+            // doesn't send reduce event until maps all done]
+            LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task ("
+                      + attemptID + "), but not yet finished with maps");
+            throw new RuntimeException();
+          }
+
+          // a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner:
+          // set framework name to local to make task local
+          conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
+          conf.set(MRConfig.MASTER_ADDRESS, "local");  // bypass shuffle
+
+          ReduceTask reduce = (ReduceTask)task;
+          reduce.setConf(conf);
+
+          reduce.run(conf, umbilical);
+          //relocalize();  // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?)
+        }
+
+      } catch (FSError e) {
+        LOG.fatal("FSError from child", e);
+        // umbilical:  MRAppMaster creates (taskAttemptListener), passes to us
+        umbilical.fsError(classicAttemptID, e.getMessage());
+        // Keep the original failure attached for whoever logs this.
+        throw new RuntimeException(e);
+
+      } catch (Exception exception) {
+        // Also reached by the sanity-check RuntimeExceptions thrown above.
+        LOG.warn("Exception running local (uberized) 'child' : "
+            + StringUtils.stringifyException(exception));
+        try {
+          if (task != null) {
+            // do cleanup for the task
+            task.taskCleanup(umbilical);
+          }
+        } catch (Exception e) {
+          LOG.info("Exception cleaning up: "
+              + StringUtils.stringifyException(e));
+        }
+        // Report back any failures, for diagnostic purposes
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        exception.printStackTrace(new PrintStream(baos));
+        umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
+        throw new RuntimeException(exception);
+
+      } catch (Throwable throwable) {
+        LOG.fatal("Error running local (uberized) 'child' : "
+            + StringUtils.stringifyException(throwable));
+        Throwable tCause = throwable.getCause();
+        String cause = (tCause == null)
+            ? throwable.getMessage()
+            : StringUtils.stringifyException(tCause);
+        umbilical.fatalError(classicAttemptID, cause);
+        throw new RuntimeException(throwable);
+      }
+    }
+
+    /**
+     * Within the _local_ filesystem (not HDFS), all activity takes place within
+     * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
+     * and all sub-MapTasks create the same filename ("file.out").  Rename that
+     * to something unique (e.g., "map_0.out") to avoid collisions.
+     *
+     * Longer-term, we'll modify [something] to use TaskAttemptID-based
+     * filenames instead of "file.out". (All of this is entirely internal,
+     * so there are no particular compatibility issues.)
+     */
+    private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId,
+                                          MapOutputFile subMapOutputFile)
+    throws IOException {
+      FileSystem localFs = FileSystem.getLocal(conf);
+      // move map output to reduce input
+      Path mapOut = subMapOutputFile.getOutputFile();
+      FileStatus mStatus = localFs.getFileStatus(mapOut);      
+      Path reduceIn = subMapOutputFile.getInputFileForWrite(
+          TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Renaming map output file for task attempt "
+            + mapId.toString() + " from original location " + mapOut.toString()
+            + " to destination " + reduceIn.toString());
+      }
+      if (!localFs.mkdirs(reduceIn.getParent())) {
+        throw new IOException("Mkdirs failed to create "
+            + reduceIn.getParent().toString());
+      }
+      if (!localFs.rename(mapOut, reduceIn))
+        throw new IOException("Couldn't rename " + mapOut);
+    }
+
+    /**
+     * Restores the current directory to its initial state as far as
+     * possible: every entry that was not present when the allocator was
+     * constructed is deleted recursively.
+     *
+     * NOTE(review): the original comment says renamed map outputs are
+     * exempt, but there is no explicit exemption here; presumably they live
+     * outside the current directory - confirm.
+     *
+     * Any jobs that go out of their way to rename or delete things from the
+     * local directory are considered broken and deserve what they get...
+     *
+     * Failures are logged and never propagated. If the directory cannot be
+     * listed at all, cleanup is skipped.
+     */
+    private void relocalize() {
+      File[] curLocalFiles = curDir.listFiles();
+      if (curLocalFiles == null) {
+        // listFiles() returns null on an I/O error or when the path is not
+        // a directory; nothing can be compared, so leave everything alone.
+        LOG.warn("Unable to list contents of " + curDir.getAbsolutePath()
+            + "; skipping cleanup of unexpected local files");
+        return;
+      }
+      for (File candidate : curLocalFiles) {
+        if (localizedFiles.contains(candidate)) {
+          continue;
+        }
+        // found one that wasn't there before:  delete it
+        boolean deleted = false;
+        try {
+          if (curFC != null) {
+            // this is recursive, unlike File delete():
+            deleted = curFC.delete(new Path(candidate.getName()), true);
+          }
+        } catch (IOException e) {
+          // Best effort: record the real cause instead of dropping it.
+          LOG.warn("Unable to delete unexpected local file/dir "
+              + candidate.getName(), e);
+          continue;
+        }
+        if (!deleted) {
+          LOG.warn("Unable to delete unexpected local file/dir "
+              + candidate.getName() + ": insufficient permissions?");
+        }
+      }
+    }
+
+  } // end SubtaskRunner
+
+  
+  // _must_ either run subtasks sequentially or accept expense of new JVMs
+  // (i.e., fork()), else will get weird failures when maps try to create/
+  // write same dirname or filename:  no chdir() in Java
+  
+  // TODO: Since this is only a single thread, AMContainer can be used
+  // efficiently for the Uber lifecycle. Even if it is made multi-threaded, it
+  // may make sense to let AMContainer maintain a queue of pending tasks,
+  // instead of only a single pending task.
+
+  /**
+   * Runs a single sub-task and returns only once it has finished, so at most
+   * one attempt is executing at any time. The work is delegated to a fresh
+   * SubtaskRunner.
+   *
+   * @param remoteTask the classic mapred task to run
+   * @param taskType type of the task
+   * @param attemptID YARN id of the attempt being run
+   * @param numMapTasks total number of maps in the job
+   * @param renameOutputs whether map output is renamed for a reducer
+   * @throws RuntimeException if the sub-task fails
+   * @throws IOException if reporting a failure over the umbilical fails
+   */
+  private void runSubtask(org.apache.hadoop.mapred.Task remoteTask,
+                          final TaskType taskType,
+                          TaskAttemptId attemptID,
+                          final int numMapTasks, 
+                          boolean renameOutputs)
+  throws RuntimeException, IOException {
+    new SubtaskRunner().runSubtask(
+        remoteTask, taskType, attemptID, numMapTasks, renameOutputs);
+  }
+}

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java Fri Sep 21 18:07:22 2012
@@ -92,11 +92,11 @@ public class TaskAttemptListenerImpl2 ex
   private Server server;
   
   // TODO XXX: Use this to figure out whether an incoming ping is valid.
-  private ConcurrentMap<TaskAttemptID, WrappedJvmID>
-    jvmIDToActiveAttemptMap
-      = new ConcurrentHashMap<TaskAttemptID, WrappedJvmID>();
+  private ConcurrentMap<TaskAttemptID, WrappedJvmID> attemptToJvmIdMap =
+      new ConcurrentHashMap<TaskAttemptID, WrappedJvmID>();
   // jvmIdToContainerIdMap also serving to check whether the container is still running.
-  private ConcurrentMap<WrappedJvmID, ContainerId> jvmIDToContainerIdMap = new ConcurrentHashMap<WrappedJvmID, ContainerId>();
+  private ConcurrentMap<WrappedJvmID, ContainerId> jvmIDToContainerIdMap =
+      new ConcurrentHashMap<WrappedJvmID, ContainerId>();
 //  private Set<WrappedJvmID> launchedJVMs = Collections
 //      .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); 
   
@@ -162,7 +162,7 @@ public class TaskAttemptListenerImpl2 ex
   }
 
   private void pingContainerHeartbeatHandler(TaskAttemptID attemptID) {
-    containerHeartbeatHandler.pinged(jvmIDToContainerIdMap.get(jvmIDToActiveAttemptMap.get(attemptID)));
+    containerHeartbeatHandler.pinged(jvmIDToContainerIdMap.get(attemptToJvmIdMap.get(attemptID)));
   }
   
   /**
@@ -494,14 +494,14 @@ public class TaskAttemptListenerImpl2 ex
   }
   
   public void registerTaskAttempt(TaskAttemptId attemptId, WrappedJvmID jvmId) {
-    jvmIDToActiveAttemptMap.put(TypeConverter.fromYarn(attemptId), jvmId);
+    attemptToJvmIdMap.put(TypeConverter.fromYarn(attemptId), jvmId);
   }
   
   // Unregister called by the Container. Registration happens when TAL asks
   // the container for a task.
   @Override
   public void unregisterTaskAttempt(TaskAttemptId attemptId) {
-    jvmIDToActiveAttemptMap.remove(TypeConverter.fromYarn(attemptId));
+    attemptToJvmIdMap.remove(TypeConverter.fromYarn(attemptId));
   }
 
   public org.apache.hadoop.mapred.Task pullTaskAttempt(ContainerId containerId) {

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java Fri Sep 21 18:07:22 2012
@@ -5,9 +5,7 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
-import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.EventHandler;
 
 public class ContainerHeartbeatHandler extends
     HeartbeatHandlerBase<ContainerId> {
@@ -30,14 +28,14 @@ public class ContainerHeartbeatHandler e
   }
 
   @Override
-  public boolean hasTimedOut(ReportTime report, long currentTime) {
+  protected boolean hasTimedOut(ReportTime report, long currentTime) {
     return (timeOut > 0) && (currentTime > report.getLastPing() + timeOut);
 
   }
 
   @SuppressWarnings("unchecked")
   @Override
-  public void handleTimeOut(ContainerId containerId) {
+  protected void handleTimeOut(ContainerId containerId) {
     eventHandler.handle(new AMContainerEvent(containerId,
         AMContainerEventType.C_TIMED_OUT));
   }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java Fri Sep 21 18:07:22 2012
@@ -26,16 +26,17 @@ public abstract class HeartbeatHandlerBa
   
   private ConcurrentMap<T, ReportTime> runningMap;
   private volatile boolean stopped;
-  
-  public HeartbeatHandlerBase(AppContext appContext,int numThreads, String name) {
+
+  public HeartbeatHandlerBase(AppContext appContext, int numThreads, String name) {
     super(name);
     this.name = name;
     this.eventHandler = appContext.getEventHandler();
     this.clock = appContext.getClock();
     this.appContext = appContext;
-    this.runningMap = new ConcurrentHashMap<T, HeartbeatHandlerBase.ReportTime>();
+    this.runningMap = new ConcurrentHashMap<T, HeartbeatHandlerBase.ReportTime>(
+        16, 0.75f, numThreads);
   }
-  
+
   @Override
   public void init(Configuration conf) {
     super.init(conf);
@@ -55,7 +56,9 @@ public abstract class HeartbeatHandlerBa
   @Override
   public void stop() {
     stopped = true;
-    timeOutCheckerThread.interrupt();
+    if (timeOutCheckerThread != null) {
+      timeOutCheckerThread.interrupt();
+    }
     super.stop();
   }
   
@@ -115,9 +118,9 @@ public abstract class HeartbeatHandlerBa
     }
   }
   
-  public abstract boolean hasTimedOut(ReportTime report, long currentTime);
+  protected abstract boolean hasTimedOut(ReportTime report, long currentTime);
   
-  public abstract void handleTimeOut(T t);
+  protected abstract void handleTimeOut(T t);
   
   private class PingChecker implements Runnable {
 

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java Fri Sep 21 18:07:22 2012
@@ -36,7 +36,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LocalContainerAllocator;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl2;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -68,13 +70,15 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app2.local.LocalContainerRequestor;
 import org.apache.hadoop.mapreduce.v2.app2.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app2.recover.Recovery;
 import org.apache.hadoop.mapreduce.v2.app2.recover.RecoveryService;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventType;
 import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
-import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicator;
 import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEventType;
 import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
@@ -176,7 +180,7 @@ public class MRAppMaster extends Composi
   private JobHistoryEventHandler2 jobHistoryEventHandler;
   private boolean inRecovery = false;
   private SpeculatorEventDispatcher speculatorEventDispatcher;
-  private RMContainerRequestor rmContainerRequestor;
+  private ContainerRequestor containerRequestor;
   private ContainerAllocator amScheduler;
   
 
@@ -310,26 +314,9 @@ public class MRAppMaster extends Composi
     dispatcher.register(Speculator.EventType.class,
         speculatorEventDispatcher);
 
-    // service to allocate containers from RM (if non-uber) or to fake it (uber)
-    rmContainerRequestor = createRMContainerRequestor(clientService, context);
-    addIfService(rmContainerRequestor);
-    dispatcher.register(RMCommunicatorEventType.class, rmContainerRequestor);
-    
-    // TODO XXX: Get rid of eventHandlers being sent as part of the constructors. context should be adequate.
-    amScheduler = createAMScheduler(rmContainerRequestor, context);
-    addIfService(amScheduler);
-    dispatcher.register(AMSchedulerEventType.class, amScheduler);
-    
-//    containerAllocator = createContainerAllocator(clientService, context);
-//    addIfService(containerAllocator);
-//    dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
-
-    // TODO XXX: initialization of RMComm, Scheduler, etc.
-    
-        
-    // TODO XXX: Rename to NMComm
-    // corresponding service to launch allocated containers via NodeManager
-//    containerLauncher = createNMCommunicator(context);
+    //    TODO XXX: Rename to NMComm
+    //    corresponding service to launch allocated containers via NodeManager
+    //    containerLauncher = createNMCommunicator(context);
     containerLauncher = createContainerLauncher(context);
     addIfService(containerLauncher);
     dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
@@ -508,17 +495,21 @@ public class MRAppMaster extends Composi
         maybeSendJobEndNotification();
         // TODO XXX Add a timeout.
         LOG.info("Waiting for all containers and TaskAttempts to complete");
-        while (!allContainersComplete() || !allTaskAttemptsComplete()) {
-          try {
-            synchronized(this) {
-              wait(100l);
+        if (!job.isUber()) {
+          while (!allContainersComplete() || !allTaskAttemptsComplete()) {
+            try {
+              synchronized (this) {
+                wait(100l);
+              }
+            } catch (InterruptedException e) {
+              LOG.info("AM Shutdown Thread interrupted. Exiting");
+              break;
             }
-          } catch (InterruptedException e) {
-            LOG.info("AM Shutdown Thread interrupted. Exiting");
-            break;
           }
+          LOG.info("All Containers and TaskAttempts Complete. Stopping services");
+        } else {
+          LOG.info("Uberized job. Not waiting for all containers to finish");
         }
-        LOG.info("All Containers and TaskAttempts Complete. Stopping services");
         stopAM();
         LOG.info("AM Shutdown Thread Completing");
       }
@@ -593,20 +584,37 @@ public class MRAppMaster extends Composi
    * @param appContext the application context.
    * @return an instance of the RMContainerRequestor.
    */
-  protected RMContainerRequestor createRMContainerRequestor(
+  protected ContainerRequestor createContainerRequestor(
       ClientService clientService, AppContext appContext) {
-    return new RMContainerRequestor(clientService, appContext);
+    ContainerRequestor containerRequestor;
+    if (job.isUber()) {
+      containerRequestor = new LocalContainerRequestor(clientService,
+          appContext);
+    } else {
+      containerRequestor = new RMContainerRequestor(clientService, appContext);
+    }
+    return containerRequestor;
   }
-  
+
   /**
    * Create the AM Scheduler.
-   * @param requestor The RM Container Requestor.
+   * 
+   * @param requestor The Container Requestor.
    * @param appContext the application context.
    * @return an instance of the AMScheduler.
    */
-  protected ContainerAllocator createAMScheduler(
-      RMContainerRequestor requestor, AppContext appContext) {
-    return new RMContainerAllocator(requestor, appContext);
+  protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
+      AppContext appContext) {
+    if (job.isUber()) {
+      // Cast the requestor that was passed in, not the containerRequestor field.
+      return new LocalContainerAllocator(appContext, jobId, nmHost, nmPort,
+          nmHttpPort, containerID, (TaskUmbilicalProtocol) taskAttemptListener,
+          taskAttemptListener, (RMCommunicator) requestor);
+    }
+    // TODO XXX: This is terrible. Assuming RMContainerRequestor is sent in
+    // when non-uberized. Fix RMContainerRequestor to be a proper interface, etc.
+    return new RMContainerAllocator((RMContainerRequestor) requestor,
+        appContext);
+  }
 
   /** Create and initialize (but don't start) a single job. */
@@ -727,32 +735,25 @@ public class MRAppMaster extends Composi
         MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT));
     return thh;
   }
-
+  
   protected ContainerHeartbeatHandler createContainerHeartbeatHandler(AppContext context,
       Configuration conf) {
     ContainerHeartbeatHandler chh = new ContainerHeartbeatHandler(context, conf.getInt(
         MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
         MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT));
+    // TODO XXX: Define a CONTAINER_LISTENER_THREAD_COUNT
     return chh;
   }
   
+
   protected TaskCleaner createTaskCleaner(AppContext context) {
     return new TaskCleanerImpl(context);
   }
 
-//  protected ContainerAllocator createContainerAllocator(
-//      final ClientService clientService, final AppContext context) {
-//    return new ContainerAllocatorRouter(clientService, context);
-//  }
-
   protected ContainerLauncher
       createContainerLauncher(final AppContext context) {
-    return new ContainerLauncherRouter(context);
+    return new ContainerLauncherImpl(context);
   }
-  
-//  protected ContainerLauncher createNMCommunicator(final AppContext context) {
-//    return new ContainerLauncherImpl(context);
-//  }
 
   //TODO:should have an interface for MRClientService
   protected ClientService createClientService(AppContext context) {
@@ -797,12 +798,8 @@ public class MRAppMaster extends Composi
 
   public List<AMInfo> getAllAMInfos() {
     return amInfos;
-  }
-  
-//  public ContainerAllocator getContainerAllocator() {
-//    return containerAllocator;
-//  }
-  
+  }  
+
   public ContainerLauncher getContainerLauncher() {
     return containerLauncher;
   }
@@ -815,96 +812,6 @@ public class MRAppMaster extends Composi
     return taskHeartbeatHandler;
   }
 
-  /**
-   * By the time life-cycle of this router starts, job-init would have already
-   * happened.
-   */
-//  private final class ContainerAllocatorRouter extends AbstractService
-//      implements ContainerAllocator {
-//    private final ClientService clientService;
-//    private final AppContext context;
-//    private ContainerAllocator containerAllocator;
-//
-//    ContainerAllocatorRouter(ClientService clientService,
-//        AppContext context) {
-//      super(ContainerAllocatorRouter.class.getName());
-//      this.clientService = clientService;
-//      this.context = context;
-//    }
-//
-//    @Override
-//    public synchronized void start() {
-//      if (job.isUber()) {
-//        this.containerAllocator = new LocalContainerAllocator(
-//            this.clientService, this.context, nmHost, nmPort, nmHttpPort
-//            , containerID);
-//      } else {
-//        
-//        // TODO XXX UberAM
-////        this.containerAllocator = new RMContainerAllocator(
-////            this.clientService, this.context);
-//      }
-//      ((Service)this.containerAllocator).init(getConfig());
-//      ((Service)this.containerAllocator).start();
-//      super.start();
-//    }
-//
-//    @Override
-//    public synchronized void stop() {
-//      ((Service)this.containerAllocator).stop();
-//      super.stop();
-//    }
-//
-//    @Override
-//    public void handle(ContainerAllocatorEvent event) {
-//      this.containerAllocator.handle(event);
-//    }
-//
-//    public void setSignalled(boolean isSignalled) {
-//      ((RMCommunicator) containerAllocator).setSignalled(true);
-//    }
-//  }
-
-  /**
-   * By the time life-cycle of this router starts, job-init would have already
-   * happened.
-   */
-  private final class ContainerLauncherRouter extends AbstractService
-      implements ContainerLauncher {
-    private final AppContext context;
-    private ContainerLauncher containerLauncher;
-
-    ContainerLauncherRouter(AppContext context) {
-      super(ContainerLauncherRouter.class.getName());
-      this.context = context;
-    }
-
-    @Override
-    public synchronized void start() {
-      if (job.isUber()) {
-        // TODO XXX: Handle uber.
-//        this.containerLauncher = new LocalContainerLauncher(context,
-//            (TaskUmbilicalProtocol) taskAttemptListener);
-      } else {
-        this.containerLauncher = new ContainerLauncherImpl(context);
-      }
-      ((Service)this.containerLauncher).init(getConfig());
-      ((Service)this.containerLauncher).start();
-      super.start();
-    }
-
-    @Override
-    public void handle(NMCommunicatorEvent event) {
-        this.containerLauncher.handle(event);
-    }
-
-    @Override
-    public synchronized void stop() {
-      ((Service)this.containerLauncher).stop();
-      super.stop();
-    }
-  }
-
   private final class StagingDirCleaningService extends AbstractService {
     StagingDirCleaningService() {
       super(StagingDirCleaningService.class.getName());
@@ -1055,6 +962,16 @@ public class MRAppMaster extends Composi
       LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
                + "job " + job.getID() + ".");
     }
+    // service to allocate containers from RM (if non-uber) or to fake it (uber)
+    containerRequestor = createContainerRequestor(clientService, context);
+    addIfService(containerRequestor);
+    ((Service)containerRequestor).init(getConfig());
+    dispatcher.register(RMCommunicatorEventType.class, containerRequestor);
+
+    amScheduler = createAMScheduler(containerRequestor, context);
+    addIfService(amScheduler);
+    ((Service)amScheduler).init(getConfig());
+    dispatcher.register(AMSchedulerEventType.class, amScheduler);
 
     //start all the components
     super.start();
@@ -1223,11 +1140,11 @@ public class MRAppMaster extends Composi
       LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
         + "JobHistoryEventHandler.");
       // Notify the JHEH and RMCommunicator that a SIGTERM has been received so
-      // that they don't take too long in shutting down 
-//      if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
-//        ((ContainerAllocatorRouter) appMaster.containerAllocator)
-//        .setSignalled(true);
-//      }
+      // that they don't take too long in shutting down
+      
+      // Signal the RMCommunicator.
+      ((RMCommunicator)appMaster.containerRequestor).setSignalled(true);
+
       if(appMaster.jobHistoryEventHandler != null) {
         appMaster.jobHistoryEventHandler.setSignalled(true);
       }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java Fri Sep 21 18:07:22 2012
@@ -52,14 +52,14 @@ public class TaskHeartbeatHandler extend
   }
 
   @Override
-  public boolean hasTimedOut(
+  protected boolean hasTimedOut(
       org.apache.hadoop.mapreduce.jobhistory.HeartbeatHandlerBase.ReportTime report,
       long currentTime) {
     return (timeOut > 0) && (currentTime > report.getLastPing() + timeOut);
   }
 
   @Override
-  public void handleTimeOut(TaskAttemptId attemptId) {
+  protected void handleTimeOut(TaskAttemptId attemptId) {
     eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
         "AttemptID:" + attemptId.toString()
         + " Timed out after " + timeOut / 1000 + " secs"));

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptRemoteStartEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptRemoteStartEvent.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptRemoteStartEvent.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptRemoteStartEvent.java Fri Sep 21 18:07:22 2012
@@ -1,10 +1,27 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.hadoop.mapreduce.v2.app2.job.event;
 
 import java.util.Map;
 
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 
 public class TaskAttemptRemoteStartEvent extends TaskAttemptEvent {

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java?rev=1388591&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java Fri Sep 21 18:07:22 2012
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.local;
+
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEvent;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+/**
+ * For UberAM, the LocalContainerRequestor is responsible for sending keep-alive
+ * heartbeats to the RM, along with sending over job progress. Also provides any
+ * additional information to the rest of the AM - ApplicationACLs etc.
+ */
+public class LocalContainerRequestor extends RMCommunicator implements
+    ContainerRequestor {
+
+  private static final Log LOG =
+      LogFactory.getLog(LocalContainerRequestor.class);
+
+  // Retry window (ms): start time, reset on each successful RM contact; length.
+  private long retrystartTime;
+  private long retryInterval;
+
+  public LocalContainerRequestor(ClientService clientService, AppContext context) {
+    super(clientService, context);
+  }
+
+  /** Reads the AM-to-RM wait interval from the config and starts the retry window. */
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    retryInterval =
+        getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+            MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+    // Init startTime to current time. If all goes well, it will be reset after
+    // first attempt to contact RM.
+    retrystartTime = System.currentTimeMillis();
+  }
+
+  /**
+   * Sends one keep-alive allocate call (job progress, empty request lists) to
+   * the RM. Raises a job INTERNAL_ERROR after retryInterval ms of failures or
+   * when the RM response asks for a reboot.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void heartbeat() throws Exception {
+    AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
+        this.applicationAttemptId, this.lastResponseID, super
+            .getApplicationProgress(), new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>());
+    AMResponse response;
+    try {
+      // The allocate RPC is the call that fails when the RM is unreachable, so
+      // it must sit inside the try for the retry window to be enforced.
+      response = scheduler.allocate(allocateRequest).getAMResponse();
+      // Contact succeeded: restart the retry window.
+      retrystartTime = System.currentTimeMillis();
+    } catch (Exception e) {
+      // Keep re-trying until the retryInterval has expired.
+      if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+        LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
+        eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                         JobEventType.INTERNAL_ERROR));
+        throw new YarnException("Could not contact RM after " +
+                                retryInterval + " milliseconds.");
+      }
+      // Rethrow; the caller may decide to ignore it and keep contacting the RM.
+      throw e;
+    }
+    if (response.getReboot()) {
+      LOG.info("Event from RM: shutting down Application Master");
+      // The RM may have been restarted; this application must clean itself up.
+      eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                       JobEventType.INTERNAL_ERROR));
+      throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
+                               this.applicationAttemptId);
+    }
+  }
+
+  /** Container events are unexpected for a local (uber) run; they are only logged. */
+  @Override
+  public void handle(RMCommunicatorEvent rawEvent) {
+    switch (rawEvent.getType()) {
+    case CONTAINER_DEALLOCATE:
+    case CONTAINER_FAILED:
+    case CONTAINER_REQ:
+      LOG.warn("Unexpected eventType: " + rawEvent.getType() + ", Event: "
+          + rawEvent);
+      break;
+    default:
+      break;
+    }
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java?rev=1388591&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java Fri Sep 21 18:07:22 2012
@@ -0,0 +1,27 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+public interface ContainerRequestor extends EventHandler<RMCommunicatorEvent> {
+  // Declares no members of its own: the only contract is the one inherited
+  // from EventHandler<RMCommunicatorEvent>. RMContainerRequestor implements
+  // this interface (presumably alongside a local/uber variant - TODO confirm).
+}

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java Fri Sep 21 18:07:22 2012
@@ -82,7 +82,7 @@ public abstract class RMCommunicator ext
   protected volatile boolean isSignalled = false;
 
   public RMCommunicator(ClientService clientService, AppContext context) {
-    super("RMCommunicator");
+    super(RMCommunicator.class.getSimpleName());
     this.clientService = clientService;
     this.context = context;
     this.eventHandler = context.getEventHandler();

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java Fri Sep 21 18:07:22 2012
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -61,14 +60,13 @@ import org.apache.hadoop.yarn.util.Recor
  * Keeps the data structures to send container requests to RM.
  */
 // TODO XXX: Eventually rename to RMCommunicator
-public class RMContainerRequestor extends RMCommunicator implements EventHandler<RMCommunicatorEvent> {
+public class RMContainerRequestor extends RMCommunicator implements ContainerRequestor {
   
   private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
   static final String ANY = "*";
   
   private final Clock clock;
 
-  private int lastResponseID;
   private Resource availableResources; // aka headroom.
   private long retrystartTime;
   private long retryInterval;

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java Fri Sep 21 18:07:22 2012
@@ -74,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTASucceededEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEvent;
@@ -81,8 +82,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerAssignTAEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
-import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunched;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunched;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerLaunchRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
@@ -547,7 +548,7 @@ public class MRApp extends MRAppMaster {
   }
 
   @Override
-  protected RMContainerRequestor createRMContainerRequestor(
+  protected ContainerRequestor createContainerRequestor(
       ClientService clientService, AppContext appContext) {
     return new MRAppContainerRequestor(clientService, appContext);
   }
@@ -592,9 +593,9 @@ public class MRApp extends MRAppMaster {
   }
  
   @Override
-  protected ContainerAllocator createAMScheduler(
-      RMContainerRequestor requestor, AppContext appContext) {
-    return new MRAppAMScheduler();    
+  protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
+      AppContext appContext) {
+    return new MRAppAMScheduler();
   }
 
   protected class MRAppAMScheduler implements ContainerAllocator {

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java?rev=1388591&r1=1388590&r2=1388591&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java Fri Sep 21 18:07:22 2012
@@ -38,7 +38,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.job.Job;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
-import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -140,7 +140,7 @@ import org.junit.Test;
     }
 
     @Override
-    protected RMContainerRequestor createRMContainerRequestor(
+    protected ContainerRequestor createContainerRequestor(
         ClientService clientService, AppContext appContext) {
       return new TestCleanupContainerRequestor(clientService, appContext);
     }



Mime
View raw message