tez-commits mailing list archives

From acmur...@apache.org
Subject svn commit: r1457129 [18/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Date Fri, 15 Mar 2013 21:26:48 GMT
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+// Workaround for PeriodicStatsAccumulator being package-private
+public class WrappedPeriodicStatsAccumulator {
+
+  private PeriodicStatsAccumulator real;
+
+  public WrappedPeriodicStatsAccumulator(PeriodicStatsAccumulator real) {
+    this.real = real;
+  }
+  
+  public void extend(double newProgress, int newValue) {
+    real.extend(newProgress, newValue);
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+// Workaround for ProgressSplitsBlock being package-private
+public class WrappedProgressSplitsBlock extends ProgressSplitsBlock {
+  private WrappedPeriodicStatsAccumulator wrappedProgressWallclockTime;
+  private WrappedPeriodicStatsAccumulator wrappedProgressCPUTime;
+  private WrappedPeriodicStatsAccumulator wrappedProgressVirtualMemoryKbytes;
+  private WrappedPeriodicStatsAccumulator wrappedProgressPhysicalMemoryKbytes;
+
+  public WrappedProgressSplitsBlock(int numberSplits) {
+    super(numberSplits);
+  }
+
+  public int[][] burst() {
+    return super.burst();
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressWallclockTime() {
+    if (wrappedProgressWallclockTime == null) {
+      wrappedProgressWallclockTime = new WrappedPeriodicStatsAccumulator(
+          progressWallclockTime);
+    }
+    return wrappedProgressWallclockTime;
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressCPUTime() {
+    if (wrappedProgressCPUTime == null) {
+      wrappedProgressCPUTime = new WrappedPeriodicStatsAccumulator(
+          progressCPUTime);
+    }
+    return wrappedProgressCPUTime;
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressVirtualMemoryKbytes() {
+    if (wrappedProgressVirtualMemoryKbytes == null) {
+      wrappedProgressVirtualMemoryKbytes = new WrappedPeriodicStatsAccumulator(
+          progressVirtualMemoryKbytes);
+    }
+    return wrappedProgressVirtualMemoryKbytes;
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressPhysicalMemoryKbytes() {
+    if (wrappedProgressPhysicalMemoryKbytes == null) {
+      wrappedProgressPhysicalMemoryKbytes = new WrappedPeriodicStatsAccumulator(
+          progressPhysicalMemoryKbytes);
+    }
+    return wrappedProgressPhysicalMemoryKbytes;
+  }
+}
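
Both wrapper classes above exist only because PeriodicStatsAccumulator and
ProgressSplitsBlock are package-private in org.apache.hadoop.mapred; the
wrappers hold the real objects and re-expose extend()/burst() so that AM code
outside that package can record progress splits. A minimal usage sketch, with
illustrative progress and counter values that are not part of this commit:

    // Hedged sketch: the bucket count and the progress/CPU figures are examples only.
    WrappedProgressSplitsBlock splitsBlock = new WrappedProgressSplitsBlock(12); // 12 progress buckets
    double progress = 0.25;   // fraction of the task attempt that is complete
    splitsBlock.getProgressCPUTime().extend(progress, 4200);        // CPU millis observed so far
    splitsBlock.getProgressWallclockTime().extend(progress, 60000); // wallclock millis so far
    int[][] splits = splitsBlock.burst();  // snapshot of all accumulated splits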

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/YarnChild2.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/YarnChild2.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/YarnChild2.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/YarnChild2.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,399 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.log4j.LogManager;
+
+/**
+ * The main() for MapReduce task processes.
+ */
+class YarnChild2 {
+
+  private static final Log LOG = LogFactory.getLog(YarnChild2.class);
+
+  static volatile TaskAttemptID taskid = null;
+
+  /*
+   * Expected arguments: args[0] - host, args[1] - port, args[2] - JobId,
+   * args[3] - TaskType, args[4] - JVM integer id; the rest are log redirects etc.
+   */
+  public static void main(String[] args) throws Throwable {
+    LOG.info("Child starting");
+
+    final JobConf defaultConf = new JobConf();
+    defaultConf.addResource(MRJobConfig.JOB_CONF_FILE);
+    UserGroupInformation.setConfiguration(defaultConf);
+
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    final InetSocketAddress address =
+        NetUtils.createSocketAddrForHost(host, port);
+
+    final JobID jobID = JobID.forName(args[2]);
+    final TaskType taskType = TaskType.valueOf(args[3]);
+    
+    final int jvmIdInt = Integer.parseInt(args[4]);
+    JVMId jvmId = new JVMId(jobID, taskType == TaskType.MAP, jvmIdInt);
+
+    // initialize metrics
+    DefaultMetricsSystem.initialize(
+        StringUtils.camelize(taskType.name()) +"Task");
+
+    Token<JobTokenIdentifier> jt = loadCredentials(defaultConf, address);
+
+    // Create TaskUmbilicalProtocol as actual task owner.
+    UserGroupInformation taskOwner =
+      UserGroupInformation.createRemoteUser(jobID.toString());
+    taskOwner.addToken(jt);
+    final TaskUmbilicalProtocol umbilical =
+      taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
+      @Override
+      public TaskUmbilicalProtocol run() throws Exception {
+        return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+            TaskUmbilicalProtocol.versionID, address, defaultConf);
+      }
+    });
+
+    // report non-pid to application master
+    JvmContext context = new JvmContext(jvmId, "-1000");
+    LOG.debug("PID: " + System.getenv().get("JVM_PID"));
+    Task task = null;
+    UserGroupInformation childUGI = null;
+
+    try {
+      while (true) {
+        LOG.info("Polling for next task");
+      int idleLoopCount = 0;
+      JvmTask myTask = null;
+      // poll for new task
+      for (int idle = 0; null == myTask; ++idle) {
+//        long sleepTimeMilliSecs = Math.min(idle * 500, 1500);
+        // XXX: Figure out sleep time.
+        long sleepTimeMilliSecs = 20;
+        LOG.info("Sleeping for " + sleepTimeMilliSecs
+            + "ms before retrying again. Got null now.");
+        MILLISECONDS.sleep(sleepTimeMilliSecs);
+        myTask = umbilical.getTask(context);
+      }
+      if (myTask.shouldDie()) {
+        return;
+      }
+
+      task = myTask.getTask();
+      YarnChild2.taskid = task.getTaskID();
+
+      // Create the job-conf and set credentials
+      final JobConf job =
+        configureTask(task, defaultConf.getCredentials(), jt);
+
+      // Initiate Java VM metrics
+      JvmMetrics.initSingleton(jvmId.toString(), job.getSessionId());
+      childUGI = UserGroupInformation.createRemoteUser(System
+          .getenv(ApplicationConstants.Environment.USER.toString()));
+      // Add tokens to new user so that it may execute its task correctly.
+      for(Token<?> token : UserGroupInformation.getCurrentUser().getTokens()) {
+        childUGI.addToken(token);
+      }
+
+      // Create a final reference to the task for the doAs block
+      final Task taskFinal = task;
+      childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          // use job-specified working directory
+          FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
+          taskFinal.run(job, umbilical); // run the task
+          return null;
+        }
+      });
+      LOG.info("DEBUG: _______Done executing one task_________");
+      }
+    } catch (FSError e) {
+      LOG.fatal("FSError from child", e);
+      umbilical.fsError(taskid, e.getMessage());
+    } catch (Exception exception) {
+      LOG.warn("Exception running child : "
+          + StringUtils.stringifyException(exception));
+      try {
+        if (task != null) {
+          // do cleanup for the task
+          if (childUGI == null) { // no need to go into doAs block
+            task.taskCleanup(umbilical);
+          } else {
+            final Task taskFinal = task;
+            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                taskFinal.taskCleanup(umbilical);
+                return null;
+              }
+            });
+          }
+        }
+      } catch (Exception e) {
+        LOG.info("Exception cleaning up: " + StringUtils.stringifyException(e));
+      }
+      // Report back any failures, for diagnostic purposes
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      exception.printStackTrace(new PrintStream(baos));
+      if (taskid != null) {
+        umbilical.fatalError(taskid, baos.toString());
+      }
+    } catch (Throwable throwable) {
+      LOG.fatal("Error running child : "
+    	        + StringUtils.stringifyException(throwable));
+      if (taskid != null) {
+        Throwable tCause = throwable.getCause();
+        String cause = tCause == null
+                                 ? throwable.getMessage()
+                                 : StringUtils.stringifyException(tCause);
+        umbilical.fatalError(taskid, cause);
+      }
+    } finally {
+      RPC.stopProxy(umbilical);
+      DefaultMetricsSystem.shutdown();
+      // Shutting down log4j of the child-vm...
+      // This assumes that on return from Task.run()
+      // there is no more logging done.
+      LogManager.shutdown();
+    }
+  }
+
+  private static Token<JobTokenIdentifier> loadCredentials(JobConf conf,
+      InetSocketAddress address) throws IOException {
+    //load token cache storage
+    String tokenFileLocation =
+        System.getenv(ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME);
+    String jobTokenFile =
+        new Path(tokenFileLocation).makeQualified(FileSystem.getLocal(conf))
+            .toUri().getPath();
+    Credentials credentials =
+      TokenCache.loadTokens(jobTokenFile, conf);
+    LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() +
+        "; from file=" + jobTokenFile);
+    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
+    SecurityUtil.setTokenService(jt, address);
+    UserGroupInformation current = UserGroupInformation.getCurrentUser();
+    current.addToken(jt);
+    for (Token<? extends TokenIdentifier> tok : credentials.getAllTokens()) {
+      current.addToken(tok);
+    }
+    // Set the credentials
+    conf.setCredentials(credentials);
+    return jt;
+  }
+
+  /**
+   * Configure mapred-local dirs. This config is used by the task for finding
+   * out an output directory.
+   * @throws IOException 
+   */
+  private static void configureLocalDirs(Task task, JobConf job) throws IOException {
+    String[] localSysDirs = StringUtils.getTrimmedStrings(
+        System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
+    job.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
+    LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR));
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
+    Path workDir = null;
+    // First, try to find the JOB_LOCAL_DIR on this host.
+    try {
+      workDir = lDirAlloc.getLocalPathToRead("work", job);
+    } catch (DiskErrorException e) {
+      // DiskErrorException means dir not found. If not found, it will
+      // be created below.
+    }
+    if (workDir == null) {
+      // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
+      workDir = lDirAlloc.getLocalPathForWrite("work", job);
+      FileSystem lfs = FileSystem.getLocal(job).getRaw();
+      boolean madeDir = false;
+      try {
+        madeDir = lfs.mkdirs(workDir);
+      } catch (FileAlreadyExistsException e) {
+        // Since all tasks will be running in their own JVM, the race condition
+        // exists where multiple tasks could be trying to create this directory
+        // at the same time. If this task loses the race, it's okay because
+        // the directory already exists.
+        madeDir = true;
+        workDir = lDirAlloc.getLocalPathToRead("work", job);
+      }
+      if (!madeDir) {
+          throw new IOException("Mkdirs failed to create "
+              + workDir.toString());
+      }
+    }
+    job.set(MRJobConfig.JOB_LOCAL_DIR,workDir.toString());
+  }
+
+  private static JobConf configureTask(Task task, Credentials credentials,
+      Token<JobTokenIdentifier> jt) throws IOException {
+    final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
+    job.setCredentials(credentials);
+    
+    String appAttemptIdEnv = System
+        .getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
+    LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
+    // Set it in the conf, so that it can be used by the OutputCommitter.
+    job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer
+        .parseInt(appAttemptIdEnv));
+
+    // set tcp nodelay
+    job.setBoolean("ipc.client.tcpnodelay", true);
+    job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
+        YarnOutputFiles.class, MapOutputFile.class);
+    // set the jobTokenFile into task
+    task.setJobTokenSecret(
+        JobTokenSecretManager.createSecretKey(jt.getPassword()));
+
+    // setup the child's MRConfig.LOCAL_DIR.
+    configureLocalDirs(task, job);
+
+    // setup the child's attempt directories
+    // Do the task-type specific localization
+    task.localizeConfiguration(job);
+
+    // Set up the DistributedCache related configs
+    setupDistributedCacheConfig(job);
+
+    // Overwrite the localized task jobconf which is linked to in the current
+    // work-dir.
+    Path localTaskFile = new Path(MRJobConfig.JOB_CONF_FILE);
+    writeLocalJobFile(localTaskFile, job);
+    task.setJobFile(localTaskFile.toString());
+    task.setConf(job);
+    return job;
+  }
+
+  /**
+   * Set up the DistributedCache related configs to make
+   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
+   * and
+   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
+   * working.
+   * @param job
+   * @throws IOException
+   */
+  private static void setupDistributedCacheConfig(final JobConf job)
+      throws IOException {
+
+    String localWorkDir = System.getenv("PWD");
+    // $PWD: all symlinks are created in the current work-dir
+
+    // Update the configuration object with localized archives.
+    URI[] cacheArchives = DistributedCache.getCacheArchives(job);
+    if (cacheArchives != null) {
+      List<String> localArchives = new ArrayList<String>();
+      for (int i = 0; i < cacheArchives.length; ++i) {
+        URI u = cacheArchives[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localArchives.isEmpty()) {
+        job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
+            .arrayToString(localArchives.toArray(new String[localArchives
+                .size()])));
+      }
+    }
+
+    // Update the configuration object with localized files.
+    URI[] cacheFiles = DistributedCache.getCacheFiles(job);
+    if (cacheFiles != null) {
+      List<String> localFiles = new ArrayList<String>();
+      for (int i = 0; i < cacheFiles.length; ++i) {
+        URI u = cacheFiles[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localFiles.isEmpty()) {
+        job.set(MRJobConfig.CACHE_LOCALFILES,
+            StringUtils.arrayToString(localFiles
+                .toArray(new String[localFiles.size()])));
+      }
+    }
+  }
+
+  private static final FsPermission urw_gr =
+    FsPermission.createImmutable((short) 0640);
+
+  /**
+   * Write the task specific job-configuration file.
+   * @throws IOException
+   */
+  private static void writeLocalJobFile(Path jobFile, JobConf conf)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(jobFile);
+    OutputStream out = null;
+    try {
+      out = FileSystem.create(localFs, jobFile, urw_gr);
+      conf.writeXml(out);
+    } finally {
+      IOUtils.cleanup(LOG, out);
+    }
+  }
+
+}
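
YarnChild2.main() documents its positional arguments in the comment above it:
host and port of the TaskUmbilicalProtocol server, the JobId, the TaskType and
an integer JVM id, with any remaining arguments used for log redirection. A
hedged sketch of how a launcher might assemble that command line; the host,
port and ids below are placeholders, not values from this commit:

    // Hedged sketch: all concrete values are illustrative.
    List<String> cmd = new ArrayList<String>();
    cmd.add("java");
    cmd.add("org.apache.hadoop.mapred.YarnChild2");
    cmd.add("am-host.example.com");        // args[0]: umbilical host
    cmd.add("45454");                      // args[1]: umbilical port
    cmd.add("job_1363382796001_0001");     // args[2]: JobId, parsed with JobID.forName()
    cmd.add(TaskType.MAP.name());          // args[3]: TaskType
    cmd.add("7");                          // args[4]: integer JVM id
    // remaining arguments: log redirects etc.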

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapred/YarnOutputFiles.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,238 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class YarnOutputFiles extends MapOutputFile {
+
+  private JobConf conf;
+
+  private static final String JOB_OUTPUT_DIR = "output";
+  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
+      + ".index";
+
+  public YarnOutputFiles() {
+  }
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private LocalDirAllocator lDirAlloc = 
+    new LocalDirAllocator(MRConfig.LOCAL_DIR);
+
+  private Path getAttemptOutputDir() {
+    return new Path(JOB_OUTPUT_DIR, conf.get(JobContext.TASK_ATTEMPT_ID));
+  }
+  
+  /**
+   * Return the path to the local map output file created earlier.
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFile() throws IOException {
+    Path attemptOutput =
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFileForWrite(long size) throws IOException {
+    Path attemptOutput = 
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(JobContext.TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFile() throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING +
+                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), MAP_OUTPUT_FILENAME_STRING +
+                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(JobContext.TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING +
+                                      MAP_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   * 
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   * 
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   * 
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException 
+   */
+  public Path getInputFile(int mapId) throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Create a local reduce input file name.
+   * 
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
+      long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        REDUCE_INPUT_FILE_FORMAT_STRING,
+        getAttemptOutputDir().toString(), mapId.getId()),
+        size, conf);
+  }
+
+  /** Removes all of the files related to a task. */
+  public void removeAll() throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  
+}
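
The spill file names in YarnOutputFiles come from SPILL_FILE_PATTERN
("%s_spill_%d.out") and its ".index" sibling, keyed by the task attempt id
stored in the job conf; LocalDirAllocator then resolves the resulting relative
names against the directories in MRConfig.LOCAL_DIR. A small sketch of the
expansion, with an illustrative attempt id:

    // Hedged sketch: the attempt id is illustrative.
    String attemptId = "attempt_1363382796001_0001_m_000003_0";
    String spill = String.format("%s_spill_%d.out", attemptId, 2);
    // -> "attempt_1363382796001_0001_m_000003_0_spill_2.out"
    String spillIndex = spill + ".index";
    // -> "attempt_1363382796001_0001_m_000003_0_spill_2.out.index"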

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ContainerHeartbeatHandler.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,61 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class ContainerHeartbeatHandler extends
+    HeartbeatHandlerBase<ContainerId> {
+
+ 
+  public ContainerHeartbeatHandler(AppContext context, 
+      int numThreads) {
+    super(context, numThreads, "ContainerHeartbeatHandler");
+  }
+
+  @Override
+  protected int getConfiguredTimeout(Configuration conf) {
+    // TODO Maybe define separate timeouts for Containers and tasks.
+    return conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000);
+  }
+
+  @Override
+  protected int getConfiguredTimeoutCheckInterval(Configuration conf) {
+    return conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
+  }
+
+  @Override
+  protected boolean hasTimedOut(ReportTime report, long currentTime) {
+    return (timeOut > 0) && (currentTime > report.getLastPing() + timeOut);
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void handleTimeOut(ContainerId containerId) {
+    eventHandler.handle(new AMContainerEvent(containerId,
+        AMContainerEventType.C_TIMED_OUT));
+  }
+
+}
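
ContainerHeartbeatHandler reuses the MapReduce task-timeout settings:
MRJobConfig.TASK_TIMEOUT (how long a container may go without a ping, 5
minutes by default here) and MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS (how
often the checker thread scans, 30 seconds by default). A hedged sketch of
wiring one up, assuming an AppContext instance named appContext is available:

    // Hedged sketch: appContext is assumed to be the running AM's AppContext.
    Configuration conf = new Configuration();
    conf.setInt(MRJobConfig.TASK_TIMEOUT, 10 * 60 * 1000);              // time out after 10 minutes of silence
    conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 15 * 1000); // scan every 15 seconds
    ContainerHeartbeatHandler handler = new ContainerHeartbeatHandler(appContext, 1);
    handler.init(conf);   // reads the two settings above
    handler.start();      // starts the "ContainerHeartbeatHandler PingChecker" thread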

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HeartbeatHandlerBase.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,171 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+public abstract class HeartbeatHandlerBase<T> extends AbstractService {
+
+
+  protected int timeOut = 5 * 60 * 1000;// 5 mins
+  protected int timeOutCheckInterval = 30 * 1000; // 30 seconds.
+  protected Thread timeOutCheckerThread;
+  private final String name;
+  
+  @SuppressWarnings("rawtypes")
+  protected final EventHandler eventHandler;
+  protected final Clock clock;
+  protected final AppContext appContext;
+  
+  private ConcurrentMap<T, ReportTime> runningMap;
+  private volatile boolean stopped;
+
+  public HeartbeatHandlerBase(AppContext appContext, int numThreads, String name) {
+    super(name);
+    this.name = name;
+    this.eventHandler = appContext.getEventHandler();
+    this.clock = appContext.getClock();
+    this.appContext = appContext;
+    numThreads = numThreads == 0 ? 1 : numThreads;
+    this.runningMap = new ConcurrentHashMap<T, HeartbeatHandlerBase.ReportTime>(
+        16, 0.75f, numThreads);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    timeOut = getConfiguredTimeout(conf);
+    timeOutCheckInterval = getConfiguredTimeoutCheckInterval(conf);
+  }
+
+  @Override
+  public void start() {
+    timeOutCheckerThread = new Thread(createPingChecker());
+    timeOutCheckerThread.setName(name + " PingChecker");
+    timeOutCheckerThread.start();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+    if (timeOutCheckerThread != null) {
+      timeOutCheckerThread.interrupt();
+    }
+    super.stop();
+  }
+  
+  protected Runnable createPingChecker() {
+    return new PingChecker();
+  }
+  protected abstract int getConfiguredTimeout(Configuration conf);
+  protected abstract int getConfiguredTimeoutCheckInterval(Configuration conf);
+  
+  public void progressing(T id) {
+    ReportTime time = runningMap.get(id);
+    if (time != null) {
+      time.setLastProgress(clock.getTime());
+    }
+  }
+  
+  public void pinged(T id) {
+    ReportTime time = runningMap.get(id);
+    if (time != null) {
+      time.setLastPing(clock.getTime());
+    }
+  }
+  
+  public void register(T id) {
+    runningMap.put(id, new ReportTime(clock.getTime()));
+  }
+  
+  public void unregister(T id) {
+    runningMap.remove(id);
+  }
+  
+  
+  
+  protected static class ReportTime {
+    private long lastPing;
+    private long lastProgress;
+    
+    public ReportTime(long time) {
+      setLastProgress(time);
+    }
+    
+    public synchronized void setLastPing(long time) {
+      lastPing = time;
+    }
+    
+    public synchronized void setLastProgress(long time) {
+      lastProgress = time;
+      lastPing = time;
+    }
+    
+    public synchronized long getLastPing() {
+      return lastPing;
+    }
+    
+    public synchronized long getLastProgress() {
+      return lastProgress;
+    }
+  }
+  
+  protected abstract boolean hasTimedOut(ReportTime report, long currentTime);
+  
+  protected abstract void handleTimeOut(T t);
+  
+  private class PingChecker implements Runnable {
+
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        Iterator<Map.Entry<T, ReportTime>> iterator =
+            runningMap.entrySet().iterator();
+
+        // Avoid recalculating the current time on every iteration of the loop.
+        long currentTime = clock.getTime();
+
+        while (iterator.hasNext()) {
+          Map.Entry<T, ReportTime> entry = iterator.next();    
+          if(hasTimedOut(entry.getValue(), currentTime)) {
+            // Timed out. Remove from the list and send out an event.
+            iterator.remove();
+            handleTimeOut(entry.getKey());
+          }
+        }
+        try {
+          Thread.sleep(timeOutCheckInterval);
+        } catch (InterruptedException e) {
+          break;
+        }
+      }
+    }
+  }
+  
+}
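
Callers drive HeartbeatHandlerBase through register()/pinged()/progressing()/
unregister(); the PingChecker thread periodically evicts any entry whose
ReportTime fails hasTimedOut() and passes its key to handleTimeOut(). A hedged
lifecycle sketch using the ContainerHeartbeatHandler from this commit, with an
illustrative ContainerId:

    // Hedged sketch: handler is a started ContainerHeartbeatHandler, containerId is illustrative.
    handler.register(containerId);     // begin tracking when the container launches
    handler.pinged(containerId);       // on every heartbeat from the container
    handler.progressing(containerId);  // when a heartbeat also reports progress
    handler.unregister(containerId);   // stop tracking once the container finishes
    // If pings stop for longer than the timeout, PingChecker removes the entry and
    // handleTimeOut() sends AMContainerEventType.C_TIMED_OUT for that container.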

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class JobHistoryEvent extends AbstractEvent<EventType>{
+
+  private final JobId jobID;
+  private final HistoryEvent historyEvent;
+
+  public JobHistoryEvent(JobId jobID, HistoryEvent historyEvent) {
+    super(historyEvent.getEventType());
+    this.jobID = jobID;
+    this.historyEvent = historyEvent;
+  }
+
+  public JobId getJobID() {
+    return jobID;
+  }
+
+  public HistoryEvent getHistoryEvent() {
+    return historyEvent;
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler2.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler2.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler2.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler2.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,920 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * The job history events get routed to this class. This class writes the job
+ * history events to the DFS directly into a staging dir, from where they are
+ * then moved to a done-dir. The JobHistory implementation is in this package
+ * so that it can access package-private classes.
+ */
+public class JobHistoryEventHandler2 extends AbstractService
+    implements EventHandler<JobHistoryEvent> {
+
+  private final AppContext context;
+  private final int startCount;
+
+  private int eventCounter;
+
+  //TODO Does the FS object need to be different ? 
+  private FileSystem stagingDirFS; // log Dir FileSystem
+  private FileSystem doneDirFS; // done Dir FileSystem
+
+
+  private Path stagingDirPath = null;
+  private Path doneDirPrefixPath = null; // folder for completed jobs
+
+  private int maxUnflushedCompletionEvents;
+  private int postJobCompletionMultiplier;
+  private long flushTimeout;
+  private int minQueueSizeForBatchingFlushes; // TODO: Rename
+
+  private int numUnflushedCompletionEvents = 0;
+  private boolean isTimerActive;
+
+
+  protected BlockingQueue<JobHistoryEvent> eventQueue =
+    new LinkedBlockingQueue<JobHistoryEvent>();
+  protected Thread eventHandlingThread;
+  private volatile boolean stopped;
+  private final Object lock = new Object();
+
+  private static final Log LOG = LogFactory.getLog(
+      JobHistoryEventHandler2.class);
+
+  protected static final Map<JobId, MetaInfo> fileMap =
+    Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
+
+  // Has a signal (SIGTERM etc) been issued?
+  protected volatile boolean isSignalled = false;
+
+  public JobHistoryEventHandler2(AppContext context, int startCount) {
+    super("JobHistoryEventHandler");
+    this.context = context;
+    this.startCount = startCount;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.yarn.service.AbstractService#init(org.
+   * apache.hadoop.conf.Configuration)
+   * Initializes the FileSystem and Path objects for the log and done directories.
+   * Creates these directories if they do not already exist.
+   */
+  @Override
+  public void init(Configuration conf) {
+
+    String jobId = TypeConverter.fromYarn(context.getApplicationID()).toString();
+
+    String stagingDirStr = null;
+    String doneDirStr = null;
+    String userDoneDirStr = null;
+    try {
+      stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf,
+          jobId);
+      doneDirStr =
+          JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
+      userDoneDirStr =
+          JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
+    } catch (IOException e) {
+      LOG.error("Failed while getting the configured log directories", e);
+      throw new YarnException(e);
+    }
+
+    //Check for the existence of the history staging dir. Maybe create it. 
+    try {
+      stagingDirPath =
+          FileSystem.get(conf).makeQualified(new Path(stagingDirStr));
+      stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf);
+      mkdir(stagingDirFS, stagingDirPath, new FsPermission(
+          JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
+    } catch (IOException e) {
+      LOG.error("Failed while checking for/creating  history staging path: ["
+          + stagingDirPath + "]", e);
+      throw new YarnException(e);
+    }
+
+    //Check for the existence of intermediate done dir.
+    Path doneDirPath = null;
+    try {
+      doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirStr));
+      doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
+      // This directory will be in a common location, or this may be a cluster
+      // meant for a single user. It is created here based on the conf, but
+      // should ideally be created by the JobHistoryServer or as part of
+      // deployment.
+      if (!doneDirFS.exists(doneDirPath)) {
+        if (JobHistoryUtils.shouldCreateNonUserDirectory(conf)) {
+          LOG.info("Creating intermediate history logDir: ["
+              + doneDirPath
+              + "] based on conf. Should ideally be created by the JobHistoryServer: "
+              + MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR);
+          mkdir(
+              doneDirFS,
+              doneDirPath,
+              new FsPermission(
+                  JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS
+                      .toShort()));
+          // TODO Temporary toShort till new FsPermission(FsPermissions)
+          // respects sticky
+        } else {
+          String message = "Not creating intermediate history logDir: ["
+              + doneDirPath
+              + "] based on conf: "
+              + MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR
+              + ". Either set to true or pre-create this directory with"
+              + " appropriate permissions";
+          LOG.error(message);
+          throw new YarnException(message);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Failed checking for the existance of history intermediate " +
+      		"done directory: [" + doneDirPath + "]");
+      throw new YarnException(e);
+    }
+
+    //Check/create user directory under intermediate done dir.
+    try {
+      doneDirPrefixPath =
+          FileSystem.get(conf).makeQualified(new Path(userDoneDirStr));
+      mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(
+          JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
+    } catch (IOException e) {
+      LOG.error("Error creating user intermediate history done directory: [ "
+          + doneDirPrefixPath + "]", e);
+      throw new YarnException(e);
+    }
+
+    // Maximum number of unflushed completion-events that can stay in the queue
+    // before flush kicks in.
+    maxUnflushedCompletionEvents =
+        conf.getInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS);
+    // We want to cut down on flushes after the job completes so that writes go
+    // quicker, so we increase maxUnflushedEvents post job completion using the
+    // following multiplier.
+    postJobCompletionMultiplier =
+        conf.getInt(
+            MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER);
+    // Max time until which flush doesn't take place.
+    flushTimeout =
+        conf.getLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS);
+    minQueueSizeForBatchingFlushes =
+        conf.getInt(
+            MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
+            MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
+    
+    super.init(conf);
+  }
+
+  private void mkdir(FileSystem fs, Path path, FsPermission fsp)
+      throws IOException {
+    if (!fs.exists(path)) {
+      try {
+        fs.mkdirs(path, fsp);
+        FileStatus fsStatus = fs.getFileStatus(path);
+        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+            + ", Expected: " + fsp.toShort());
+        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
+          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+              + ", " + fsp);
+          fs.setPermission(path, fsp);
+        }
+      } catch (FileAlreadyExistsException e) {
+        LOG.info("Directory: [" + path + "] already exists.");
+      }
+    }
+  }
+
+  @Override
+  public void start() {
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        JobHistoryEvent event = null;
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
+
+          // Log the size of the history-event-queue every so often.
+          if (eventCounter != 0 && eventCounter % 1000 == 0) {
+            eventCounter = 0;
+            LOG.info("Size of the JobHistory event queue is "
+                + eventQueue.size());
+          } else {
+            eventCounter++;
+          }
+
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.info("EventQueue take interrupted. Returning");
+            return;
+          }
+          // If an event has been removed from the queue, handle it.
+          // The rest of the queue is handled via stop().
+          // Clear the interrupt status before calling handleEvent and restore
+          // it afterwards if it was set. Interrupts received from other
+          // threads during handleEvent cannot be dealt with -
+          // Shell.runCommand() ignores them.
+          synchronized (lock) {
+            boolean isInterrupted = Thread.interrupted();
+            handleEvent(event);
+            if (isInterrupted) {
+              Thread.currentThread().interrupt();
+            }
+          }
+        }
+      }
+    });
+    eventHandlingThread.start();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("Stopping JobHistoryEventHandler. "
+        + "Size of the outstanding queue size is " + eventQueue.size());
+    stopped = true;
+    //do not interrupt while event handling is in progress
+    synchronized(lock) {
+      if (eventHandlingThread != null)
+        eventHandlingThread.interrupt();
+    }
+
+    try {
+      if (eventHandlingThread != null)
+        eventHandlingThread.join();
+    } catch (InterruptedException ie) {
+      LOG.info("Interruped Exception while stopping", ie);
+    }
+
+    // Cancel all timers - so that they aren't invoked during or after
+    // the metaInfo object is wrapped up.
+    for (MetaInfo mi : fileMap.values()) {
+      try {
+        mi.shutDownTimer();
+      } catch (IOException e) {
+        LOG.info("Exception while cancelling delayed flush timer. "
+            + "Likely caused by a failed flush " + e.getMessage());
+      }
+    }
+
+    //write all the events remaining in queue
+    Iterator<JobHistoryEvent> it = eventQueue.iterator();
+    while(it.hasNext()) {
+      JobHistoryEvent ev = it.next();
+      LOG.info("In stop, writing event " + ev.getType());
+      handleEvent(ev);
+    }
+
+    // Process JobUnsuccessfulCompletionEvent for jobIds which still haven't
+    // closed their event writers
+    Iterator<JobId> jobIt = fileMap.keySet().iterator();
+    if(isSignalled) {
+      while (jobIt.hasNext()) {
+        JobId toClose = jobIt.next();
+        MetaInfo mi = fileMap.get(toClose);
+        if(mi != null && mi.isWriterActive()) {
+          LOG.warn("Found jobId " + toClose
+            + " to have not been closed. Will close");
+          //Create a JobFinishEvent so that it is written to the job history
+          JobUnsuccessfulCompletionEvent jucEvent =
+            new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
+              System.currentTimeMillis(), context.getJob(toClose)
+              .getCompletedMaps(), context.getJob(toClose).getCompletedReduces(),
+              JobState.KILLED.toString());
+          JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
+          //Bypass the queue mechanism which might wait. Call the method directly
+          handleEvent(jfEvent);
+        }
+      }
+    }
+
+    //close all file handles
+    for (MetaInfo mi : fileMap.values()) {
+      try {
+        mi.closeWriter();
+      } catch (IOException e) {
+        LOG.info("Exception while closing file " + e.getMessage());
+      }
+    }
+    LOG.info("Stopped JobHistoryEventHandler. super.stop()");
+    super.stop();
+  }
+
+  protected EventWriter createEventWriter(Path historyFilePath)
+      throws IOException {
+    FSDataOutputStream out = stagingDirFS.create(historyFilePath, true);
+    return new EventWriter(out);
+  }
+  
+  /**
+   * Create an event writer for the Job represented by the jobID.
+   * Writes out the job configuration to the log directory.
+   * This should be the first call to history for a job
+   * 
+   * @param jobId the jobId.
+   * @throws IOException
+   */
+  protected void setupEventWriter(JobId jobId)
+      throws IOException {
+    if (stagingDirPath == null) {
+      LOG.error("Log Directory is null, returning");
+      throw new IOException("Missing Log Directory for History");
+    }
+
+    MetaInfo oldFi = fileMap.get(jobId);
+    Configuration conf = getConfig();
+
+    // TODO Ideally this should be written out to the job dir
+    // (.staging/jobid/files - RecoveryService will need to be patched)
+    Path historyFile = JobHistoryUtils.getStagingJobHistoryFile(
+        stagingDirPath, jobId, startCount);
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    if (user == null) {
+      throw new IOException(
+          "User is null while setting up jobhistory eventwriter");
+    }
+
+    String jobName = context.getJob(jobId).getName();
+    EventWriter writer = (oldFi == null) ? null : oldFi.writer;
+ 
+    Path logDirConfPath =
+        JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId, startCount);
+    if (writer == null) {
+      try {
+        writer = createEventWriter(historyFile);
+        LOG.info("Event Writer setup for JobId: " + jobId + ", File: "
+            + historyFile);
+      } catch (IOException ioe) {
+        LOG.info("Could not create log file: [" + historyFile + "] + for job "
+            + "[" + jobName + "]");
+        throw ioe;
+      }
+      
+      // Write out the conf only if the writer isn't already set up.
+      if (conf != null) {
+        // TODO Ideally this should be written out to the job dir
+        // (.staging/jobid/files - RecoveryService will need to be patched)
+        FSDataOutputStream jobFileOut = null;
+        try {
+          if (logDirConfPath != null) {
+            jobFileOut = stagingDirFS.create(logDirConfPath, true);
+            conf.writeXml(jobFileOut);
+            jobFileOut.close();
+          }
+        } catch (IOException e) {
+          LOG.info("Failed to write the job configuration file", e);
+          throw e;
+        }
+      }
+    }
+
+    MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
+        user, jobName, jobId);
+    fi.getJobSummary().setJobId(jobId);
+    fileMap.put(jobId, fi);
+  }
+
+  /** Close the event writer for this id 
+   * @throws IOException */
+  public void closeWriter(JobId id) throws IOException {
+    try {
+      final MetaInfo mi = fileMap.get(id);
+      if (mi != null) {
+        mi.closeWriter();
+      }
+      
+    } catch (IOException e) {
+      LOG.error("Error closing writer for JobID: " + id);
+      throw e;
+    }
+  }
+
+  @Override
+  public void handle(JobHistoryEvent event) {
+    try {
+      if (isJobCompletionEvent(event.getHistoryEvent())) {
+        // Once the job completes, raise the unflushed-event cap: events are
+        // written promptly but flushed less often while draining the queue.
+        maxUnflushedCompletionEvents =
+            maxUnflushedCompletionEvents * postJobCompletionMultiplier;
+      }
+
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  private boolean isJobCompletionEvent(HistoryEvent historyEvent) {
+    if (EnumSet.of(EventType.JOB_FINISHED, EventType.JOB_FAILED,
+        EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
+      return true;
+    }
+    return false;
+  }
+
+  protected void handleEvent(JobHistoryEvent event) {
+    synchronized (lock) {
+
+      // If this is an AM_STARTED event, set up the writer
+      if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) {
+        try {
+          setupEventWriter(event.getJobID());
+        } catch (IOException ioe) {
+          LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
+              ioe);
+          throw new YarnException(ioe);
+        }
+      }
+
+      // For all events
+      // (1) Write it out
+      // (2) Process it for JobSummary
+      MetaInfo mi = fileMap.get(event.getJobID());
+      try {
+        HistoryEvent historyEvent = event.getHistoryEvent();
+        if (! (historyEvent instanceof NormalizedResourceEvent)) {
+          mi.writeEvent(historyEvent);
+        }
+        processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
+            event.getJobID());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("In HistoryEventHandler "
+              + event.getHistoryEvent().getEventType());
+        }
+      } catch (IOException e) {
+        LOG.error("Error writing History Event: " + event.getHistoryEvent(),
+            e);
+        throw new YarnException(e);
+      }
+
+      if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
+        JobSubmittedEvent jobSubmittedEvent =
+            (JobSubmittedEvent) event.getHistoryEvent();
+        mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
+        mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName());
+      }
+     
+      // If this is a JobFinishedEvent, close the writer and set up the job-index
+      if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {
+        try {
+          JobFinishedEvent jFinishedEvent =
+              (JobFinishedEvent) event.getHistoryEvent();
+          mi.getJobIndexInfo().setFinishTime(jFinishedEvent.getFinishTime());
+          mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getFinishedMaps());
+          mi.getJobIndexInfo().setNumReduces(
+              jFinishedEvent.getFinishedReduces());
+          mi.getJobIndexInfo().setJobStatus(JobState.SUCCEEDED.toString());
+          closeEventWriter(event.getJobID());
+        } catch (IOException e) {
+          throw new YarnException(e);
+        }
+      }
+
+      if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
+          || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
+        try {
+          JobUnsuccessfulCompletionEvent jucEvent = 
+              (JobUnsuccessfulCompletionEvent) event
+              .getHistoryEvent();
+          mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
+          mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
+          mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
+          mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
+          closeEventWriter(event.getJobID());
+        } catch (IOException e) {
+          throw new YarnException(e);
+        }
+      }
+    }
+  }
+
+  public void processEventForJobSummary(HistoryEvent event, JobSummary summary, 
+      JobId jobId) {
+    // context.getJob could be used for some of this info as well.
+    switch (event.getEventType()) {
+    case JOB_SUBMITTED:
+      JobSubmittedEvent jse = (JobSubmittedEvent) event;
+      summary.setUser(jse.getUserName());
+      summary.setQueue(jse.getJobQueueName());
+      summary.setJobSubmitTime(jse.getSubmitTime());
+      summary.setJobName(jse.getJobName());
+      break;
+    case NORMALIZED_RESOURCE:
+      NormalizedResourceEvent normalizedResourceEvent = 
+            (NormalizedResourceEvent) event;
+      if (normalizedResourceEvent.getTaskType() == TaskType.MAP) {
+        summary.setResourcesPerMap(normalizedResourceEvent.getMemory());
+      } else if (normalizedResourceEvent.getTaskType() == TaskType.REDUCE) {
+        summary.setResourcesPerReduce(normalizedResourceEvent.getMemory());
+      }
+      break;  
+    case JOB_INITED:
+      JobInitedEvent jie = (JobInitedEvent) event;
+      summary.setJobLaunchTime(jie.getLaunchTime());
+      break;
+    case MAP_ATTEMPT_STARTED:
+      TaskAttemptStartedEvent mtase = (TaskAttemptStartedEvent) event;
+      if (summary.getFirstMapTaskLaunchTime() == 0)
+        summary.setFirstMapTaskLaunchTime(mtase.getStartTime());
+      break;
+    case REDUCE_ATTEMPT_STARTED:
+      TaskAttemptStartedEvent rtase = (TaskAttemptStartedEvent) event;
+      if (summary.getFirstReduceTaskLaunchTime() == 0)
+        summary.setFirstReduceTaskLaunchTime(rtase.getStartTime());
+      break;
+    case JOB_FINISHED:
+      JobFinishedEvent jfe = (JobFinishedEvent) event;
+      summary.setJobFinishTime(jfe.getFinishTime());
+      summary.setNumFinishedMaps(jfe.getFinishedMaps());
+      summary.setNumFailedMaps(jfe.getFailedMaps());
+      summary.setNumFinishedReduces(jfe.getFinishedReduces());
+      summary.setNumFailedReduces(jfe.getFailedReduces());
+      if (summary.getJobStatus() == null)
+        summary
+            .setJobStatus(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED
+                .toString());
+      // TODO JOB_FINISHED does not have state. Effectively job history does not
+      // have state about the finished job.
+      setSummarySlotSeconds(summary, jfe.getTotalCounters());
+      break;
+    case JOB_FAILED:
+    case JOB_KILLED:
+      JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent) event;
+      summary.setJobStatus(juce.getStatus());
+      summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps());
+      summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces());
+      summary.setJobFinishTime(juce.getFinishTime());
+      setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters());
+      break;
+    }
+  }
+
+  private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
+
+    Counter slotMillisMapCounter = allCounters
+      .findCounter(JobCounter.SLOTS_MILLIS_MAPS);
+    if (slotMillisMapCounter != null) {
+      summary.setMapSlotSeconds(slotMillisMapCounter.getValue() / 1000);
+    }
+
+    Counter slotMillisReduceCounter = allCounters
+      .findCounter(JobCounter.SLOTS_MILLIS_REDUCES);
+    if (slotMillisReduceCounter != null) {
+      summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue() / 1000);
+    }
+  }
+
+  protected void closeEventWriter(JobId jobId) throws IOException {
+
+    final MetaInfo mi = fileMap.get(jobId);
+    if (mi == null) {
+      throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
+    }
+
+    if (!mi.isWriterActive()) {
+      throw new IOException(
+          "Inactive Writer: Likely received multiple JobFinished / " +
+          "JobUnsuccessful events for JobId: ["
+              + jobId + "]");
+    }
+
+    // Close the Writer
+    try {
+      mi.closeWriter();
+    } catch (IOException e) {
+      LOG.error("Error closing writer for JobID: " + jobId);
+      throw e;
+    }
+     
+    if (mi.getHistoryFile() == null) {
+      LOG.warn("No file for job-history with " + jobId + " found in cache!");
+    }
+    if (mi.getConfFile() == null) {
+      LOG.warn("No file for jobconf with " + jobId + " found in cache!");
+    }
+      
+    // Writing out the summary file.
+    // TODO JH enhancement - reuse this file to store additional indexing info
+    // like ACLs, etc. JHServer can use HDFS append to build an index file
+    // with more info than is available via the filename.
+    Path qualifiedSummaryDoneFile = null;
+    FSDataOutputStream summaryFileOut = null;
+    try {
+      String doneSummaryFileName = getTempFileName(JobHistoryUtils
+          .getIntermediateSummaryFileName(jobId));
+      qualifiedSummaryDoneFile = doneDirFS.makeQualified(new Path(
+          doneDirPrefixPath, doneSummaryFileName));
+      summaryFileOut = doneDirFS.create(qualifiedSummaryDoneFile, true);
+      summaryFileOut.writeUTF(mi.getJobSummary().getJobSummaryString());
+      summaryFileOut.close();
+    } catch (IOException e) {
+      LOG.info("Unable to write out JobSummaryInfo to ["
+          + qualifiedSummaryDoneFile + "]", e);
+      throw e;
+    }
+
+    try {
+
+      // Move historyFile to Done Folder.
+      Path qualifiedDoneFile = null;
+      if (mi.getHistoryFile() != null) {
+        Path historyFile = mi.getHistoryFile();
+        Path qualifiedLogFile = stagingDirFS.makeQualified(historyFile);
+        String doneJobHistoryFileName =
+            getTempFileName(FileNameIndexUtils.getDoneFileName(mi
+                .getJobIndexInfo()));
+        qualifiedDoneFile =
+            doneDirFS.makeQualified(new Path(doneDirPrefixPath,
+                doneJobHistoryFileName));
+        moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
+      }
+
+      // Move confFile to Done Folder
+      Path qualifiedConfDoneFile = null;
+      if (mi.getConfFile() != null) {
+        Path confFile = mi.getConfFile();
+        Path qualifiedConfFile = stagingDirFS.makeQualified(confFile);
+        String doneConfFileName =
+            getTempFileName(JobHistoryUtils
+                .getIntermediateConfFileName(jobId));
+        qualifiedConfDoneFile =
+            doneDirFS.makeQualified(new Path(doneDirPrefixPath,
+                doneConfFileName));
+        moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile);
+      }
+      
+      moveTmpToDone(qualifiedSummaryDoneFile);
+      moveTmpToDone(qualifiedConfDoneFile);
+      moveTmpToDone(qualifiedDoneFile);
+
+    } catch (IOException e) {
+      LOG.error("Error closing writer for JobID: " + jobId);
+      throw e;
+    }
+  }
+
+  private class FlushTimerTask extends TimerTask {
+    private MetaInfo metaInfo;
+    private IOException ioe = null;
+    private volatile boolean shouldRun = true;
+
+    FlushTimerTask(MetaInfo metaInfo) {
+      this.metaInfo = metaInfo;
+    }
+
+    @Override
+    public void run() {
+      synchronized (lock) {
+        try {
+          if (!metaInfo.isTimerShutDown() && shouldRun)
+            metaInfo.flush();
+        } catch (IOException e) {
+          ioe = e;
+        }
+      }
+    }
+
+    public IOException getException() {
+      return ioe;
+    }
+
+    public void stop() {
+      shouldRun = false;
+      this.cancel();
+    }
+  }
+
+  protected class MetaInfo {
+    private Path historyFile;
+    private Path confFile;
+    private EventWriter writer;
+    JobIndexInfo jobIndexInfo;
+    JobSummary jobSummary;
+    Timer flushTimer; 
+    FlushTimerTask flushTimerTask;
+    private boolean isTimerShutDown = false;
+
+    MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
+        String jobName, JobId jobId) {
+      this.historyFile = historyFile;
+      this.confFile = conf;
+      this.writer = writer;
+      this.jobIndexInfo =
+          new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
+      this.jobSummary = new JobSummary();
+      this.flushTimer = new Timer("FlushTimer", true);
+    }
+
+    Path getHistoryFile() {
+      return historyFile;
+    }
+
+    Path getConfFile() {
+      return confFile;
+    }
+
+    JobIndexInfo getJobIndexInfo() {
+      return jobIndexInfo;
+    }
+
+    JobSummary getJobSummary() {
+      return jobSummary;
+    }
+
+    boolean isWriterActive() {
+      return writer != null;
+    }
+    
+    boolean isTimerShutDown() {
+      return isTimerShutDown;
+    }
+
+    void closeWriter() throws IOException {
+      synchronized (lock) {
+        if (writer != null) {
+          writer.close();
+        }
+        writer = null;
+      }
+    }
+
+    void writeEvent(HistoryEvent event) throws IOException {
+      synchronized (lock) {
+        if (writer != null) {
+          writer.write(event);
+          processEventForFlush(event);
+          maybeFlush(event);
+        }
+      }
+    }
+
+    void processEventForFlush(HistoryEvent historyEvent) throws IOException {
+      if (EnumSet.of(EventType.MAP_ATTEMPT_FINISHED,
+          EventType.MAP_ATTEMPT_FAILED, EventType.MAP_ATTEMPT_KILLED,
+          EventType.REDUCE_ATTEMPT_FINISHED, EventType.REDUCE_ATTEMPT_FAILED,
+          EventType.REDUCE_ATTEMPT_KILLED, EventType.TASK_FINISHED,
+          EventType.TASK_FAILED, EventType.JOB_FINISHED, EventType.JOB_FAILED,
+          EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
+        numUnflushedCompletionEvents++;
+        if (!isTimerActive) {
+          resetFlushTimer();
+          if (!isTimerShutDown) {
+            flushTimerTask = new FlushTimerTask(this);
+            flushTimer.schedule(flushTimerTask, flushTimeout);
+          }
+        }
+      }
+    }
+
+    void resetFlushTimer() throws IOException {
+      if (flushTimerTask != null) {
+        IOException exception = flushTimerTask.getException();
+        flushTimerTask.stop();
+        if (exception != null) {
+          throw exception;
+        }
+        flushTimerTask = null;
+      }
+      isTimerActive = false;
+    }
+
+    void maybeFlush(HistoryEvent historyEvent) throws IOException {
+      if ((eventQueue.size() < minQueueSizeForBatchingFlushes 
+          && numUnflushedCompletionEvents > 0)
+          || numUnflushedCompletionEvents >= maxUnflushedCompletionEvents 
+          || isJobCompletionEvent(historyEvent)) {
+        this.flush();
+      }
+    }
+
+    void flush() throws IOException {
+      synchronized (lock) {
+        if (numUnflushedCompletionEvents != 0) { // skipped timer cancel.
+          writer.flush();
+          numUnflushedCompletionEvents = 0;
+          resetFlushTimer();
+        }
+      }
+    }
+
+    void shutDownTimer() throws IOException {
+      synchronized (lock) {
+        isTimerShutDown = true;
+        flushTimer.cancel();
+        if (flushTimerTask != null && flushTimerTask.getException() != null) {
+          throw flushTimerTask.getException();
+        }
+      }
+    }
+  }
+
+  private void moveTmpToDone(Path tmpPath) throws IOException {
+    if (tmpPath != null) {
+      String tmpFileName = tmpPath.getName();
+      String fileName = getFileNameFromTmpFN(tmpFileName);
+      Path path = new Path(tmpPath.getParent(), fileName);
+      doneDirFS.rename(tmpPath, path);
+      LOG.info("Moved tmp to done: " + tmpPath + " to " + path);
+    }
+  }
+  
+  // TODO If the FS objects are the same, this should be a rename instead of a
+  // copy.
+  private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
+    // check if path exists, in case of retries it may not exist
+    if (stagingDirFS.exists(fromPath)) {
+      LOG.info("Moving " + fromPath.toString() + " to " + toPath.toString());
+      // TODO temporarily removing the existing dst
+      if (doneDirFS.exists(toPath)) {
+        doneDirFS.delete(toPath, true);
+      }
+      boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath,
+          false, getConfig());
+
+      if (copied) {
+        LOG.info("Copied to done location: " + toPath);
+      } else {
+        LOG.info("Copy to done location failed: " + toPath);
+      }
+      doneDirFS.setPermission(toPath, new FsPermission(
+          JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
+      
+      stagingDirFS.delete(fromPath, false);
+    }
+  }
+
+  boolean pathExists(FileSystem fileSys, Path path) throws IOException {
+    return fileSys.exists(path);
+  }
+
+  private String getTempFileName(String srcFile) {
+    return srcFile + "_tmp";
+  }
+  
+  private String getFileNameFromTmpFN(String tmpFileName) {
+    // TODO: add error checking; assumes the name ends with the 4-char "_tmp" suffix.
+    return tmpFileName.substring(0, tmpFileName.length() - 4);
+  }
+
+  public void setSignalled(boolean isSignalled) {
+    this.isSignalled = isSignalled;
+    LOG.info("JobHistoryEventHandler notified that isSignalled was "
+      + isSignalled);
+  }
+}
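
A minimal, self-contained sketch of the flush-batching policy implemented above by
MetaInfo.processEventForFlush() and maybeFlush(): flush when the event queue has
drained enough that batching no longer pays off, when the cap on unflushed
completion events is reached, or on a job-completion event. The class name and the
threshold values below are illustrative assumptions, not part of this commit or of
any Hadoop/Tez API.

    public class FlushPolicySketch {

      private int numUnflushedCompletionEvents = 0;
      private final int maxUnflushedCompletionEvents = 10;   // assumed value
      private final int minQueueSizeForBatchingFlushes = 50; // assumed value

      // Mirrors the condition in maybeFlush() above.
      boolean shouldFlush(int eventQueueSize, boolean isJobCompletionEvent) {
        return (eventQueueSize < minQueueSizeForBatchingFlushes
                && numUnflushedCompletionEvents > 0)
            || numUnflushedCompletionEvents >= maxUnflushedCompletionEvents
            || isJobCompletionEvent;
      }

      void onCompletionEventWritten() {
        numUnflushedCompletionEvents++;   // counted in processEventForFlush()
      }

      void onFlushed() {
        numUnflushedCompletionEvents = 0; // reset after a successful flush()
      }
    }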

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.util.StringUtils;
+
+public class JobSummary {
+  private JobId jobId;
+  private long jobSubmitTime;
+  private long jobLaunchTime;
+  private long firstMapTaskLaunchTime; // MapAttemptStarted |
+                                       // TaskAttemptStartedEvent
+  private long firstReduceTaskLaunchTime; // ReduceAttemptStarted |
+                                          // TaskAttemptStartedEvent
+  private long jobFinishTime;
+  private int numFinishedMaps;
+  private int numFailedMaps;
+  private int numFinishedReduces;
+  private int numFailedReduces;
+  private int resourcesPerMap; // resources used per map/min resource
+  private int resourcesPerReduce; // resources used per reduce/min resource
+  // resource models
+  // private int numSlotsPerReduce; | Doesn't make sense with potentially
+  // different resource models
+  private String user;
+  private String queue;
+  private String jobStatus;
+  private long mapSlotSeconds; // TODO Not generated yet in MRV2
+  private long reduceSlotSeconds; // TODO Not generated yet in MRV2
+  // private int clusterSlotCapacity;
+  private String jobName;
+
+  JobSummary() {
+  }
+
+  public JobId getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(JobId jobId) {
+    this.jobId = jobId;
+  }
+
+  public long getJobSubmitTime() {
+    return jobSubmitTime;
+  }
+
+  public void setJobSubmitTime(long jobSubmitTime) {
+    this.jobSubmitTime = jobSubmitTime;
+  }
+
+  public long getJobLaunchTime() {
+    return jobLaunchTime;
+  }
+
+  public void setJobLaunchTime(long jobLaunchTime) {
+    this.jobLaunchTime = jobLaunchTime;
+  }
+
+  public long getFirstMapTaskLaunchTime() {
+    return firstMapTaskLaunchTime;
+  }
+
+  public void setFirstMapTaskLaunchTime(long firstMapTaskLaunchTime) {
+    this.firstMapTaskLaunchTime = firstMapTaskLaunchTime;
+  }
+
+  public long getFirstReduceTaskLaunchTime() {
+    return firstReduceTaskLaunchTime;
+  }
+
+  public void setFirstReduceTaskLaunchTime(long firstReduceTaskLaunchTime) {
+    this.firstReduceTaskLaunchTime = firstReduceTaskLaunchTime;
+  }
+
+  public long getJobFinishTime() {
+    return jobFinishTime;
+  }
+
+  public void setJobFinishTime(long jobFinishTime) {
+    this.jobFinishTime = jobFinishTime;
+  }
+
+  public int getNumFinishedMaps() {
+    return numFinishedMaps;
+  }
+
+  public void setNumFinishedMaps(int numFinishedMaps) {
+    this.numFinishedMaps = numFinishedMaps;
+  }
+
+  public int getNumFailedMaps() {
+    return numFailedMaps;
+  }
+
+  public void setNumFailedMaps(int numFailedMaps) {
+    this.numFailedMaps = numFailedMaps;
+  }
+
+  public int getResourcesPerMap() {
+    return resourcesPerMap;
+  }
+  
+  public void setResourcesPerMap(int resourcesPerMap) {
+    this.resourcesPerMap = resourcesPerMap;
+  }
+  
+  public int getNumFinishedReduces() {
+    return numFinishedReduces;
+  }
+
+  public void setNumFinishedReduces(int numFinishedReduces) {
+    this.numFinishedReduces = numFinishedReduces;
+  }
+
+  public int getNumFailedReduces() {
+    return numFailedReduces;
+  }
+
+  public void setNumFailedReduces(int numFailedReduces) {
+    this.numFailedReduces = numFailedReduces;
+  }
+
+  public int getResourcesPerReduce() {
+    return this.resourcesPerReduce;
+  }
+  
+  public void setResourcesPerReduce(int resourcesPerReduce) {
+    this.resourcesPerReduce = resourcesPerReduce;
+  }
+  
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public void setQueue(String queue) {
+    this.queue = queue;
+  }
+
+  public String getJobStatus() {
+    return jobStatus;
+  }
+
+  public void setJobStatus(String jobStatus) {
+    this.jobStatus = jobStatus;
+  }
+
+  public long getMapSlotSeconds() {
+    return mapSlotSeconds;
+  }
+
+  public void setMapSlotSeconds(long mapSlotSeconds) {
+    this.mapSlotSeconds = mapSlotSeconds;
+  }
+
+  public long getReduceSlotSeconds() {
+    return reduceSlotSeconds;
+  }
+
+  public void setReduceSlotSeconds(long reduceSlotSeconds) {
+    this.reduceSlotSeconds = reduceSlotSeconds;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  public String getJobSummaryString() {
+    SummaryBuilder summary = new SummaryBuilder()
+      .add("jobId", jobId)
+      .add("submitTime", jobSubmitTime)
+      .add("launchTime", jobLaunchTime)
+      .add("firstMapTaskLaunchTime", firstMapTaskLaunchTime)
+      .add("firstReduceTaskLaunchTime", firstReduceTaskLaunchTime)
+      .add("finishTime", jobFinishTime)
+      .add("resourcesPerMap", resourcesPerMap)
+      .add("resourcesPerReduce", resourcesPerReduce)
+      .add("numMaps", numFinishedMaps + numFailedMaps)
+      .add("numReduces", numFinishedReduces + numFailedReduces)
+      .add("user", user)
+      .add("queue", queue)
+      .add("status", jobStatus)
+      .add("mapSlotSeconds", mapSlotSeconds)
+      .add("reduceSlotSeconds", reduceSlotSeconds)
+      .add("jobName", jobName);
+    return summary.toString();
+  }
+
+  static final char EQUALS = '=';
+  static final char[] charsToEscape = { StringUtils.COMMA, EQUALS,
+      StringUtils.ESCAPE_CHAR };
+  
+  static class SummaryBuilder {
+    final StringBuilder buffer = new StringBuilder();
+
+    // A little optimization for a very common case
+    SummaryBuilder add(String key, long value) {
+      return _add(key, Long.toString(value));
+    }
+
+    <T> SummaryBuilder add(String key, T value) {
+      return _add(key, StringUtils.escapeString(String.valueOf(value),
+          StringUtils.ESCAPE_CHAR, charsToEscape));
+    }
+
+    SummaryBuilder add(SummaryBuilder summary) {
+      if (buffer.length() > 0)
+        buffer.append(StringUtils.COMMA);
+      buffer.append(summary.buffer);
+      return this;
+    }
+
+    SummaryBuilder _add(String key, String value) {
+      if (buffer.length() > 0)
+        buffer.append(StringUtils.COMMA);
+      buffer.append(key).append(EQUALS).append(value);
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return buffer.toString();
+    }
+  }
+}
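
For reference, getJobSummaryString() above emits a single comma-separated line of
key=value pairs with ',', '=' and '\' escaped. A small standalone sketch of that
format follows; the class, the escaping helper, and the sample job id are
illustrative assumptions, not the committed SummaryBuilder or Hadoop's StringUtils.

    public class SummaryLineSketch {

      private final StringBuilder buf = new StringBuilder();

      // Append one key=value pair, escaping the delimiter characters in the value.
      SummaryLineSketch add(String key, Object value) {
        if (buf.length() > 0) {
          buf.append(',');
        }
        buf.append(key).append('=').append(escape(String.valueOf(value)));
        return this;
      }

      private static String escape(String s) {
        return s.replace("\\", "\\\\").replace(",", "\\,").replace("=", "\\=");
      }

      @Override
      public String toString() {
        return buf.toString();
      }

      public static void main(String[] args) {
        String line = new SummaryLineSketch()
            .add("jobId", "job_1363380000000_0001")  // hypothetical id
            .add("user", "alice")
            .add("queue", "default")
            .add("status", "SUCCEEDED")
            .toString();
        // jobId=job_1363380000000_0001,user=alice,queue=default,status=SUCCEEDED
        System.out.println(line);
      }
    }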

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/AppContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/AppContext.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/AppContext.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/AppContext.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,72 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNode;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+
+/**
+ * Context interface for sharing information across components in YARN App.
+ */
+@InterfaceAudience.Private
+public interface AppContext {
+
+  ApplicationId getApplicationID();
+
+  ApplicationAttemptId getApplicationAttemptId();
+
+  String getApplicationName();
+  
+  Map<ApplicationAccessType, String> getApplicationACLs();
+
+  long getStartTime();
+
+  CharSequence getUser();
+
+  Job getJob(JobId jobID);
+
+  Map<JobId, Job> getAllJobs();
+
+  @SuppressWarnings("rawtypes")
+  EventHandler getEventHandler();
+
+  Clock getClock();
+  
+  ClusterInfo getClusterInfo();
+  
+  AMContainerMap getAllContainers();
+  
+  AMNodeMap getAllNodes();
+}
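
AppContext is the read-only view that AM components share; a hedged sketch of a
consumer follows. The JobStatusFormatter class is hypothetical, but the getters it
calls (getApplicationID(), getUser(), getJob(), getTotalMaps(), getTotalReduces())
all appear either in the interface above or in the handler code earlier in this
commit.

    package org.apache.hadoop.mapreduce.v2.app2;

    import org.apache.hadoop.mapreduce.v2.api.records.JobId;
    import org.apache.hadoop.mapreduce.v2.app2.job.Job;

    public class JobStatusFormatter {

      private final AppContext context;

      public JobStatusFormatter(AppContext context) {
        this.context = context;
      }

      // Builds a one-line status string using only AppContext getters.
      public String status(JobId jobId) {
        Job job = context.getJob(jobId);
        return "app=" + context.getApplicationID()
            + " user=" + context.getUser()
            + " job=" + jobId
            + " totalMaps=" + (job == null ? 0 : job.getTotalMaps())
            + " totalReduces=" + (job == null ? 0 : job.getTotalReduces());
      }
    }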

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/ControlledClock.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/ControlledClock.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/ControlledClock.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/ControlledClock.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import org.apache.hadoop.yarn.Clock;
+
+public class ControlledClock implements Clock {
+  private long time = -1;
+  private final Clock actualClock;
+  public ControlledClock(Clock actualClock) {
+    this.actualClock = actualClock;
+  }
+  public synchronized void setTime(long time) {
+    this.time = time;
+  }
+  public synchronized void reset() {
+    time = -1;
+  }
+
+  @Override
+  public synchronized long getTime() {
+    if (time != -1) {
+      return time;
+    }
+    return actualClock.getTime();
+  }
+
+}
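
ControlledClock reports a pinned time once setTime() is called and delegates to the
wrapped clock otherwise, which is convenient in tests. A small usage sketch follows;
it assumes org.apache.hadoop.yarn.SystemClock as the wrapped Clock implementation,
and the example class itself is hypothetical.

    import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
    import org.apache.hadoop.yarn.SystemClock;

    public class ControlledClockExample {
      public static void main(String[] args) {
        ControlledClock clock = new ControlledClock(new SystemClock());

        clock.setTime(1000L);                              // pin the clock for deterministic reads
        System.out.println("pinned: " + clock.getTime()); // always 1000

        clock.reset();                                     // fall back to the wrapped real clock
        System.out.println("actual: " + clock.getTime());
      }
    }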


