hadoop-mapreduce-commits mailing list archives

From: ss...@apache.org
Subject: svn commit: r1376283 [3/22] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-app2/ hadoop-mapreduce-client-app2/src/ hadoop-mapreduce-client-app2/src/main/ hadoop-mapreduce-client-app2/sr...
Date: Wed, 22 Aug 2012 22:11:48 GMT
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler2.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler2.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler2.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,917 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * Job history events are routed to this class, which writes them to the DFS,
+ * first into a staging dir and then moves them to a done-dir once the job
+ * completes. The JobHistory implementation is in this package so it can
+ * access package-private classes.
+ */
+public class JobHistoryEventHandler2 extends AbstractService
+    implements EventHandler<JobHistoryEvent> {
+
+  private final AppContext context;
+  private final int startCount;
+
+  private int eventCounter;
+
+  // TODO Does the FS object need to be different?
+  private FileSystem stagingDirFS; // log Dir FileSystem
+  private FileSystem doneDirFS; // done Dir FileSystem
+
+
+  private Path stagingDirPath = null;
+  private Path doneDirPrefixPath = null; // folder for completed jobs
+
+  private int maxUnflushedCompletionEvents;
+  private int postJobCompletionMultiplier;
+  private long flushTimeout;
+  private int minQueueSizeForBatchingFlushes; // TODO: Rename
+
+  private int numUnflushedCompletionEvents = 0;
+  private boolean isTimerActive;
+
+
+  protected BlockingQueue<JobHistoryEvent> eventQueue =
+    new LinkedBlockingQueue<JobHistoryEvent>();
+  protected Thread eventHandlingThread;
+  private volatile boolean stopped;
+  private final Object lock = new Object();
+
+  private static final Log LOG = LogFactory.getLog(
+      JobHistoryEventHandler2.class);
+
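+  // Per-job bookkeeping: the open event writer, index info and summary for
+  // every job this handler is tracking.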
+  protected static final Map<JobId, MetaInfo> fileMap =
+    Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
+
+  // Has a signal (SIGTERM etc) been issued?
+  protected volatile boolean isSignalled = false;
+
+  public JobHistoryEventHandler2(AppContext context, int startCount) {
+    super("JobHistoryEventHandler");
+    this.context = context;
+    this.startCount = startCount;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.yarn.service.AbstractService#init(
+   * org.apache.hadoop.conf.Configuration)
+   * Initializes the FileSystem and Path objects for the log and done
+   * directories, creating them if they do not already exist.
+   */
+  @Override
+  public void init(Configuration conf) {
+
+    String stagingDirStr = null;
+    String doneDirStr = null;
+    String userDoneDirStr = null;
+    try {
+      stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+      doneDirStr =
+          JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
+      userDoneDirStr =
+          JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
+    } catch (IOException e) {
+      LOG.error("Failed while getting the configured log directories", e);
+      throw new YarnException(e);
+    }
+
+    //Check for the existence of the history staging dir. Maybe create it. 
+    try {
+      stagingDirPath =
+          FileSystem.get(conf).makeQualified(new Path(stagingDirStr));
+      stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf);
+      mkdir(stagingDirFS, stagingDirPath, new FsPermission(
+          JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
+    } catch (IOException e) {
+      LOG.error("Failed while checking for/creating  history staging path: ["
+          + stagingDirPath + "]", e);
+      throw new YarnException(e);
+    }
+
+    //Check for the existence of intermediate done dir.
+    Path doneDirPath = null;
+    try {
+      doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirStr));
+      doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
+      // This directory will be in a common location, or this may be a cluster
+      // meant for a single user. Creating based on the conf. Should ideally be
+      // created by the JobHistoryServer or as part of deployment.
+      if (!doneDirFS.exists(doneDirPath)) {
+        if (JobHistoryUtils.shouldCreateNonUserDirectory(conf)) {
+          LOG.info("Creating intermediate history logDir: [" + doneDirPath
+              + "] based on conf. Should ideally be created by the"
+              + " JobHistoryServer: "
+              + MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR);
+          mkdir(doneDirFS, doneDirPath, new FsPermission(
+              JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS
+                  .toShort()));
+          // TODO Temporary toShort till new FsPermission(FsPermission)
+          // respects sticky
+        } else {
+          String message = "Not creating intermediate history logDir: ["
+              + doneDirPath + "] based on conf: "
+              + MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR
+              + ". Either set to true or pre-create this directory with"
+              + " appropriate permissions";
+          LOG.error(message);
+          throw new YarnException(message);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Failed checking for the existance of history intermediate " +
+      		"done directory: [" + doneDirPath + "]");
+      throw new YarnException(e);
+    }
+
+    //Check/create user directory under intermediate done dir.
+    try {
+      doneDirPrefixPath =
+          FileSystem.get(conf).makeQualified(new Path(userDoneDirStr));
+      mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(
+          JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
+    } catch (IOException e) {
+      LOG.error("Error creating user intermediate history done directory: [ "
+          + doneDirPrefixPath + "]", e);
+      throw new YarnException(e);
+    }
+
+    // Maximum number of unflushed completion-events that can stay in the queue
+    // before flush kicks in.
+    maxUnflushedCompletionEvents =
+        conf.getInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS);
+    // We want to cut down flushes after job completes so as to write quicker,
+    // so we increase maxUnflushedEvents post Job completion by using the
+    // following multiplier.
+    postJobCompletionMultiplier =
+        conf.getInt(
+            MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER);
+    // Max time until which flush doesn't take place.
+    flushTimeout =
+        conf.getLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS);
+    minQueueSizeForBatchingFlushes =
+        conf.getInt(
+            MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
+    
+    super.init(conf);
+  }
+
+  private void mkdir(FileSystem fs, Path path, FsPermission fsp)
+      throws IOException {
+    if (!fs.exists(path)) {
+      try {
+        fs.mkdirs(path, fsp);
+        FileStatus fsStatus = fs.getFileStatus(path);
+        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+            + ", Expected: " + fsp.toShort());
+        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
+          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+              + ", " + fsp);
+          fs.setPermission(path, fsp);
+        }
+      } catch (FileAlreadyExistsException e) {
+        LOG.info("Directory: [" + path + "] already exists.");
+      }
+    }
+  }
+
+  @Override
+  public void start() {
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        JobHistoryEvent event = null;
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
+
+          // Log the size of the history-event-queue every so often.
+          if (eventCounter != 0 && eventCounter % 1000 == 0) {
+            eventCounter = 0;
+            LOG.info("Size of the JobHistory event queue is "
+                + eventQueue.size());
+          } else {
+            eventCounter++;
+          }
+
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.info("EventQueue take interrupted. Returning");
+            return;
+          }
+          // If an event has been removed from the queue, handle it.
+          // The rest of the queue is handled via stop().
+          // Clear the interrupt status before calling handleEvent and
+          // restore it afterwards if it was set. Interrupts received from
+          // other threads during handleEvent cannot be dealt with -
+          // Shell.runCommand() ignores them.
+          synchronized (lock) {
+            boolean isInterrupted = Thread.interrupted();
+            handleEvent(event);
+            if (isInterrupted) {
+              Thread.currentThread().interrupt();
+            }
+          }
+        }
+      }
+    });
+    eventHandlingThread.start();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("Stopping JobHistoryEventHandler. "
+        + "Size of the outstanding queue size is " + eventQueue.size());
+    stopped = true;
+    //do not interrupt while event handling is in progress
+    synchronized(lock) {
+      if (eventHandlingThread != null)
+        eventHandlingThread.interrupt();
+    }
+
+    try {
+      if (eventHandlingThread != null)
+        eventHandlingThread.join();
+    } catch (InterruptedException ie) {
+      LOG.info("Interruped Exception while stopping", ie);
+    }
+
+    // Cancel all timers - so that they aren't invoked during or after
+    // the metaInfo object is wrapped up.
+    for (MetaInfo mi : fileMap.values()) {
+      try {
+        mi.shutDownTimer();
+      } catch (IOException e) {
+        LOG.info("Exception while cancelling delayed flush timer. "
+            + "Likely caused by a failed flush " + e.getMessage());
+      }
+    }
+
+    //write all the events remaining in queue
+    Iterator<JobHistoryEvent> it = eventQueue.iterator();
+    while(it.hasNext()) {
+      JobHistoryEvent ev = it.next();
+      LOG.info("In stop, writing event " + ev.getType());
+      handleEvent(ev);
+    }
+
+    // Process JobUnsuccessfulCompletionEvent for jobIds which still haven't
+    // closed their event writers
+    Iterator<JobId> jobIt = fileMap.keySet().iterator();
+    if(isSignalled) {
+      while (jobIt.hasNext()) {
+        JobId toClose = jobIt.next();
+        MetaInfo mi = fileMap.get(toClose);
+        if(mi != null && mi.isWriterActive()) {
+          LOG.warn("Found jobId " + toClose
+            + " to have not been closed. Will close");
+          //Create a JobFinishEvent so that it is written to the job history
+          JobUnsuccessfulCompletionEvent jucEvent =
+            new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
+              System.currentTimeMillis(), context.getJob(toClose)
+              .getCompletedMaps(), context.getJob(toClose).getCompletedReduces(),
+              JobState.KILLED.toString());
+          JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
+          //Bypass the queue mechanism which might wait. Call the method directly
+          handleEvent(jfEvent);
+        }
+      }
+    }
+
+    //close all file handles
+    for (MetaInfo mi : fileMap.values()) {
+      try {
+        mi.closeWriter();
+      } catch (IOException e) {
+        LOG.info("Exception while closing file " + e.getMessage());
+      }
+    }
+    LOG.info("Stopped JobHistoryEventHandler. super.stop()");
+    super.stop();
+  }
+
+  protected EventWriter createEventWriter(Path historyFilePath)
+      throws IOException {
+    FSDataOutputStream out = stagingDirFS.create(historyFilePath, true);
+    return new EventWriter(out);
+  }
+  
+  /**
+   * Create an event writer for the Job represented by the jobID and write
+   * the job configuration to the log directory. This should be the first
+   * call to history for a job.
+   *
+   * @param jobId the jobId.
+   * @throws IOException
+   */
+  protected void setupEventWriter(JobId jobId)
+      throws IOException {
+    if (stagingDirPath == null) {
+      LOG.error("Log Directory is null, returning");
+      throw new IOException("Missing Log Directory for History");
+    }
+
+    MetaInfo oldFi = fileMap.get(jobId);
+    Configuration conf = getConfig();
+
+    // TODO Ideally this should be written out to the job dir
+    // (.staging/jobid/files - RecoveryService will need to be patched)
+    Path historyFile = JobHistoryUtils.getStagingJobHistoryFile(
+        stagingDirPath, jobId, startCount);
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    if (user == null) {
+      throw new IOException(
+          "User is null while setting up jobhistory eventwriter");
+    }
+
+    String jobName = context.getJob(jobId).getName();
+    EventWriter writer = (oldFi == null) ? null : oldFi.writer;
+ 
+    Path logDirConfPath =
+        JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId, startCount);
+    if (writer == null) {
+      try {
+        writer = createEventWriter(historyFile);
+        LOG.info("Event Writer setup for JobId: " + jobId + ", File: "
+            + historyFile);
+      } catch (IOException ioe) {
+        LOG.info("Could not create log file: [" + historyFile + "] + for job "
+            + "[" + jobName + "]");
+        throw ioe;
+      }
+      
+      //Write out conf only if the writer isn't already setup.
+      if (conf != null) {
+        // TODO Ideally this should be written out to the job dir
+        // (.staging/jobid/files - RecoveryService will need to be patched)
+        FSDataOutputStream jobFileOut = null;
+        try {
+          if (logDirConfPath != null) {
+            jobFileOut = stagingDirFS.create(logDirConfPath, true);
+            conf.writeXml(jobFileOut);
+            jobFileOut.close();
+          }
+        } catch (IOException e) {
+          LOG.info("Failed to write the job configuration file", e);
+          throw e;
+        }
+      }
+    }
+
+    MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
+        user, jobName, jobId);
+    fi.getJobSummary().setJobId(jobId);
+    fileMap.put(jobId, fi);
+  }
+
+  /**
+   * Close the event writer for this id.
+   * @throws IOException
+   */
+  public void closeWriter(JobId id) throws IOException {
+    try {
+      final MetaInfo mi = fileMap.get(id);
+      if (mi != null) {
+        mi.closeWriter();
+      }
+    } catch (IOException e) {
+      LOG.error("Error closing writer for JobID: " + id);
+      throw e;
+    }
+  }
+
+  @Override
+  public void handle(JobHistoryEvent event) {
+    try {
+      if (isJobCompletionEvent(event.getHistoryEvent())) {
+        // When the job is complete, flush slower but write faster.
+        maxUnflushedCompletionEvents =
+            maxUnflushedCompletionEvents * postJobCompletionMultiplier;
+      }
+
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  private boolean isJobCompletionEvent(HistoryEvent historyEvent) {
+    if (EnumSet.of(EventType.JOB_FINISHED, EventType.JOB_FAILED,
+        EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
+      return true;
+    }
+    return false;
+  }
+
+  protected void handleEvent(JobHistoryEvent event) {
+    synchronized (lock) {
+
+      // If this is JobSubmitted Event, setup the writer
+      if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) {
+        try {
+          setupEventWriter(event.getJobID());
+        } catch (IOException ioe) {
+          LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
+              ioe);
+          throw new YarnException(ioe);
+        }
+      }
+
+      // For all events
+      // (1) Write it out
+      // (2) Process it for JobSummary
+      MetaInfo mi = fileMap.get(event.getJobID());
+      try {
+        HistoryEvent historyEvent = event.getHistoryEvent();
+        if (!(historyEvent instanceof NormalizedResourceEvent)) {
+          mi.writeEvent(historyEvent);
+        }
+        processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
+            event.getJobID());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("In HistoryEventHandler "
+              + event.getHistoryEvent().getEventType());
+        }
+      } catch (IOException e) {
+        LOG.error("Error writing History Event: " + event.getHistoryEvent(),
+            e);
+        throw new YarnException(e);
+      }
+
+      if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
+        JobSubmittedEvent jobSubmittedEvent =
+            (JobSubmittedEvent) event.getHistoryEvent();
+        mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
+        mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName());
+      }
+     
+      // If this is JobFinishedEvent, close the writer and setup the job-index
+      if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {
+        try {
+          JobFinishedEvent jFinishedEvent =
+              (JobFinishedEvent) event.getHistoryEvent();
+          mi.getJobIndexInfo().setFinishTime(jFinishedEvent.getFinishTime());
+          mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getFinishedMaps());
+          mi.getJobIndexInfo().setNumReduces(
+              jFinishedEvent.getFinishedReduces());
+          mi.getJobIndexInfo().setJobStatus(JobState.SUCCEEDED.toString());
+          closeEventWriter(event.getJobID());
+        } catch (IOException e) {
+          throw new YarnException(e);
+        }
+      }
+
+      if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
+          || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
+        try {
+          JobUnsuccessfulCompletionEvent jucEvent = 
+              (JobUnsuccessfulCompletionEvent) event
+              .getHistoryEvent();
+          mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
+          mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
+          mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
+          mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
+          closeEventWriter(event.getJobID());
+        } catch (IOException e) {
+          throw new YarnException(e);
+        }
+      }
+    }
+  }
+
+  public void processEventForJobSummary(HistoryEvent event, JobSummary summary, 
+      JobId jobId) {
+    // context.getJob could be used for some of this info as well.
+    switch (event.getEventType()) {
+    case JOB_SUBMITTED:
+      JobSubmittedEvent jse = (JobSubmittedEvent) event;
+      summary.setUser(jse.getUserName());
+      summary.setQueue(jse.getJobQueueName());
+      summary.setJobSubmitTime(jse.getSubmitTime());
+      summary.setJobName(jse.getJobName());
+      break;
+    case NORMALIZED_RESOURCE:
+      NormalizedResourceEvent normalizedResourceEvent = 
+            (NormalizedResourceEvent) event;
+      if (normalizedResourceEvent.getTaskType() == TaskType.MAP) {
+        summary.setResourcesPerMap(normalizedResourceEvent.getMemory());
+      } else if (normalizedResourceEvent.getTaskType() == TaskType.REDUCE) {
+        summary.setResourcesPerReduce(normalizedResourceEvent.getMemory());
+      }
+      break;  
+    case JOB_INITED:
+      JobInitedEvent jie = (JobInitedEvent) event;
+      summary.setJobLaunchTime(jie.getLaunchTime());
+      break;
+    case MAP_ATTEMPT_STARTED:
+      TaskAttemptStartedEvent mtase = (TaskAttemptStartedEvent) event;
+      if (summary.getFirstMapTaskLaunchTime() == 0)
+        summary.setFirstMapTaskLaunchTime(mtase.getStartTime());
+      break;
+    case REDUCE_ATTEMPT_STARTED:
+      TaskAttemptStartedEvent rtase = (TaskAttemptStartedEvent) event;
+      if (summary.getFirstReduceTaskLaunchTime() == 0)
+        summary.setFirstReduceTaskLaunchTime(rtase.getStartTime());
+      break;
+    case JOB_FINISHED:
+      JobFinishedEvent jfe = (JobFinishedEvent) event;
+      summary.setJobFinishTime(jfe.getFinishTime());
+      summary.setNumFinishedMaps(jfe.getFinishedMaps());
+      summary.setNumFailedMaps(jfe.getFailedMaps());
+      summary.setNumFinishedReduces(jfe.getFinishedReduces());
+      summary.setNumFailedReduces(jfe.getFailedReduces());
+      if (summary.getJobStatus() == null)
+        summary
+            .setJobStatus(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED
+                .toString());
+      // TODO JOB_FINISHED does not have state. Effectively job history does not
+      // have state about the finished job.
+      setSummarySlotSeconds(summary, jfe.getTotalCounters());
+      break;
+    case JOB_FAILED:
+    case JOB_KILLED:
+      JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent) event;
+      summary.setJobStatus(juce.getStatus());
+      summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps());
+      summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces());
+      summary.setJobFinishTime(juce.getFinishTime());
+      setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters());
+      break;
+    }
+  }
+
+  private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
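+    // SLOTS_MILLIS_* counter values are in milliseconds; convert to seconds.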
+
+    Counter slotMillisMapCounter = allCounters
+      .findCounter(JobCounter.SLOTS_MILLIS_MAPS);
+    if (slotMillisMapCounter != null) {
+      summary.setMapSlotSeconds(slotMillisMapCounter.getValue() / 1000);
+    }
+
+    Counter slotMillisReduceCounter = allCounters
+      .findCounter(JobCounter.SLOTS_MILLIS_REDUCES);
+    if (slotMillisReduceCounter != null) {
+      summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue() / 1000);
+    }
+  }
+
+  protected void closeEventWriter(JobId jobId) throws IOException {
+
+    final MetaInfo mi = fileMap.get(jobId);
+    if (mi == null) {
+      throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
+    }
+
+    if (!mi.isWriterActive()) {
+      throw new IOException(
+          "Inactive Writer: Likely received multiple JobFinished / " +
+          "JobUnsuccessful events for JobId: ["
+              + jobId + "]");
+    }
+
+    // Close the Writer
+    try {
+      mi.closeWriter();
+    } catch (IOException e) {
+      LOG.error("Error closing writer for JobID: " + jobId);
+      throw e;
+    }
+     
+    if (mi.getHistoryFile() == null) {
+      LOG.warn("No file for job-history with " + jobId + " found in cache!");
+    }
+    if (mi.getConfFile() == null) {
+      LOG.warn("No file for jobconf with " + jobId + " found in cache!");
+    }
+      
+    // Writing out the summary file.
+    // TODO JH enhancement - reuse this file to store additional indexing info
+    // like ACLs, etc. JHServer can use HDFS append to build an index file
+    // with more info than is available via the filename.
+    Path qualifiedSummaryDoneFile = null;
+    FSDataOutputStream summaryFileOut = null;
+    try {
+      String doneSummaryFileName = getTempFileName(JobHistoryUtils
+          .getIntermediateSummaryFileName(jobId));
+      qualifiedSummaryDoneFile = doneDirFS.makeQualified(new Path(
+          doneDirPrefixPath, doneSummaryFileName));
+      summaryFileOut = doneDirFS.create(qualifiedSummaryDoneFile, true);
+      summaryFileOut.writeUTF(mi.getJobSummary().getJobSummaryString());
+      summaryFileOut.close();
+    } catch (IOException e) {
+      LOG.info("Unable to write out JobSummaryInfo to ["
+          + qualifiedSummaryDoneFile + "]", e);
+      throw e;
+    }
+
+    try {
+
+      // Move historyFile to Done Folder.
+      Path qualifiedDoneFile = null;
+      if (mi.getHistoryFile() != null) {
+        Path historyFile = mi.getHistoryFile();
+        Path qualifiedLogFile = stagingDirFS.makeQualified(historyFile);
+        String doneJobHistoryFileName =
+            getTempFileName(FileNameIndexUtils.getDoneFileName(mi
+                .getJobIndexInfo()));
+        qualifiedDoneFile =
+            doneDirFS.makeQualified(new Path(doneDirPrefixPath,
+                doneJobHistoryFileName));
+        moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
+      }
+
+      // Move confFile to Done Folder
+      Path qualifiedConfDoneFile = null;
+      if (mi.getConfFile() != null) {
+        Path confFile = mi.getConfFile();
+        Path qualifiedConfFile = stagingDirFS.makeQualified(confFile);
+        String doneConfFileName =
+            getTempFileName(JobHistoryUtils
+                .getIntermediateConfFileName(jobId));
+        qualifiedConfDoneFile =
+            doneDirFS.makeQualified(new Path(doneDirPrefixPath,
+                doneConfFileName));
+        moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile);
+      }
+      
+      moveTmpToDone(qualifiedSummaryDoneFile);
+      moveTmpToDone(qualifiedConfDoneFile);
+      moveTmpToDone(qualifiedDoneFile);
+
+    } catch (IOException e) {
+      LOG.error("Error closing writer for JobID: " + jobId);
+      throw e;
+    }
+  }
+
+  private class FlushTimerTask extends TimerTask {
+    private MetaInfo metaInfo;
+    private IOException ioe = null;
+    private volatile boolean shouldRun = true;
+
+    FlushTimerTask(MetaInfo metaInfo) {
+      this.metaInfo = metaInfo;
+    }
+
+    @Override
+    public void run() {
+      synchronized (lock) {
+        try {
+          if (!metaInfo.isTimerShutDown() && shouldRun)
+            metaInfo.flush();
+        } catch (IOException e) {
+          ioe = e;
+        }
+      }
+    }
+
+    public IOException getException() {
+      return ioe;
+    }
+
+    public void stop() {
+      shouldRun = false;
+      this.cancel();
+    }
+  }
+
+  protected class MetaInfo {
+    private Path historyFile;
+    private Path confFile;
+    private EventWriter writer;
+    JobIndexInfo jobIndexInfo;
+    JobSummary jobSummary;
+    Timer flushTimer; 
+    FlushTimerTask flushTimerTask;
+    private boolean isTimerShutDown = false;
+
+    MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
+        String jobName, JobId jobId) {
+      this.historyFile = historyFile;
+      this.confFile = conf;
+      this.writer = writer;
+      this.jobIndexInfo =
+          new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
+      this.jobSummary = new JobSummary();
+      this.flushTimer = new Timer("FlushTimer", true);
+    }
+
+    Path getHistoryFile() {
+      return historyFile;
+    }
+
+    Path getConfFile() {
+      return confFile;
+    }
+
+    JobIndexInfo getJobIndexInfo() {
+      return jobIndexInfo;
+    }
+
+    JobSummary getJobSummary() {
+      return jobSummary;
+    }
+
+    boolean isWriterActive() {
+      return writer != null;
+    }
+    
+    boolean isTimerShutDown() {
+      return isTimerShutDown;
+    }
+
+    void closeWriter() throws IOException {
+      synchronized (lock) {
+        if (writer != null) {
+          writer.close();
+        }
+        writer = null;
+      }
+    }
+
+    void writeEvent(HistoryEvent event) throws IOException {
+      synchronized (lock) {
+        if (writer != null) {
+          writer.write(event);
+          processEventForFlush(event);
+          maybeFlush(event);
+        }
+      }
+    }
+
+    void processEventForFlush(HistoryEvent historyEvent) throws IOException {
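+      // Count task/job completion events and arm a one-shot timer so the
+      // batch is flushed within flushTimeout even if it never fills up.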
+      if (EnumSet.of(EventType.MAP_ATTEMPT_FINISHED,
+          EventType.MAP_ATTEMPT_FAILED, EventType.MAP_ATTEMPT_KILLED,
+          EventType.REDUCE_ATTEMPT_FINISHED, EventType.REDUCE_ATTEMPT_FAILED,
+          EventType.REDUCE_ATTEMPT_KILLED, EventType.TASK_FINISHED,
+          EventType.TASK_FAILED, EventType.JOB_FINISHED, EventType.JOB_FAILED,
+          EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
+        numUnflushedCompletionEvents++;
+        if (!isTimerActive) {
+          resetFlushTimer();
+          if (!isTimerShutDown) {
+            flushTimerTask = new FlushTimerTask(this);
+            flushTimer.schedule(flushTimerTask, flushTimeout);
+          }
+        }
+      }
+    }
+
+    void resetFlushTimer() throws IOException {
+      if (flushTimerTask != null) {
+        IOException exception = flushTimerTask.getException();
+        flushTimerTask.stop();
+        if (exception != null) {
+          throw exception;
+        }
+        flushTimerTask = null;
+      }
+      isTimerActive = false;
+    }
+
+    void maybeFlush(HistoryEvent historyEvent) throws IOException {
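+      // Flush if the handler is keeping up (small queue with unflushed
+      // completion events pending), if too many completion events have
+      // accumulated, or if the job itself has completed.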
+      if ((eventQueue.size() < minQueueSizeForBatchingFlushes 
+          && numUnflushedCompletionEvents > 0)
+          || numUnflushedCompletionEvents >= maxUnflushedCompletionEvents 
+          || isJobCompletionEvent(historyEvent)) {
+        this.flush();
+      }
+    }
+
+    void flush() throws IOException {
+      synchronized (lock) {
+        if (numUnflushedCompletionEvents != 0) { // skipped timer cancel.
+          writer.flush();
+          numUnflushedCompletionEvents = 0;
+          resetFlushTimer();
+        }
+      }
+    }
+
+    void shutDownTimer() throws IOException {
+      synchronized (lock) {
+        isTimerShutDown = true;
+        flushTimer.cancel();
+        if (flushTimerTask != null && flushTimerTask.getException() != null) {
+          throw flushTimerTask.getException();
+        }
+      }
+    }
+  }
+
+  private void moveTmpToDone(Path tmpPath) throws IOException {
+    if (tmpPath != null) {
+      String tmpFileName = tmpPath.getName();
+      String fileName = getFileNameFromTmpFN(tmpFileName);
+      Path path = new Path(tmpPath.getParent(), fileName);
+      doneDirFS.rename(tmpPath, path);
+      LOG.info("Moved tmp to done: " + tmpPath + " to " + path);
+    }
+  }
+  
+  // TODO If the FS objects are the same, this should be a rename instead of a
+  // copy.
+  private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
+    // check if path exists, in case of retries it may not exist
+    if (stagingDirFS.exists(fromPath)) {
+      LOG.info("Moving " + fromPath.toString() + " to " + toPath.toString());
+      // TODO temporarily removing the existing dst
+      if (doneDirFS.exists(toPath)) {
+        doneDirFS.delete(toPath, true);
+      }
+      boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath,
+          false, getConfig());
+
+      if (copied) {
+        LOG.info("Copied to done location: " + toPath);
+      } else {
+        LOG.info("Copy failed");
+      }
+      doneDirFS.setPermission(toPath, new FsPermission(
+          JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
+
+      stagingDirFS.delete(fromPath, false);
+    }
+  }
+
+  boolean pathExists(FileSystem fileSys, Path path) throws IOException {
+    return fileSys.exists(path);
+  }
+
+  private String getTempFileName(String srcFile) {
+    return srcFile + "_tmp";
+  }
+  
+  private String getFileNameFromTmpFN(String tmpFileName) {
+    //TODO. Some error checking here.
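+    // Strips the "_tmp" suffix (4 characters) appended by getTempFileName.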
+    return tmpFileName.substring(0, tmpFileName.length()-4);
+  }
+
+  public void setSignalled(boolean isSignalled) {
+    this.isSignalled = isSignalled;
+    LOG.info("JobHistoryEventHandler notified that isSignalled was "
+      + isSignalled);
+  }
+}
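
A minimal wiring sketch for reviewers, assuming an AppContext instance and the
application attempt number are available; the helper method below is
hypothetical, not part of this patch:

    // Hypothetical sketch: how an AM might bring the handler up.
    static JobHistoryEventHandler2 startHistoryHandler(
        AppContext context, int startCount, Configuration conf) {
      JobHistoryEventHandler2 handler =
          new JobHistoryEventHandler2(context, startCount);
      handler.init(conf);  // resolves and creates the staging/done dirs
      handler.start();     // spawns the event-handling thread
      return handler;
    }

Events would then be routed through handle(new JobHistoryEvent(jobId,
historyEvent)) and written out asynchronously by the handler thread.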

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.util.StringUtils;
+
+public class JobSummary {
+  private JobId jobId;
+  private long jobSubmitTime;
+  private long jobLaunchTime;
+  private long firstMapTaskLaunchTime; // MapAttemptStarted |
+                                       // TaskAttemptStartEvent
+  private long firstReduceTaskLaunchTime; // ReduceAttemptStarted |
+                                          // TaskAttemptStartEvent
+  private long jobFinishTime;
+  private int numFinishedMaps;
+  private int numFailedMaps;
+  private int numFinishedReduces;
+  private int numFailedReduces;
+  private int resourcesPerMap; // resources used per map / min resource
+  private int resourcesPerReduce; // resources used per reduce / min resource
+  // private int numSlotsPerReduce; | Doesn't make sense with potentially
+  // different resource models
+  private String user;
+  private String queue;
+  private String jobStatus;
+  private long mapSlotSeconds; // TODO Not generated yet in MRV2
+  private long reduceSlotSeconds; // TODO Not generated yet MRV2
+  // private int clusterSlotCapacity;
+  private String jobName;
+
+  JobSummary() {
+  }
+
+  public JobId getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(JobId jobId) {
+    this.jobId = jobId;
+  }
+
+  public long getJobSubmitTime() {
+    return jobSubmitTime;
+  }
+
+  public void setJobSubmitTime(long jobSubmitTime) {
+    this.jobSubmitTime = jobSubmitTime;
+  }
+
+  public long getJobLaunchTime() {
+    return jobLaunchTime;
+  }
+
+  public void setJobLaunchTime(long jobLaunchTime) {
+    this.jobLaunchTime = jobLaunchTime;
+  }
+
+  public long getFirstMapTaskLaunchTime() {
+    return firstMapTaskLaunchTime;
+  }
+
+  public void setFirstMapTaskLaunchTime(long firstMapTaskLaunchTime) {
+    this.firstMapTaskLaunchTime = firstMapTaskLaunchTime;
+  }
+
+  public long getFirstReduceTaskLaunchTime() {
+    return firstReduceTaskLaunchTime;
+  }
+
+  public void setFirstReduceTaskLaunchTime(long firstReduceTaskLaunchTime) {
+    this.firstReduceTaskLaunchTime = firstReduceTaskLaunchTime;
+  }
+
+  public long getJobFinishTime() {
+    return jobFinishTime;
+  }
+
+  public void setJobFinishTime(long jobFinishTime) {
+    this.jobFinishTime = jobFinishTime;
+  }
+
+  public int getNumFinishedMaps() {
+    return numFinishedMaps;
+  }
+
+  public void setNumFinishedMaps(int numFinishedMaps) {
+    this.numFinishedMaps = numFinishedMaps;
+  }
+
+  public int getNumFailedMaps() {
+    return numFailedMaps;
+  }
+
+  public void setNumFailedMaps(int numFailedMaps) {
+    this.numFailedMaps = numFailedMaps;
+  }
+
+  public int getResourcesPerMap() {
+    return resourcesPerMap;
+  }
+  
+  public void setResourcesPerMap(int resourcesPerMap) {
+    this.resourcesPerMap = resourcesPerMap;
+  }
+  
+  public int getNumFinishedReduces() {
+    return numFinishedReduces;
+  }
+
+  public void setNumFinishedReduces(int numFinishedReduces) {
+    this.numFinishedReduces = numFinishedReduces;
+  }
+
+  public int getNumFailedReduces() {
+    return numFailedReduces;
+  }
+
+  public void setNumFailedReduces(int numFailedReduces) {
+    this.numFailedReduces = numFailedReduces;
+  }
+
+  public int getResourcesPerReduce() {
+    return this.resourcesPerReduce;
+  }
+  
+  public void setResourcesPerReduce(int resourcesPerReduce) {
+    this.resourcesPerReduce = resourcesPerReduce;
+  }
+  
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public void setQueue(String queue) {
+    this.queue = queue;
+  }
+
+  public String getJobStatus() {
+    return jobStatus;
+  }
+
+  public void setJobStatus(String jobStatus) {
+    this.jobStatus = jobStatus;
+  }
+
+  public long getMapSlotSeconds() {
+    return mapSlotSeconds;
+  }
+
+  public void setMapSlotSeconds(long mapSlotSeconds) {
+    this.mapSlotSeconds = mapSlotSeconds;
+  }
+
+  public long getReduceSlotSeconds() {
+    return reduceSlotSeconds;
+  }
+
+  public void setReduceSlotSeconds(long reduceSlotSeconds) {
+    this.reduceSlotSeconds = reduceSlotSeconds;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  public String getJobSummaryString() {
+    SummaryBuilder summary = new SummaryBuilder()
+      .add("jobId", jobId)
+      .add("submitTime", jobSubmitTime)
+      .add("launchTime", jobLaunchTime)
+      .add("firstMapTaskLaunchTime", firstMapTaskLaunchTime)
+      .add("firstReduceTaskLaunchTime", firstReduceTaskLaunchTime)
+      .add("finishTime", jobFinishTime)
+      .add("resourcesPerMap", resourcesPerMap)
+      .add("resourcesPerReduce", resourcesPerReduce)
+      .add("numMaps", numFinishedMaps + numFailedMaps)
+      .add("numReduces", numFinishedReduces + numFailedReduces)
+      .add("user", user)
+      .add("queue", queue)
+      .add("status", jobStatus)
+      .add("mapSlotSeconds", mapSlotSeconds)
+      .add("reduceSlotSeconds", reduceSlotSeconds)
+      .add("jobName", jobName);
+    return summary.toString();
+  }
+
+  static final char EQUALS = '=';
+  static final char[] charsToEscape = { StringUtils.COMMA, EQUALS,
+      StringUtils.ESCAPE_CHAR };
+  
+  static class SummaryBuilder {
+    final StringBuilder buffer = new StringBuilder();
+
+    // A little optimization for a very common case
+    SummaryBuilder add(String key, long value) {
+      return _add(key, Long.toString(value));
+    }
+
+    <T> SummaryBuilder add(String key, T value) {
+      return _add(key, StringUtils.escapeString(String.valueOf(value),
+          StringUtils.ESCAPE_CHAR, charsToEscape));
+    }
+
+    SummaryBuilder add(SummaryBuilder summary) {
+      if (buffer.length() > 0)
+        buffer.append(StringUtils.COMMA);
+      buffer.append(summary.buffer);
+      return this;
+    }
+
+    SummaryBuilder _add(String key, String value) {
+      if (buffer.length() > 0)
+        buffer.append(StringUtils.COMMA);
+      buffer.append(key).append(EQUALS).append(value);
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return buffer.toString();
+    }
+  }
+}
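
A brief usage sketch; the constructor is package-private, so this assumes
same-package access, and the values are invented for illustration:

    // Hypothetical sketch with invented values.
    JobSummary summary = new JobSummary();
    summary.setUser("alice");
    summary.setQueue("default");
    summary.setJobName("word count");
    // getJobSummaryString() renders comma-separated key=value pairs,
    // escaping commas and '=' inside values via StringUtils.escapeString,
    // e.g. ...,user=alice,queue=default,...,jobName=word count
    String line = summary.getJobSummaryString();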

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/AppContext.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/AppContext.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/AppContext.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,71 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNode;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+
+/**
+ * Context interface for sharing information across components in YARN App.
+ */
+@InterfaceAudience.Private
+public interface AppContext {
+
+  ApplicationId getApplicationID();
+
+  ApplicationAttemptId getApplicationAttemptId();
+
+  String getApplicationName();
+
+  long getStartTime();
+
+  CharSequence getUser();
+
+  Job getJob(JobId jobID);
+
+  Map<JobId, Job> getAllJobs();
+
+  @SuppressWarnings("rawtypes")
+  EventHandler getEventHandler();
+
+  Clock getClock();
+  
+  ClusterInfo getClusterInfo();
+  
+  AMContainer getContainer(ContainerId containerId);
+  AMContainerMap getAllContainers();
+  
+  AMNode getNode(NodeId nodeId);
+  AMNodeMap getAllNodes();
+}
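
A small sketch of the intended pattern: components receive the shared
AppContext instead of holding direct references to each other. The method
below is hypothetical, not part of this patch:

    // Hypothetical consumer sketch.
    void logJobCount(AppContext context) {
      int jobs = context.getAllJobs().size();
      System.out.println(context.getApplicationName() + " is tracking "
          + jobs + " job(s) for user " + context.getUser());
    }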

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/ControlledClock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/ControlledClock.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/ControlledClock.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/ControlledClock.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import org.apache.hadoop.yarn.Clock;
+
+public class ControlledClock implements Clock {
+  private long time = -1;
+  private final Clock actualClock;
+  public ControlledClock(Clock actualClock) {
+    this.actualClock = actualClock;
+  }
+  public synchronized void setTime(long time) {
+    this.time = time;
+  }
+  public synchronized void reset() {
+    time = -1;
+  }
+
+  @Override
+  public synchronized long getTime() {
+    if (time != -1) {
+      return time;
+    }
+    return actualClock.getTime();
+  }
+
+}
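
A short test-style sketch, assuming org.apache.hadoop.yarn.SystemClock as the
wrapped clock:

    // Hypothetical sketch: freeze time, then fall back to the real clock.
    ControlledClock clock = new ControlledClock(new SystemClock());
    clock.setTime(100L);
    assert clock.getTime() == 100L; // returns the frozen value
    clock.reset();                  // getTime() now delegates to SystemClock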

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/JobEndNotifier.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/JobEndNotifier.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/JobEndNotifier.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,187 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.Proxy;
+import java.net.URL;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.mortbay.log.Log;
+
+/**
+ * <p>This class handles job end notification. Submitters of jobs can choose to
+ * be notified of the end of a job by supplying a URL to which a connection
+ * will be established.
+ * <ul><li> The URL connection is fire and forget by default.</li> <li>
+ * User can specify number of retry attempts and a time interval at which to
+ * attempt retries</li><li>
+ * Cluster administrators can set final parameters to set maximum number of
+ * tries (0 would disable job end notification) and max time interval and a
+ * proxy if needed</li><li>
+ * The URL may contain sentinels which will be replaced by jobId and jobStatus 
+ * (eg. SUCCEEDED/KILLED/FAILED) </li> </ul>
+ * </p>
+ */
+public class JobEndNotifier implements Configurable {
+  private static final String JOB_ID = "$jobId";
+  private static final String JOB_STATUS = "$jobStatus";
+
+  private Configuration conf;
+  protected String userUrl;
+  protected String proxyConf;
+  protected int numTries; //Number of tries to attempt notification
+  protected int waitInterval; //Time to wait between retrying notification
+  protected URL urlToNotify; //URL to notify read from the config
+  protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
+
+  /**
+   * Parse the URL that needs to be notified of the end of the job, along
+   * with the number of retries in case of failure, the amount of time to
+   * wait between retries and proxy settings
+   * @param conf the configuration 
+   */
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    
+    numTries = Math.min(
+        conf.getInt(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1,
+        conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, 1));
+    waitInterval = Math.min(
+        conf.getInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 5),
+        conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, 5));
+    waitInterval = (waitInterval < 0) ? 5 : waitInterval;
+
+    userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
+
+    proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
+
+    // Configure the proxy to use if it's set. It should be set like
+    // proxyType@proxyHostname:port
+    if(proxyConf != null && !proxyConf.equals("") &&
+         proxyConf.lastIndexOf(":") != -1) {
+      int typeIndex = proxyConf.indexOf("@");
+      Proxy.Type proxyType = Proxy.Type.HTTP;
+      if(typeIndex != -1 &&
+        proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) {
+        proxyType = Proxy.Type.SOCKS;
+      }
+      String hostname = proxyConf.substring(typeIndex + 1,
+        proxyConf.lastIndexOf(":"));
+      String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1);
+      try {
+        int port = Integer.parseInt(portConf);
+        proxyToUse = new Proxy(proxyType,
+          new InetSocketAddress(hostname, port));
+        Log.info("Job end notification using proxy type \"" + proxyType + 
+        "\" hostname \"" + hostname + "\" and port \"" + port + "\"");
+      } catch(NumberFormatException nfe) {
+        Log.warn("Job end notification couldn't parse configured proxy's port "
+          + portConf + ". Not going to use a proxy");
+      }
+    }
+
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  /**
+   * Notify the URL just once. Use best effort. Timeout hard coded to 5
+   * seconds.
+   */
+  protected boolean notifyURLOnce() {
+    boolean success = false;
+    try {
+      Log.info("Job end notification trying " + urlToNotify);
+      HttpURLConnection conn =
+        (HttpURLConnection) urlToNotify.openConnection(proxyToUse);
+      conn.setConnectTimeout(5*1000);
+      conn.setReadTimeout(5*1000);
+      conn.setAllowUserInteraction(false);
+      if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+        Log.warn("Job end notification to " + urlToNotify +" failed with code: "
+        + conn.getResponseCode() + " and message \"" + conn.getResponseMessage()
+        +"\"");
+      }
+      else {
+        success = true;
+        Log.info("Job end notification to " + urlToNotify + " succeeded");
+      }
+    } catch(IOException ioe) {
+      Log.warn("Job end notification to " + urlToNotify + " failed", ioe);
+    }
+    return success;
+  }
+
+  /**
+   * Notify a server of the completion of a submitted job. The user must have
+   * configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL
+   * @param jobReport JobReport used to read JobId and JobStatus
+   * @throws InterruptedException
+   */
+  public void notify(JobReport jobReport)
+    throws InterruptedException {
+    // Do we need job-end notification?
+    if (userUrl == null) {
+      Log.info("Job end notification URL not set, skipping.");
+      return;
+    }
+
+    //Do string replacements for jobId and jobStatus
+    if (userUrl.contains(JOB_ID)) {
+      userUrl = userUrl.replace(JOB_ID, jobReport.getJobId().toString());
+    }
+    if (userUrl.contains(JOB_STATUS)) {
+      userUrl = userUrl.replace(JOB_STATUS, jobReport.getJobState().toString());
+    }
+
+    // Create the URL, ensure sanity
+    try {
+      urlToNotify = new URL(userUrl);
+    } catch (MalformedURLException mue) {
+      Log.warn("Job end notification couldn't parse " + userUrl, mue);
+      return;
+    }
+
+    // Send notification
+    boolean success = false;
+    while (numTries-- > 0 && !success) {
+      Log.info("Job end notification attempts left " + numTries);
+      success = notifyURLOnce();
+      if (!success) {
+        Thread.sleep(waitInterval);
+      }
+    }
+    if (!success) {
+      Log.warn("Job end notification failed to notify : " + urlToNotify);
+    } else {
+      Log.info("Job end notification succeeded for " + jobReport.getJobId());
+    }
+  }
+}
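
A configuration sketch using the MRJobConfig keys read in setConf above; the
URL is an invented example:

    // Hypothetical sketch; $jobId and $jobStatus are replaced by notify().
    Configuration conf = new Configuration();
    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL,
        "http://example.com/notify?job=$jobId&status=$jobStatus");
    conf.setInt(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, 3);
    JobEndNotifier notifier = new JobEndNotifier();
    notifier.setConf(conf);
    // notifier.notify(jobReport) would then attempt notification up to
    // numTries times, sleeping waitInterval between failed attempts.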


