hadoop-mapreduce-commits mailing list archives

From ss...@apache.org
Subject svn commit: r1376283 [4/22] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-app2/ hadoop-mapreduce-client-app2/src/ hadoop-mapreduce-client-app2/src/main/ hadoop-mapreduce-client-app2/sr...
Date Wed, 22 Aug 2012 22:11:48 GMT
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,1263 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.security.PrivilegedExceptionAction;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptListenerImpl2;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler2;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.client.MRClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app2.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.app2.recover.Recovery;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNode;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.Speculator;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanerImpl;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * The Map-Reduce Application Master.
+ * The state machine is encapsulated in the implementation of the Job
+ * interface. All state changes happen via the Job interface, and each event
+ * results in a finite state transition in the Job.
+ *
+ * The MR AppMaster is a composition of loosely coupled services. The
+ * services interact with each other via events, resembling the Actor model:
+ * each component acts on the events it receives and sends events out to
+ * other components.
+ * This keeps the AppMaster highly concurrent, with no or minimal
+ * synchronization needs.
+ *
+ * Events are dispatched by a central dispatch mechanism. All components
+ * register with the Dispatcher.
+ *
+ * Information is shared across the different components using the AppContext.
+ */
+
+@SuppressWarnings("rawtypes")
+public class MRAppMaster extends CompositeService {
+
+  private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
+
+  /**
+   * Priority of the MRAppMaster shutdown hook.
+   */
+  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+  private Clock clock;
+  private final long startTime;
+  private final long appSubmitTime;
+  private String appName;
+  private final ApplicationAttemptId appAttemptID;
+  private final ContainerId containerID;
+  private final String nmHost;
+  private final int nmPort;
+  private final int nmHttpPort;
+  private AMContainerMap containers;
+  private AMNodeMap nodes;
+  protected final MRAppMetrics metrics;
+  private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
+  private List<AMInfo> amInfos;
+  private AppContext context;
+  private Dispatcher dispatcher;
+  private ClientService clientService;
+  private Recovery recoveryServ;
+  private ContainerLauncher containerLauncher;
+  private TaskCleaner taskCleaner;
+  private Speculator speculator;
+  private ContainerHeartbeatHandler containerHeartbeatHandler;
+  private TaskHeartbeatHandler taskHeartbeatHandler;
+  private TaskAttemptListener taskAttemptListener;
+  private JobTokenSecretManager jobTokenSecretManager =
+      new JobTokenSecretManager();
+  private JobId jobId;
+  private boolean newApiCommitter;
+  private OutputCommitter committer;
+  private JobEventDispatcher jobEventDispatcher;
+  private JobHistoryEventHandler2 jobHistoryEventHandler;
+  private boolean inRecovery = false;
+  private SpeculatorEventDispatcher speculatorEventDispatcher;
+  private RMContainerRequestor rmContainerRequestor;
+  private ContainerAllocator amScheduler;
+  
+
+  private Job job;
+  private Credentials fsTokens = new Credentials(); // Filled during init
+  private UserGroupInformation currentUser; // Will be setup during init
+
+  public MRAppMaster(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
+      long appSubmitTime) {
+    this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
+        new SystemClock(), appSubmitTime);
+  }
+
+  public MRAppMaster(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
+      Clock clock, long appSubmitTime) {
+    super(MRAppMaster.class.getName());
+    this.clock = clock;
+    this.startTime = clock.getTime();
+    this.appSubmitTime = appSubmitTime;
+    this.appAttemptID = applicationAttemptId;
+    this.containerID = containerId;
+    this.nmHost = nmHost;
+    this.nmPort = nmPort;
+    this.nmHttpPort = nmHttpPort;
+    this.metrics = MRAppMetrics.create();
+    LOG.info("Created MRAppMaster for application " + applicationAttemptId);
+  }
+
+  @Override
+  public void init(final Configuration conf) {
+
+    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
+    downloadTokensAndSetupUGI(conf);
+
+    context = new RunningAppContext(conf);
+
+    // Job name is the same as the app name until we support a DAG of jobs
+    // for an app later
+    appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
+
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
+     
+    newApiCommitter = false;
+    jobId = MRBuilderUtils.newJobId(appAttemptID.getApplicationId(),
+        appAttemptID.getApplicationId().getId());
+    int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if ((numReduceTasks > 0 && 
+        conf.getBoolean("mapred.reducer.new-api", false)) ||
+          (numReduceTasks == 0 && 
+           conf.getBoolean("mapred.mapper.new-api", false)))  {
+      newApiCommitter = true;
+      LOG.info("Using mapred newApiCommitter.");
+    }
+
+    committer = createOutputCommitter(conf);
+    boolean recoveryEnabled = conf.getBoolean(
+        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    boolean recoverySupportedByCommitter = committer.isRecoverySupported();
+    if (recoveryEnabled && recoverySupportedByCommitter
+        && appAttemptID.getAttemptId() > 1) {
+      LOG.info("Recovery is enabled. "
+          + "Will try to recover from previous life on best effort basis.");
+      recoveryServ = createRecoveryService(context);
+      addIfService(recoveryServ);
+      dispatcher = recoveryServ.getDispatcher();
+      clock = recoveryServ.getClock();
+      inRecovery = true;
+    } else {
+      LOG.info("Not starting RecoveryService: recoveryEnabled: "
+          + recoveryEnabled + " recoverySupportedByCommitter: "
+          + recoverySupportedByCommitter + " ApplicationAttemptID: "
+          + appAttemptID.getAttemptId());
+      dispatcher = createDispatcher();
+      addIfService(dispatcher);
+    }
+
+    taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
+    addIfService(taskHeartbeatHandler);
+    
+    containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
+    addIfService(containerHeartbeatHandler);
+    
+    //service to handle requests to TaskUmbilicalProtocol
+    taskAttemptListener = createTaskAttemptListener(context,
+        taskHeartbeatHandler, containerHeartbeatHandler);
+    addIfService(taskAttemptListener);
+
+    containers = new AMContainerMap(containerHeartbeatHandler,
+        taskAttemptListener, dispatcher.getEventHandler(), context);
+    addIfService(containers);
+    dispatcher.register(AMContainerEventType.class, containers);
+    
+    nodes = new AMNodeMap(dispatcher.getEventHandler(), context);
+    addIfService(nodes);
+    dispatcher.register(AMNodeEventType.class, nodes);
+    
+    //service to do the task cleanup
+    taskCleaner = createTaskCleaner(context);
+    addIfService(taskCleaner);
+
+    //service to handle requests from JobClient
+    clientService = createClientService(context);
+    addIfService(clientService);
+
+    //service to log job history events
+    EventHandler<JobHistoryEvent> historyService = 
+        createJobHistoryHandler(context);
+    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+        historyService);
+
+    this.jobEventDispatcher = new JobEventDispatcher();
+
+    //register the event dispatchers
+    dispatcher.register(JobEventType.class, jobEventDispatcher);
+    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+    dispatcher.register(TaskAttemptEventType.class, 
+        new TaskAttemptEventDispatcher());
+    dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
+   
+    if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
+        || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
+      //optional service to speculate on task attempts' progress
+      speculator = createSpeculator(conf, context);
+      addIfService(speculator);
+    }
+
+    speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
+    dispatcher.register(Speculator.EventType.class,
+        speculatorEventDispatcher);
+
+    // service to allocate containers from RM (if non-uber) or to fake it (uber)
+    rmContainerRequestor = createRMContainerRequestor(clientService, context);
+    addIfService(rmContainerRequestor);
+    dispatcher.register(RMCommunicatorEventType.class, rmContainerRequestor);
+    
+    // TODO XXX: Get rid of eventHandlers being sent as part of the constructors. context should be adequate.
+    amScheduler = createAMScheduler(rmContainerRequestor, context);
+    addIfService(amScheduler);
+    dispatcher.register(AMSchedulerEventType.class, amScheduler);
+    
+//    containerAllocator = createContainerAllocator(clientService, context);
+//    addIfService(containerAllocator);
+//    dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+
+    // TODO XXX: initialization of RMComm, Scheduler, etc.
+
+    // TODO XXX: Rename to NMComm
+    // corresponding service to launch allocated containers via NodeManager
+//    containerLauncher = createNMCommunicator(context);
+    containerLauncher = createContainerLauncher(context);
+    addIfService(containerLauncher);
+    dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
+
+    // Add the staging directory cleaner before the history server but after
+    // the container allocator so the staging directory is cleaned after
+    // the history has been flushed but before unregistering with the RM.
+    addService(createStagingDirCleaningService());
+
+    // Add the JobHistoryEventHandler last so that it is properly stopped first.
+    // This will guarantee that all history-events are flushed before AM goes
+    // ahead with shutdown.
+    // Note: Even though JobHistoryEventHandler is started last, if any
+    // component creates a JobHistoryEvent in the meanwhile, it will just be
+    // queued inside the JobHistoryEventHandler.
+    addIfService(historyService);
+
+    super.init(conf);
+  } // end of init()
+
+  protected Dispatcher createDispatcher() {
+    return new AsyncDispatcher();
+  }
+
+  private OutputCommitter createOutputCommitter(Configuration conf) {
+    OutputCommitter committer = null;
+
+    LOG.info("OutputCommitter set in config "
+        + conf.get("mapred.output.committer.class"));
+
+    if (newApiCommitter) {
+      org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
+          .newTaskId(jobId, 0, TaskType.MAP);
+      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
+          .newTaskAttemptId(taskID, 0);
+      TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+          TypeConverter.fromYarn(attemptID));
+      OutputFormat outputFormat;
+      try {
+        outputFormat = ReflectionUtils.newInstance(taskContext
+            .getOutputFormatClass(), conf);
+        committer = outputFormat.getOutputCommitter(taskContext);
+      } catch (Exception e) {
+        throw new YarnException(e);
+      }
+    } else {
+      committer = ReflectionUtils.newInstance(conf.getClass(
+          "mapred.output.committer.class", FileOutputCommitter.class,
+          org.apache.hadoop.mapred.OutputCommitter.class), conf);
+    }
+    LOG.info("OutputCommitter is " + committer.getClass().getName());
+    return committer;
+  }
+
+  protected boolean keepJobFiles(JobConf conf) {
+    return (conf.getKeepTaskFilesPattern() != null || conf
+        .getKeepFailedTaskFiles());
+  }
+  
+  /**
+   * Create the default file System for this job.
+   * @param conf the conf object
+   * @return the default filesystem for this job
+   * @throws IOException
+   */
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    return FileSystem.get(conf);
+  }
+  
+  /**
+   * clean up staging directories for the job.
+   * @throws IOException
+   */
+  public void cleanupStagingDir() throws IOException {
+    /* make sure we clean the staging files */
+    String jobTempDir = null;
+    FileSystem fs = getFileSystem(getConfig());
+    try {
+      if (!keepJobFiles(new JobConf(getConfig()))) {
+        jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
+        if (jobTempDir == null) {
+          LOG.warn("Job Staging directory is null");
+          return;
+        }
+        Path jobTempDirPath = new Path(jobTempDir);
+        LOG.info("Deleting staging directory " + FileSystem.getDefaultUri(getConfig()) +
+            " " + jobTempDir);
+        fs.delete(jobTempDirPath, true);
+      }
+    } catch(IOException io) {
+      LOG.error("Failed to cleanup staging dir " + jobTempDir, io);
+    }
+  }
+  
+  /**
+   * Exit call. Wrapped in a method to enable testing.
+   */
+  protected void sysexit() {
+    System.exit(0);
+  }
+
+  protected class JobFinishEventHandlerCR implements EventHandler<JobFinishEvent> {
+    // Considering TaskAttempts are marked as completed before a container
+    // exits, it is very likely that a container may not have "completed" by
+    // the time a job completes. This would imply that TaskAttempts may not
+    // be at a FINAL internal state (state machine state), and cleanup would
+    // not have happened.
+
+    // Since the shutdown handler is called on the same thread which handles
+    // all other async events, a separate thread is created for shutdown.
+    //
+    // For now, this checks whether all containers have COMPLETED, polling
+    // every 100 ms until they do, before the exit.
+    // TODO XXX: Modify TaskAttemptCleaner to empty its queue while stopping.
+    public void handle(JobFinishEvent event) {
+      AMShutdownRunnable r = new AMShutdownRunnable();
+      Thread t = new Thread(r, "AMShutdownThread");
+      t.start();
+    }
+    
+    protected void maybeSendJobEndNotification() {
+      if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
+        try {
+          LOG.info("Job end notification started for jobID : "
+              + job.getReport().getJobId());
+          JobEndNotifier notifier = new JobEndNotifier();
+          notifier.setConf(getConfig());
+          notifier.notify(job.getReport());
+        } catch (InterruptedException ie) {
+          LOG.warn("Job end notification interrupted for jobID : "
+              + job.getReport().getJobId(), ie);
+        }
+      }
+    }
+    
+    protected void stopAllServices() {
+      try {
+        // Stop all services
+        // This will also send the final report to the ResourceManager
+        LOG.info("Calling stop for all the services");
+        stop();
+
+      } catch (Throwable t) {
+        LOG.warn("Graceful stop failed ", t);
+      }
+    }
+    
+    protected void exit() {
+      LOG.info("Exiting MR AppMaster..GoodBye!");
+      sysexit();
+    }
+    
+    private void stopAM() {
+      stopAllServices();
+      exit();
+    }
+    
+    protected boolean allContainersComplete() {
+      for (AMContainer amContainer : context.getAllContainers().values()) {
+        if (amContainer.getState() != AMContainerState.COMPLETED) {
+          return false;
+        }
+      }
+      return true;
+    }
+    
+    protected boolean allTaskAttemptsComplete() {
+      // TODO XXX: Implement.
+      // TaskAttempts will transition to their final state machine state only
+      // after a container is complete and sends out a TA_TERMINATED event.
+      return true;
+    }
+
+    private class AMShutdownRunnable implements Runnable {
+      @Override
+      public void run() {
+        maybeSendJobEndNotification();
+        while (!allContainersComplete() || !allTaskAttemptsComplete()) {
+          try {
+            synchronized(this) {
+              wait(100L);
+            }
+          } catch (InterruptedException e) {
+            LOG.info("AM Shutdown Thread interrupted. Exiting");
+            break;
+          }
+        }
+        stopAM();
+        LOG.info("AM Shutdown Thread Completing");
+      }
+    }
+  }
+  
+  private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
+    @Override
+    public void handle(JobFinishEvent event) {
+      // The job has finished; this is the only job, so shut down the
+      // AppMaster. Note that in a workflow scenario, this may lead to the
+      // creation of a new job (FIXME?).
+      // Send job-end notification
+      if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
+        try {
+          LOG.info("Job end notification started for jobID : "
+              + job.getReport().getJobId());
+          JobEndNotifier notifier = new JobEndNotifier();
+          notifier.setConf(getConfig());
+          notifier.notify(job.getReport());
+        } catch (InterruptedException ie) {
+          LOG.warn("Job end notification interrupted for jobID : "
+              + job.getReport().getJobId(), ie);
+        }
+      }
+
+      // TODO: currently we just wait for some time so that clients can know
+      // the final states. Will be removed once the RM comes online.
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+
+      try {
+        // Stop all services
+        // This will also send the final report to the ResourceManager
+        LOG.info("Calling stop for all the services");
+        stop();
+
+      } catch (Throwable t) {
+        LOG.warn("Graceful stop failed ", t);
+      }
+
+      //Bring the process down by force.
+      //Not needed after HADOOP-7140
+      LOG.info("Exiting MR AppMaster..GoodBye!");
+      sysexit();
+    }
+  }
+  
+  /**
+   * create an event handler that handles the job finish event.
+   * @return the job finish event handler.
+   */
+  protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
+    return new JobFinishEventHandler();
+  }
+
+  /**
+   * Create the recovery service.
+   * @return an instance of the recovery service.
+   */
+  protected Recovery createRecoveryService(AppContext appContext) {
+//    return new RecoveryService(appContext.getApplicationAttemptId(),
+//        appContext.getClock(), getCommitter());
+    // TODO XXX Uncomment after fixing RecoveryService
+    return null;
+  }
+  
+  /**
+   * Create the RMContainerRequestor.
+   * @param clientService the MR Client Service.
+   * @param appContext the application context.
+   * @return an instance of the RMContainerRequestor.
+   */
+  protected RMContainerRequestor createRMContainerRequestor(
+      ClientService clientService, AppContext appContext) {
+    return new RMContainerRequestor(clientService, appContext);
+  }
+  
+  /**
+   * Create the AM Scheduler.
+   * @param requestor The RM Container Requestor.
+   * @param appContext the application context.
+   * @return an instance of the AMScheduler.
+   */
+  protected ContainerAllocator createAMScheduler(
+      RMContainerRequestor requestor, AppContext appContext) {
+    return new RMContainerAllocator(requestor, appContext);
+  }
+
+  /** Create and initialize (but don't start) a single job. */
+  protected Job createJob(Configuration conf) {
+
+    // create single job
+    Job newJob =
+        new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
+            taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
+            completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
+            currentUser.getUserName(), appSubmitTime, amInfos,
+            taskHeartbeatHandler, context);
+    ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
+
+    dispatcher.register(JobFinishEvent.Type.class,
+        createJobFinishEventHandler());     
+    return newJob;
+  } // end createJob()
+
+
+  /**
+   * Obtain the tokens needed by the job and put them in the UGI
+   * @param conf
+   */
+  protected void downloadTokensAndSetupUGI(Configuration conf) {
+
+    try {
+      this.currentUser = UserGroupInformation.getCurrentUser();
+
+      if (UserGroupInformation.isSecurityEnabled()) {
+        // Read the file-system tokens from the localized tokens-file.
+        Path jobSubmitDir = 
+            FileContext.getLocalFSFileContext().makeQualified(
+                new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
+                    .getAbsolutePath()));
+        Path jobTokenFile = 
+            new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
+        fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
+        LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+            + jobTokenFile);
+
+        for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Token of kind " + tk.getKind()
+                + " in current ugi in the AppMaster for service "
+                + tk.getService());
+          }
+          currentUser.addToken(tk); // For use by AppMaster itself.
+        }
+      }
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  protected void addIfService(Object object) {
+    if (object instanceof Service) {
+      addService((Service) object);
+    }
+  }
+
+  protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+      AppContext context) {
+    this.jobHistoryEventHandler = new JobHistoryEventHandler2(context,
+      getStartCount());
+    return this.jobHistoryEventHandler;
+  }
+
+  protected AbstractService createStagingDirCleaningService() {
+    return new StagingDirCleaningService();
+  }
+
+  protected Speculator createSpeculator(Configuration conf, AppContext context) {
+    Class<? extends Speculator> speculatorClass;
+
+    try {
+      speculatorClass
+          // "yarn.mapreduce.job.speculator.class"
+          = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
+                          DefaultSpeculator.class,
+                          Speculator.class);
+      Constructor<? extends Speculator> speculatorConstructor
+          = speculatorClass.getConstructor
+               (Configuration.class, AppContext.class);
+      Speculator result = speculatorConstructor.newInstance(conf, context);
+
+      return result;
+    } catch (InstantiationException ex) {
+      LOG.error("Can't make a speculator -- check "
+          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+      throw new YarnException(ex);
+    } catch (IllegalAccessException ex) {
+      LOG.error("Can't make a speculator -- check "
+          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+      throw new YarnException(ex);
+    } catch (InvocationTargetException ex) {
+      LOG.error("Can't make a speculator -- check "
+          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+      throw new YarnException(ex);
+    } catch (NoSuchMethodException ex) {
+      LOG.error("Can't make a speculator -- check "
+          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+      throw new YarnException(ex);
+    }
+  }
+
+  protected TaskAttemptListener createTaskAttemptListener(AppContext context,
+      TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
+    TaskAttemptListener lis = new TaskAttemptListenerImpl2(context, thh, chh,
+        jobTokenSecretManager);
+    return lis;
+  }
+  
+  protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
+      Configuration conf) {
+    TaskHeartbeatHandler thh = new TaskHeartbeatHandler(context, conf.getInt(
+        MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
+        MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT));
+    return thh;
+  }
+
+  protected ContainerHeartbeatHandler createContainerHeartbeatHandler(AppContext context,
+      Configuration conf) {
+    ContainerHeartbeatHandler chh = new ContainerHeartbeatHandler(context, conf.getInt(
+        MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
+        MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT));
+    return chh;
+  }
+  
+  protected TaskCleaner createTaskCleaner(AppContext context) {
+    return new TaskCleanerImpl(context);
+  }
+
+//  protected ContainerAllocator createContainerAllocator(
+//      final ClientService clientService, final AppContext context) {
+//    return new ContainerAllocatorRouter(clientService, context);
+//  }
+
+  protected ContainerLauncher
+      createContainerLauncher(final AppContext context) {
+    return new ContainerLauncherRouter(context);
+  }
+  
+//  protected ContainerLauncher createNMCommunicator(final AppContext context) {
+//    return new ContainerLauncherImpl(context);
+//  }
+
+  //TODO:should have an interface for MRClientService
+  protected ClientService createClientService(AppContext context) {
+    return new MRClientService(context);
+  }
+
+  public ApplicationId getAppID() {
+    return appAttemptID.getApplicationId();
+  }
+
+  public ApplicationAttemptId getAttemptID() {
+    return appAttemptID;
+  }
+
+  public JobId getJobId() {
+    return jobId;
+  }
+
+  public OutputCommitter getCommitter() {
+    return committer;
+  }
+
+  public boolean isNewApiCommitter() {
+    return newApiCommitter;
+  }
+
+  public int getStartCount() {
+    return appAttemptID.getAttemptId();
+  }
+
+  public AppContext getContext() {
+    return context;
+  }
+
+  public Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  public Map<TaskId, TaskInfo> getCompletedTaskFromPreviousRun() {
+    return completedTasksFromPreviousRun;
+  }
+
+  public List<AMInfo> getAllAMInfos() {
+    return amInfos;
+  }
+  
+//  public ContainerAllocator getContainerAllocator() {
+//    return containerAllocator;
+//  }
+  
+  public ContainerLauncher getContainerLauncher() {
+    return containerLauncher;
+  }
+
+  public TaskAttemptListener getTaskAttemptListener() {
+    return taskAttemptListener;
+  }
+  
+  public TaskHeartbeatHandler getTaskHeartbeatHandler() {
+    return taskHeartbeatHandler;
+  }
+
+  /**
+   * By the time the life-cycle of this router starts, job-init will have
+   * already happened.
+   */
+//  private final class ContainerAllocatorRouter extends AbstractService
+//      implements ContainerAllocator {
+//    private final ClientService clientService;
+//    private final AppContext context;
+//    private ContainerAllocator containerAllocator;
+//
+//    ContainerAllocatorRouter(ClientService clientService,
+//        AppContext context) {
+//      super(ContainerAllocatorRouter.class.getName());
+//      this.clientService = clientService;
+//      this.context = context;
+//    }
+//
+//    @Override
+//    public synchronized void start() {
+//      if (job.isUber()) {
+//        this.containerAllocator = new LocalContainerAllocator(
+//            this.clientService, this.context, nmHost, nmPort, nmHttpPort
+//            , containerID);
+//      } else {
+//        
+//        // TODO XXX UberAM
+////        this.containerAllocator = new RMContainerAllocator(
+////            this.clientService, this.context);
+//      }
+//      ((Service)this.containerAllocator).init(getConfig());
+//      ((Service)this.containerAllocator).start();
+//      super.start();
+//    }
+//
+//    @Override
+//    public synchronized void stop() {
+//      ((Service)this.containerAllocator).stop();
+//      super.stop();
+//    }
+//
+//    @Override
+//    public void handle(ContainerAllocatorEvent event) {
+//      this.containerAllocator.handle(event);
+//    }
+//
+//    public void setSignalled(boolean isSignalled) {
+//      ((RMCommunicator) containerAllocator).setSignalled(true);
+//    }
+//  }
+
+  /**
+   * By the time the life-cycle of this router starts, job-init will have
+   * already happened.
+   */
+  private final class ContainerLauncherRouter extends AbstractService
+      implements ContainerLauncher {
+    private final AppContext context;
+    private ContainerLauncher containerLauncher;
+
+    ContainerLauncherRouter(AppContext context) {
+      super(ContainerLauncherRouter.class.getName());
+      this.context = context;
+    }
+
+    @Override
+    public synchronized void start() {
+      if (job.isUber()) {
+        // TODO XXX: Handle uber.
+//        this.containerLauncher = new LocalContainerLauncher(context,
+//            (TaskUmbilicalProtocol) taskAttemptListener);
+      } else {
+        this.containerLauncher = new ContainerLauncherImpl(context);
+      }
+      ((Service)this.containerLauncher).init(getConfig());
+      ((Service)this.containerLauncher).start();
+      super.start();
+    }
+
+    @Override
+    public void handle(NMCommunicatorEvent event) {
+      this.containerLauncher.handle(event);
+    }
+
+    @Override
+    public synchronized void stop() {
+      ((Service)this.containerLauncher).stop();
+      super.stop();
+    }
+  }
+
+  private final class StagingDirCleaningService extends AbstractService {
+    StagingDirCleaningService() {
+      super(StagingDirCleaningService.class.getName());
+    }
+
+    @Override
+    public synchronized void stop() {
+      try {
+        cleanupStagingDir();
+      } catch (IOException io) {
+        LOG.error("Failed to cleanup staging dir: ", io);
+      }
+      super.stop();
+    }
+  }
+
+  private class RunningAppContext implements AppContext {
+
+    private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
+    private final Configuration conf;
+    private final ClusterInfo clusterInfo = new ClusterInfo();
+
+    public RunningAppContext(Configuration config) {
+      this.conf = config;
+    }
+
+    @Override
+    public ApplicationAttemptId getApplicationAttemptId() {
+      return appAttemptID;
+    }
+
+    @Override
+    public ApplicationId getApplicationID() {
+      return appAttemptID.getApplicationId();
+    }
+
+    @Override
+    public String getApplicationName() {
+      return appName;
+    }
+
+    @Override
+    public long getStartTime() {
+      return startTime;
+    }
+
+    @Override
+    public Job getJob(JobId jobID) {
+      return jobs.get(jobID);
+    }
+
+    @Override
+    public Map<JobId, Job> getAllJobs() {
+      return jobs;
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return dispatcher.getEventHandler();
+    }
+
+    @Override
+    public CharSequence getUser() {
+      return this.conf.get(MRJobConfig.USER_NAME);
+    }
+
+    @Override
+    public Clock getClock() {
+      return clock;
+    }
+    
+    @Override
+    public ClusterInfo getClusterInfo() {
+      return this.clusterInfo;
+    }
+    
+    @Override
+    public AMContainer getContainer(ContainerId containerId) {
+      return containers.get(containerId);
+    }
+
+    @Override
+    public AMContainerMap getAllContainers() {
+      return containers;
+    }
+
+    @Override
+    public AMNode getNode(NodeId nodeId) {
+      return nodes.get(nodeId);
+    }
+
+    @Override
+    public AMNodeMap getAllNodes() {
+      return nodes;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void start() {
+
+    // Pull completedTasks etc from recovery
+    if (inRecovery) {
+      completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
+      amInfos = recoveryServ.getAMInfos();
+    }
+
+    // Create the AMInfo for the current AppMaster
+    if (amInfos == null) {
+      amInfos = new LinkedList<AMInfo>();
+    }
+    AMInfo amInfo =
+        MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
+            nmPort, nmHttpPort);
+    amInfos.add(amInfo);
+
+    // /////////////////// Create the job itself.
+    job = createJob(getConfig());
+
+    // End of creating the job.
+
+    // Send out an MR AM inited event for this AM and all previous AMs.
+    for (AMInfo info : amInfos) {
+      dispatcher.getEventHandler().handle(
+          new JobHistoryEvent(job.getID(), new AMStartedEvent(info
+              .getAppAttemptId(), info.getStartTime(), info.getContainerId(),
+              info.getNodeManagerHost(), info.getNodeManagerPort(), info
+                  .getNodeManagerHttpPort())));
+    }
+
+    // metrics system init is really init & start.
+    // It's more test friendly to put it here.
+    DefaultMetricsSystem.initialize("MRAppMaster");
+
+    // create a job event for job initialization
+    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
+    // Send init to the job (this does NOT trigger job execution)
+    // This is a synchronous call, not an event through dispatcher. We want
+    // job-init to be done completely here.
+    jobEventDispatcher.handle(initJobEvent);
+
+
+    // JobImpl's InitTransition is done (call above is synchronous), so the
+    // "uber-decision" (MR-1220) has been made.  Query job and switch to
+    // ubermode if appropriate (by registering different container-allocator
+    // and container-launcher services/event-handlers).
+
+    if (job.isUber()) {
+      speculatorEventDispatcher.disableSpeculation();
+      LOG.info("MRAppMaster uberizing job " + job.getID()
+               + " in local container (\"uber-AM\") on node "
+               + nmHost + ":" + nmPort + ".");
+    } else {
+      // send init to speculator only for non-uber jobs. 
+      // This won't yet start as dispatcher isn't started yet.
+      dispatcher.getEventHandler().handle(
+          new SpeculatorEvent(job.getID(), clock.getTime()));
+      LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+               + "job " + job.getID() + ".");
+    }
+
+    //start all the components
+    super.start();
+
+    // All components have started, start the job.
+    startJobs();
+  }
+
+  /**
+   * This can be overridden to instantiate multiple jobs and create a 
+   * workflow.
+   *
+   * TODO:  Rework the design to actually support this.  Currently much of the
+   * job stuff has been moved to init() above to support uberization (MR-1220).
+   * In a typical workflow, one presumably would want to uberize only a subset
+   * of the jobs (the "small" ones), which is awkward with the current design.
+   */
+  @SuppressWarnings("unchecked")
+  protected void startJobs() {
+    // Create a job-start event to get this ball rolling.
+    JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
+    // Send the job-start event. This triggers the job execution.
+    dispatcher.getEventHandler().handle(startJobEvent);
+  }
+
+  private class JobEventDispatcher implements EventHandler<JobEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void handle(JobEvent event) {
+      ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);
+    }
+  }
+
+  private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void handle(TaskEvent event) {
+      Task task = context.getJob(event.getTaskID().getJobId()).getTask(
+          event.getTaskID());
+      ((EventHandler<TaskEvent>)task).handle(event);
+    }
+  }
+
+  private class TaskAttemptEventDispatcher 
+          implements EventHandler<TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void handle(TaskAttemptEvent event) {
+      Job job = context.getJob(event.getTaskAttemptID().getTaskId().getJobId());
+      Task task = job.getTask(event.getTaskAttemptID().getTaskId());
+      TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
+      ((EventHandler<TaskAttemptEvent>) attempt).handle(event);
+    }
+  }
+
+  private class SpeculatorEventDispatcher implements
+      EventHandler<SpeculatorEvent> {
+    private final Configuration conf;
+    private volatile boolean disabled;
+    public SpeculatorEventDispatcher(Configuration config) {
+      this.conf = config;
+    }
+    @Override
+    public void handle(SpeculatorEvent event) {
+      if (disabled) {
+        return;
+      }
+
+      TaskId tId = event.getTaskID();
+      TaskType tType = null;
+      /* event's TaskId will be null if the event type is JOB_CREATE or
+       * ATTEMPT_STATUS_UPDATE
+       */
+      if (tId != null) {
+        tType = tId.getTaskType(); 
+      }
+      boolean shouldMapSpec =
+              conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+      boolean shouldReduceSpec =
+              conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+      /* The point of the following is to allow the MAP and REDUCE speculative
+       * config values to be independent:
+       * IF spec-exec is turned on for maps AND the task is a map task
+       * OR IF spec-exec is turned on for reduces AND the task is a reduce task
+       * THEN call the speculator to handle the event.
+       */
+      if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))
+        || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {
+        // Speculator IS enabled, direct the event there.
+        speculator.handle(event);
+      }
+    }
+
+    public void disableSpeculation() {
+      disabled = true;
+    }
+
+  }
+
+  private static void validateInputParam(String value, String param)
+      throws IOException {
+    if (value == null) {
+      String msg = param + " is null";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+  }
+
+  public static void main(String[] args) {
+    try {
+      Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+      String containerIdStr =
+          System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+      String nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV);
+      String nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV);
+      String nodeHttpPortString =
+          System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV);
+      String appSubmitTimeStr =
+          System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
+      
+      validateInputParam(containerIdStr,
+          ApplicationConstants.AM_CONTAINER_ID_ENV);
+      validateInputParam(nodeHostString, ApplicationConstants.NM_HOST_ENV);
+      validateInputParam(nodePortString, ApplicationConstants.NM_PORT_ENV);
+      validateInputParam(nodeHttpPortString,
+          ApplicationConstants.NM_HTTP_PORT_ENV);
+      validateInputParam(appSubmitTimeStr,
+          ApplicationConstants.APP_SUBMIT_TIME_ENV);
+
+      ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+      ApplicationAttemptId applicationAttemptId =
+          containerId.getApplicationAttemptId();
+      long appSubmitTime = Long.parseLong(appSubmitTimeStr);
+      
+      MRAppMaster appMaster =
+          new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
+              Integer.parseInt(nodePortString),
+              Integer.parseInt(nodeHttpPortString), appSubmitTime);
+      ShutdownHookManager.get().addShutdownHook(
+        new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
+      YarnConfiguration conf = new YarnConfiguration(new JobConf());
+      conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
+      String jobUserName = System
+          .getenv(ApplicationConstants.Environment.USER.name());
+      conf.set(MRJobConfig.USER_NAME, jobUserName);
+      // Do not automatically close FileSystem objects so that in case of
+      // SIGTERM I have a chance to write out the job history. I'll be closing
+      // the objects myself.
+      conf.setBoolean("fs.automatic.close", false);
+      initAndStartAppMaster(appMaster, conf, jobUserName);
+    } catch (Throwable t) {
+      LOG.fatal("Error starting MRAppMaster", t);
+      System.exit(1);
+    }
+  }
+
+  // The shutdown hook that runs when a signal is received AND during normal
+  // close of the JVM.
+  static class MRAppMasterShutdownHook implements Runnable {
+    MRAppMaster appMaster;
+    MRAppMasterShutdownHook(MRAppMaster appMaster) {
+      this.appMaster = appMaster;
+    }
+    public void run() {
+      LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
+        + "JobHistoryEventHandler.");
+      // Notify the JHEH and RMCommunicator that a SIGTERM has been received so
+      // that they don't take too long in shutting down 
+//      if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
+//        ((ContainerAllocatorRouter) appMaster.containerAllocator)
+//        .setSignalled(true);
+//      }
+      if(appMaster.jobHistoryEventHandler != null) {
+        appMaster.jobHistoryEventHandler.setSignalled(true);
+      }
+      appMaster.stop();
+    }
+  }
+
+  protected static void initAndStartAppMaster(final MRAppMaster appMaster,
+      final YarnConfiguration conf, String jobUserName) throws IOException,
+      InterruptedException {
+    UserGroupInformation.setConfiguration(conf);
+    UserGroupInformation appMasterUgi = UserGroupInformation
+        .createRemoteUser(jobUserName);
+    appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        appMaster.init(conf);
+        appMaster.start();
+        return null;
+      }
+    });
+  }
+}
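
[Editor's sketch] The class comment above describes the AppMaster as loosely
coupled services exchanging events through a central dispatcher. As a rough
illustration of that pattern (this is not the actual Hadoop AsyncDispatcher;
all names below are invented for the sketch), a minimal dispatcher routes
each queued event to the handler registered for its event-type enum class:

    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.LinkedBlockingQueue;

    public class MiniDispatcherDemo {
      // Toy stand-ins for the Event / EventHandler abstractions.
      interface Event { Enum<?> getType(); }
      interface Handler { void handle(Event event); }

      // Central dispatcher: one queue, one routing thread, handlers keyed
      // by the event-type enum class, as in dispatcher.register(...) above.
      static class MiniDispatcher implements Runnable {
        private final BlockingQueue<Event> queue =
            new LinkedBlockingQueue<Event>();
        private final Map<Class<?>, Handler> handlers =
            new ConcurrentHashMap<Class<?>, Handler>();

        void register(Class<?> eventTypeClass, Handler handler) {
          handlers.put(eventTypeClass, handler);
        }

        void dispatch(Event event) {
          queue.add(event); // senders never block on the receiving component
        }

        @Override
        public void run() {
          try {
            while (true) {
              Event e = queue.take();
              Handler h = handlers.get(e.getType().getDeclaringClass());
              if (h != null) {
                h.handle(e); // a component may dispatch new events from here
              }
            }
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
          }
        }
      }

      enum JobEventType { JOB_INIT, JOB_START }

      static class JobEvent implements Event {
        private final JobEventType type;
        JobEvent(JobEventType type) { this.type = type; }
        public Enum<?> getType() { return type; }
      }

      public static void main(String[] args) throws InterruptedException {
        MiniDispatcher dispatcher = new MiniDispatcher();
        dispatcher.register(JobEventType.class, new Handler() {
          public void handle(Event event) {
            System.out.println("Job component handling " + event.getType());
          }
        });
        Thread t = new Thread(dispatcher, "Dispatcher");
        t.setDaemon(true);
        t.start();
        dispatcher.dispatch(new JobEvent(JobEventType.JOB_INIT));
        dispatcher.dispatch(new JobEvent(JobEventType.JOB_START));
        Thread.sleep(200); // give the dispatcher thread time to drain
      }
    }

Because components only ever enqueue events and never call into each other
directly, the AppMaster stays largely free of shared-state synchronization.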

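[Editor's sketch] MRAppMaster extends CompositeService, and the ordering
comments in init() ("Add the JobHistoryEventHandler last so that it is
properly stopped first") rely on a composite service stopping its children
in the reverse of the order they were added. A minimal sketch of that
contract (assumed semantics, simplified from the YARN service model):

    import java.util.ArrayList;
    import java.util.List;

    class MiniCompositeService {
      interface Svc {
        void start();
        void stop();
      }

      private final List<Svc> services = new ArrayList<Svc>();

      void addService(Svc s) { services.add(s); }

      void start() {
        for (Svc s : services) {
          s.start(); // children start in the order they were added
        }
      }

      void stop() {
        // Children stop in reverse order: the last-added service (the
        // history handler in MRAppMaster) is the first to be stopped, so
        // history events are flushed before earlier services shut down.
        for (int i = services.size() - 1; i >= 0; i--) {
          services.get(i).stop();
        }
      }
    }

Under that contract, the staging-directory cleaner added just before the
history handler is stopped just after it: history is flushed first, then
the staging directory is removed, and only later does the RM communicator,
added earlier still, unregister from the ResourceManager.
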
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRClientSecurityInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRClientSecurityInfo.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRClientSecurityInfo.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRClientSecurityInfo.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.lang.annotation.Annotation;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.yarn.security.client.ClientTokenSelector;
+
+public class MRClientSecurityInfo extends SecurityInfo {
+
+  @Override
+  public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+    return null;
+  }
+
+  @Override
+  public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
+    if (!protocol.equals(MRClientProtocolPB.class)) {
+      return null;
+    }
+    return new TokenInfo() {
+
+      @Override
+      public Class<? extends Annotation> annotationType() {
+        return null;
+      }
+
+      @Override
+      public Class<? extends TokenSelector<? extends TokenIdentifier>>
+          value() {
+        return ClientTokenSelector.class;
+      }
+    };
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskAttemptListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskAttemptListener.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskAttemptListener.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskAttemptListener.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,68 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Listens for changes to the state of a Task.
+ */
+public interface TaskAttemptListener {
+
+  InetSocketAddress getAddress();
+
+  void registerRunningJvm(WrappedJvmID jvmID, ContainerId containerId);
+  
+  void registerTaskAttempt(TaskAttemptId attemptId, WrappedJvmID jvmId);
+  
+  void unregisterRunningJvm(WrappedJvmID jvmID);
+  
+  void unregisterTaskAttempt(TaskAttemptId attemptID);
+
+  /**
+   * Register a JVM with the listener.  This should be called as soon as a
+   * JVM ID is assigned to a task attempt, before it has been launched.
+   * @param task the task itself for this JVM.
+   * @param jvmID the ID of the JVM.
+   */
+//  void registerPendingTask(Task task, WrappedJvmID jvmID);
+  
+  /**
+   * Register task attempt. This should be called when the JVM has been
+   * launched.
+   * 
+   * @param attemptID
+   *          the id of the attempt for this JVM.
+   * @param jvmID the ID of the JVM.
+   */
+//  void registerLaunchedTask(TaskAttemptId attemptID, WrappedJvmID jvmID);
+
+  /**
+   * Unregister the JVM and the attempt associated with it.  This should be 
+   * called when the attempt/JVM has finished executing and is being cleaned up.
+   * @param attemptID the ID of the attempt.
+   * @param jvmID the ID of the JVM for that attempt.
+   */
+//  void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID);
+
+}
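
[Editor's sketch] The registration methods above imply a two-level
lifecycle: a container's JVM is registered when it comes up, task attempts
are then registered against (and unregistered from) that JVM, and the JVM
itself is unregistered when the container goes away. A hypothetical caller
might sequence the calls like this (the listener and ID values are assumed
to be supplied by the surrounding AM services; the ordering is inferred
from the method names and the commented-out javadoc):

    import org.apache.hadoop.mapred.WrappedJvmID;
    import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
    import org.apache.hadoop.yarn.api.records.ContainerId;

    class TaskAttemptListenerUsageSketch {
      void runAttempt(TaskAttemptListener listener, WrappedJvmID jvmId,
          ContainerId containerId, TaskAttemptId attemptId) {
        listener.registerRunningJvm(jvmId, containerId); // container JVM up
        listener.registerTaskAttempt(attemptId, jvmId);  // attempt bound
        try {
          // ... the attempt runs, reporting over the task umbilical ...
        } finally {
          listener.unregisterTaskAttempt(attemptId);     // attempt finished
          listener.unregisterRunningJvm(jvmId);          // JVM going away
        }
      }
    }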

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,194 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+
+/**
+ * This class keeps track of task attempts that have already been launched. It
+ * considers an attempt alive as long as heartbeats keep arriving, and marks it
+ * as timed out if neither a ping nor a progress report is received within the
+ * configured timeout.
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TaskHeartbeatHandler extends AbstractService {
+  
+  // TODO XXX: Extend HeartbeatHandlerBase
+  
+  private static class ReportTime {
+    private long lastPing;
+    private long lastProgress;
+    
+    public ReportTime(long time) {
+      setLastProgress(time);
+    }
+    
+    public synchronized void setLastPing(long time) {
+      lastPing = time;
+    }
+    
+    public synchronized void setLastProgress(long time) {
+      lastProgress = time;
+      lastPing = time;
+    }
+    
+    public synchronized long getLastPing() {
+      return lastPing;
+    }
+    
+    public synchronized long getLastProgress() {
+      return lastProgress;
+    }
+  }
+  
+  private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class);
+  private static final int PING_TIMEOUT = 5 * 60 * 1000; // 5 minutes
+  
+  // Thread which runs periodically to check how long it has been since the
+  // last heartbeat was received from each task attempt.
+  private Thread lostTaskCheckerThread;
+  private volatile boolean stopped;
+  private int taskTimeOut = 5 * 60 * 1000; // 5 minutes
+  private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds
+
+  private final EventHandler eventHandler;
+  private final Clock clock;
+  private final AppContext context;
+  
+  private ConcurrentMap<TaskAttemptId, ReportTime> runningAttempts;
+
+  public TaskHeartbeatHandler(AppContext context) {
+    this(context, 16);
+  }
+  
+  public TaskHeartbeatHandler(AppContext context,
+      int numThreads) {
+    super("TaskHeartbeatHandler");
+    this.eventHandler = context.getEventHandler();
+    this.clock = context.getClock();
+    this.context = context;
+    numThreads = numThreads <= 0 ? 1 : numThreads;
+    runningAttempts =
+      new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    taskTimeOut = conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000);
+    taskTimeOutCheckInterval =
+        conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
+  }
+
+  @Override
+  public void start() {
+    lostTaskCheckerThread = new Thread(new PingChecker());
+    lostTaskCheckerThread.setName("TaskHeartbeatHandler PingChecker");
+    lostTaskCheckerThread.start();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+    lostTaskCheckerThread.interrupt();
+    super.stop();
+  }
+
+  public void progressing(TaskAttemptId attemptID) {
+    // Only update registered attempts.
+    // TODO: throw an exception if the attempt isn't registered.
+    ReportTime time = runningAttempts.get(attemptID);
+    if (time != null) {
+      time.setLastProgress(clock.getTime());
+    }
+  }
+
+  public void pinged(TaskAttemptId attemptID) {
+    // Only update registered attempts.
+    // TODO: throw an exception if the attempt isn't registered.
+    ReportTime time = runningAttempts.get(attemptID);
+    if (time != null) {
+      time.setLastPing(clock.getTime());
+    }
+  }
+  
+  public void register(TaskAttemptId attemptID) {
+    runningAttempts.put(attemptID, new ReportTime(clock.getTime()));
+  }
+
+  public void unregister(TaskAttemptId attemptID) {
+    runningAttempts.remove(attemptID);
+  }
+
+  private class PingChecker implements Runnable {
+
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator =
+            runningAttempts.entrySet().iterator();
+
+        // Avoid recalculating the current time on every loop iteration.
+        long currentTime = clock.getTime();
+
+        while (iterator.hasNext()) {
+          Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
+          boolean taskTimedOut = (taskTimeOut > 0) && 
+              (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
+          boolean pingTimedOut =
+              (currentTime > (entry.getValue().getLastPing() + PING_TIMEOUT));
+              
+          if (taskTimedOut || pingTimedOut) {
+            // Attempt is lost: remove it from the list and raise a timed-out
+            // event, reporting whichever timeout actually fired.
+            long timeoutUsed = taskTimedOut ? taskTimeOut : PING_TIMEOUT;
+            iterator.remove();
+            eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
+                .getKey(), "AttemptID:" + entry.getKey().toString()
+                + " Timed out after " + timeoutUsed / 1000 + " secs"));
+            eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
+                TaskAttemptEventType.TA_TIMED_OUT));
+          }
+        }
+        try {
+          Thread.sleep(taskTimeOutCheckInterval);
+        } catch (InterruptedException e) {
+          LOG.info("TaskHeartbeatHandler thread interrupted");
+          break;
+        }
+      }
+    }
+  }
+
+}
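
As a usage note (not part of this commit), the handler is driven roughly as follows: it is initialized with the task-timeout settings, attempts are registered when they launch, pinged()/progressing() are invoked as heartbeats and progress reports arrive, and attempts are unregistered on completion. A hedged sketch, assuming an AppContext instance and an attempt ID are supplied by the AM:

    // Hedged sketch of wiring up TaskHeartbeatHandler; 'context' and
    // 'attemptId' are assumed to be provided by the application master.
    // Not part of this commit.
    void driveHeartbeats(AppContext context, TaskAttemptId attemptId) {
      Configuration conf = new Configuration();
      conf.setInt(MRJobConfig.TASK_TIMEOUT, 10 * 60 * 1000);        // 10 min
      conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);

      TaskHeartbeatHandler handler = new TaskHeartbeatHandler(context);
      handler.init(conf);             // reads the two timeout keys above
      handler.start();                // spawns the PingChecker thread

      handler.register(attemptId);    // when the attempt launches
      handler.pinged(attemptId);      // on each liveness heartbeat
      handler.progressing(attemptId); // on each progress report
      handler.unregister(attemptId);  // when the attempt completes

      handler.stop();                 // interrupts the PingChecker thread
    }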

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/ClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/ClientService.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/ClientService.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/ClientService.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.client;
+
+import java.net.InetSocketAddress;
+
+public interface ClientService {
+
+  InetSocketAddress getBindAddress();
+
+  int getHttpPort();
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/MRClientService.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/MRClientService.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/MRClientService.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,392 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.client;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.security.authorize.MRAMPolicyProvider;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.AMWebApp;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.WebApps;
+
+/**
+ * This service is responsible for all user-facing communication with the
+ * job client.
+ */
+public class MRClientService extends AbstractService 
+    implements ClientService {
+
+  static final Log LOG = LogFactory.getLog(MRClientService.class);
+  
+  private MRClientProtocol protocolHandler;
+  private Server server;
+  private WebApp webApp;
+  private InetSocketAddress bindAddress;
+  private AppContext appContext;
+
+  public MRClientService(AppContext appContext) {
+    super("MRClientService");
+    this.appContext = appContext;
+    this.protocolHandler = new MRClientProtocolHandler();
+  }
+
+  public void start() {
+    Configuration conf = getConfig();
+    YarnRPC rpc = YarnRPC.create(conf);
+    InetSocketAddress address = new InetSocketAddress(0);
+
+    ClientToAMSecretManager secretManager = null;
+    if (UserGroupInformation.isSecurityEnabled()) {
+      secretManager = new ClientToAMSecretManager();
+      String secretKeyStr =
+          System
+              .getenv(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME);
+      byte[] bytes = Base64.decodeBase64(secretKeyStr);
+      ClientTokenIdentifier identifier = new ClientTokenIdentifier(
+          this.appContext.getApplicationID());
+      secretManager.setMasterKey(identifier, bytes);
+    }
+    server =
+        rpc.getServer(MRClientProtocol.class, protocolHandler, address,
+            conf, secretManager,
+            conf.getInt(MRJobConfig.MR_AM_JOB_CLIENT_THREAD_COUNT, 
+                MRJobConfig.DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT),
+                MRJobConfig.MR_AM_JOB_CLIENT_PORT_RANGE);
+    
+    // Enable service authorization, if configured.
+    if (conf.getBoolean(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
+        false)) {
+      refreshServiceAcls(conf, new MRAMPolicyProvider());
+    }
+
+    server.start();
+    this.bindAddress = NetUtils.getConnectAddress(server);
+    LOG.info("Instantiated MRClientService at " + this.bindAddress);
+    try {
+      webApp = WebApps.$for("mapreduce", AppContext.class, appContext, "ws").with(conf).
+          start(new AMWebApp());
+    } catch (Exception e) {
+      LOG.error("Webapps failed to start. Ignoring for now:", e);
+    }
+    super.start();
+  }
+
+  void refreshServiceAcls(Configuration configuration, 
+      PolicyProvider policyProvider) {
+    this.server.refreshServiceAcl(configuration, policyProvider);
+  }
+
+  public void stop() {
+    server.stop();
+    if (webApp != null) {
+      webApp.stop();
+    }
+    super.stop();
+  }
+
+  @Override
+  public InetSocketAddress getBindAddress() {
+    return bindAddress;
+  }
+
+  @Override
+  public int getHttpPort() {
+    return webApp.port();
+  }
+
+  class MRClientProtocolHandler implements MRClientProtocol {
+
+    private RecordFactory recordFactory = 
+      RecordFactoryProvider.getRecordFactory(null);
+
+    @Override
+    public InetSocketAddress getConnectAddress() {
+      return getBindAddress();
+    }
+    
+    private Job verifyAndGetJob(JobId jobID, 
+        boolean modifyAccess) throws YarnRemoteException {
+      // TODO: check job ACLs here; modifyAccess is currently unused.
+      Job job = appContext.getJob(jobID);
+      return job;
+    }
+ 
+    private Task verifyAndGetTask(TaskId taskID, 
+        boolean modifyAccess) throws YarnRemoteException {
+      Task task = verifyAndGetJob(taskID.getJobId(), 
+          modifyAccess).getTask(taskID);
+      if (task == null) {
+        throw RPCUtil.getRemoteException("Unknown Task " + taskID);
+      }
+      return task;
+    }
+
+    private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID, 
+        boolean modifyAccess) throws YarnRemoteException {
+      TaskAttempt attempt = verifyAndGetTask(attemptID.getTaskId(), 
+          modifyAccess).getAttempt(attemptID);
+      if (attempt == null) {
+        throw RPCUtil.getRemoteException("Unknown TaskAttempt " + attemptID);
+      }
+      return attempt;
+    }
+
+    @Override
+    public GetCountersResponse getCounters(GetCountersRequest request) 
+      throws YarnRemoteException {
+      JobId jobId = request.getJobId();
+      Job job = verifyAndGetJob(jobId, false);
+      GetCountersResponse response =
+        recordFactory.newRecordInstance(GetCountersResponse.class);
+      response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
+      return response;
+    }
+    
+    @Override
+    public GetJobReportResponse getJobReport(GetJobReportRequest request) 
+      throws YarnRemoteException {
+      JobId jobId = request.getJobId();
+      Job job = verifyAndGetJob(jobId, false);
+      GetJobReportResponse response = 
+        recordFactory.newRecordInstance(GetJobReportResponse.class);
+      if (job != null) {
+        response.setJobReport(job.getReport());
+      } else {
+        response.setJobReport(null);
+      }
+      return response;
+    }
+
+    @Override
+    public GetTaskAttemptReportResponse getTaskAttemptReport(
+        GetTaskAttemptReportRequest request) throws YarnRemoteException {
+      TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+      GetTaskAttemptReportResponse response =
+        recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
+      response.setTaskAttemptReport(
+          verifyAndGetAttempt(taskAttemptId, false).getReport());
+      return response;
+    }
+
+    @Override
+    public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) 
+      throws YarnRemoteException {
+      TaskId taskId = request.getTaskId();
+      GetTaskReportResponse response = 
+        recordFactory.newRecordInstance(GetTaskReportResponse.class);
+      response.setTaskReport(verifyAndGetTask(taskId, false).getReport());
+      return response;
+    }
+
+    @Override
+    public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
+        GetTaskAttemptCompletionEventsRequest request) 
+        throws YarnRemoteException {
+      JobId jobId = request.getJobId();
+      int fromEventId = request.getFromEventId();
+      int maxEvents = request.getMaxEvents();
+      Job job = verifyAndGetJob(jobId, false);
+      
+      GetTaskAttemptCompletionEventsResponse response = 
+        recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
+      response.addAllCompletionEvents(Arrays.asList(
+          job.getTaskAttemptCompletionEvents(fromEventId, maxEvents)));
+      return response;
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public KillJobResponse killJob(KillJobRequest request) 
+      throws YarnRemoteException {
+      JobId jobId = request.getJobId();
+      String message = "Kill Job received from client " + jobId;
+      LOG.info(message);
+  	  verifyAndGetJob(jobId, true);
+      appContext.getEventHandler().handle(
+          new JobDiagnosticsUpdateEvent(jobId, message));
+      appContext.getEventHandler().handle(
+          new JobEvent(jobId, JobEventType.JOB_KILL));
+      KillJobResponse response = 
+        recordFactory.newRecordInstance(KillJobResponse.class);
+      return response;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public KillTaskResponse killTask(KillTaskRequest request) 
+      throws YarnRemoteException {
+      TaskId taskId = request.getTaskId();
+      String message = "Kill task received from client " + taskId;
+      LOG.info(message);
+      verifyAndGetTask(taskId, true);
+      appContext.getEventHandler().handle(
+          new TaskEvent(taskId, TaskEventType.T_KILL));
+      KillTaskResponse response = 
+        recordFactory.newRecordInstance(KillTaskResponse.class);
+      return response;
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public KillTaskAttemptResponse killTaskAttempt(
+        KillTaskAttemptRequest request) throws YarnRemoteException {
+      TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+      String message = "Kill task attempt received from client " + taskAttemptId;
+      LOG.info(message);
+      verifyAndGetAttempt(taskAttemptId, true);
+      appContext.getEventHandler().handle(
+          new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
+      appContext.getEventHandler().handle(
+          new TaskAttemptEvent(taskAttemptId, 
+              TaskAttemptEventType.TA_KILL_REQUEST));
+      KillTaskAttemptResponse response = 
+        recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
+      return response;
+    }
+
+    @Override
+    public GetDiagnosticsResponse getDiagnostics(
+        GetDiagnosticsRequest request) throws YarnRemoteException {
+      TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+      
+      GetDiagnosticsResponse response = 
+        recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
+      response.addAllDiagnostics(
+          verifyAndGetAttempt(taskAttemptId, false).getDiagnostics());
+      return response;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public FailTaskAttemptResponse failTaskAttempt(
+        FailTaskAttemptRequest request) throws YarnRemoteException {
+      TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+      String message = "Fail task attempt received from client " + taskAttemptId;
+      LOG.info(message);
+      verifyAndGetAttempt(taskAttemptId, true);
+      appContext.getEventHandler().handle(
+          new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
+      appContext.getEventHandler().handle(
+          new TaskAttemptEvent(taskAttemptId, 
+              TaskAttemptEventType.TA_FAIL_REQUEST));
+      FailTaskAttemptResponse response = recordFactory.
+        newRecordInstance(FailTaskAttemptResponse.class);
+      return response;
+    }
+
+    private final Object getTaskReportsLock = new Object();
+
+    @Override
+    public GetTaskReportsResponse getTaskReports(
+        GetTaskReportsRequest request) throws YarnRemoteException {
+      JobId jobId = request.getJobId();
+      TaskType taskType = request.getTaskType();
+      
+      GetTaskReportsResponse response = 
+        recordFactory.newRecordInstance(GetTaskReportsResponse.class);
+      
+      Job job = verifyAndGetJob(jobId, false);
+      Collection<Task> tasks = job.getTasks(taskType).values();
+      LOG.info("Getting task report for " + taskType + "   " + jobId
+          + ". Report-size will be " + tasks.size());
+
+      // Take lock to allow only one call, otherwise heap will blow up because
+      // of counters in the report when there are multiple callers.
+      synchronized (getTaskReportsLock) {
+        for (Task task : tasks) {
+          response.addTaskReport(task.getReport());
+        }
+      }
+
+      return response;
+    }
+
+    @Override
+    public GetDelegationTokenResponse getDelegationToken(
+        GetDelegationTokenRequest request) throws YarnRemoteException {
+      throw RPCUtil.getRemoteException(
+          "MR AM not authorized to issue a delegation token");
+    }
+  }
+}
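
For context (not part of this commit), a client typically reaches this service through a YarnRPC proxy. A hedged sketch, assuming the AM's bind address and the job ID are already known, e.g. from the ResourceManager's application report:

    // Hedged sketch of a client-side call into MRClientService; 'amAddress'
    // and 'jobId' are assumed known. Not part of this commit.
    void queryJobState(InetSocketAddress amAddress, JobId jobId)
        throws YarnRemoteException {
      Configuration conf = new Configuration();
      YarnRPC rpc = YarnRPC.create(conf);
      MRClientProtocol proxy = (MRClientProtocol) rpc.getProxy(
          MRClientProtocol.class, amAddress, conf);

      GetJobReportRequest request = RecordFactoryProvider
          .getRecordFactory(null)
          .newRecordInstance(GetJobReportRequest.class);
      request.setJobId(jobId);
      GetJobReportResponse response = proxy.getJobReport(request);
      System.out.println("Job state: "
          + response.getJobReport().getJobState());
    }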

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/package-info.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/package-info.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/client/package-info.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.mapreduce.v2.app2.client;
+import org.apache.hadoop.classification.InterfaceAudience;

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/Job.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/Job.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/Job.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,99 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.job;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+
+/**
+ * Main read-only interface for interacting with a job. Provides only
+ * getters.
+ */
+public interface Job {
+
+  JobId getID();
+  String getName();
+  JobState getState();
+  JobReport getReport();
+
+  /**
+   * Get all the counters of this job. This includes job-counters aggregated
+   * together with the counters of each task. This creates a clone of the
+   * Counters, so use this judiciously.  
+   * @return job-counters and aggregate task-counters
+   */
+  Counters getAllCounters();
+
+  Map<TaskId,Task> getTasks();
+  Map<TaskId,Task> getTasks(TaskType taskType);
+  Task getTask(TaskId taskID);
+  List<String> getDiagnostics();
+  int getTotalMaps();
+  int getTotalReduces();
+  int getCompletedMaps();
+  int getCompletedReduces();
+  float getProgress();
+  boolean isUber();
+  String getUserName();
+  String getQueueName();
+  
+  Configuration getConf();
+  
+  /**
+   * @return a path to where the config file for this job is located.
+   */
+  Path getConfFile();
+  
+  /**
+   * @return a parsed version of the config file pointed to by 
+   * {@link #getConfFile()}.
+   * @throws IOException on any error trying to load the conf file. 
+   */
+  Configuration loadConfFile() throws IOException;
+  
+  /**
+   * @return the ACLs for this job, keyed by JobACL type. 
+   */
+  Map<JobACL, AccessControlList> getJobACLs();
+
+  TaskAttemptCompletionEvent[]
+      getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
+
+  /**
+   * @return information for MR AppMasters (previously failed and current)
+   */
+  List<AMInfo> getAMInfos();
+  
+  boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation);
+}
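
To make the read-only contract concrete, here is a hedged sketch (not from this commit) of summarizing a job through these getters; the Job instance is assumed to come from AppContext.getJob(jobId).

    // Hedged sketch: summarizing a Job via its getters. Not part of this
    // commit.
    void printSummary(Job job) {
      System.out.println(job.getID() + " (" + job.getName() + "): "
          + job.getState() + ", progress " + (job.getProgress() * 100) + "%");
      System.out.println("Maps: " + job.getCompletedMaps() + "/"
          + job.getTotalMaps() + ", Reduces: " + job.getCompletedReduces()
          + "/" + job.getTotalReduces());
      // Per-task state for the map tasks only.
      for (Map.Entry<TaskId, Task> e : job.getTasks(TaskType.MAP).entrySet()) {
        System.out.println("  map " + e.getKey() + " -> "
            + e.getValue().getReport().getTaskState());
      }
    }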


