hadoop-mapreduce-commits mailing list archives

From tgra...@apache.org
Subject svn commit: r1311896 [1/2] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/ hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/mai...
Date Tue, 10 Apr 2012 18:11:27 GMT
Author: tgraves
Date: Tue Apr 10 18:11:26 2012
New Revision: 1311896

URL: http://svn.apache.org/viewvc?rev=1311896&view=rev
Log:
MAPREDUCE-4059. The history server should have a separate pluggable storage/query interface. (Robert Evans via tgraves).
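
With this change the history server's storage layer becomes pluggable: a
HistoryStorage implementation is selected through the new
JHAdminConfig.MR_HISTORY_STORAGE key. As a minimal sketch of the wiring (the
implementation class name here is hypothetical, and the code that actually
instantiates the configured class is not among the hunks shown in this
message):

    // Select a custom HistoryStorage backend; MyHistoryStorage is a
    // hypothetical class implementing the new interface added below.
    Configuration conf = new Configuration();
    conf.setClass(JHAdminConfig.MR_HISTORY_STORAGE,
        MyHistoryStorage.class, HistoryStorage.class);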

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Apr 10 18:11:26 2012
@@ -235,6 +235,9 @@ Release 0.23.3 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-4059. The history server should have a separate pluggable 
+    storage/query interface. (Robert Evans via tgraves)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Tue Apr 10 18:11:26 2012
@@ -44,6 +44,9 @@ public class JHAdminConfig {
   /** Run the History Cleaner every X ms.*/
   public static final String MR_HISTORY_CLEANER_INTERVAL_MS = 
     MR_HISTORY_PREFIX + "cleaner.interval-ms";
+  public static final long DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS = 
+    1 * 24 * 60 * 60 * 1000L; //1 day
+  
   
   /** The number of threads to handle client API requests.*/
   public static final String MR_HISTORY_CLIENT_THREAD_COUNT = 
@@ -56,7 +59,9 @@ public class JHAdminConfig {
    */
   public static final String MR_HISTORY_DATESTRING_CACHE_SIZE = 
     MR_HISTORY_PREFIX + "datestring.cache.size";
+  public static final int DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE = 200000;
   
+  //TODO REMOVE debug-mode
   /** Equivalent to 0.20 mapreduce.jobhistory.debug.mode */
   public static final String MR_HISTORY_DEBUG_MODE = 
     MR_HISTORY_PREFIX + "debug-mode";
@@ -75,6 +80,7 @@ public class JHAdminConfig {
   /** Size of the job list cache.*/
   public static final String MR_HISTORY_JOBLIST_CACHE_SIZE =
     MR_HISTORY_PREFIX + "joblist.cache.size";
+  public static final int DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE = 20000;
   
   /** The location of the Kerberos keytab file.*/
   public static final String MR_HISTORY_KEYTAB = MR_HISTORY_PREFIX + "keytab";
@@ -82,6 +88,7 @@ public class JHAdminConfig {
   /** Size of the loaded job cache.*/
   public static final String MR_HISTORY_LOADED_JOB_CACHE_SIZE = 
     MR_HISTORY_PREFIX + "loadedjobs.cache.size";
+  public static final int DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE = 5;
   
   /**
    * The maximum age of a job history file before it is deleted from the history
@@ -89,6 +96,8 @@ public class JHAdminConfig {
    */
   public static final String MR_HISTORY_MAX_AGE_MS =
     MR_HISTORY_PREFIX + "max-age-ms";
+  public static final long DEFAULT_MR_HISTORY_MAX_AGE = 
+    7 * 24 * 60 * 60 * 1000L; //1 week
   
   /**
   * Scan for history files to move from the intermediate done dir to the done dir
@@ -96,10 +105,13 @@ public class JHAdminConfig {
    */
   public static final String MR_HISTORY_MOVE_INTERVAL_MS = 
     MR_HISTORY_PREFIX + "move.interval-ms";
+  public static final long DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS = 
+    3 * 60 * 1000L; //3 minutes
   
   /** The number of threads used to move files.*/
   public static final String MR_HISTORY_MOVE_THREAD_COUNT = 
     MR_HISTORY_PREFIX + "move.thread-count";
+  public static final int DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT = 3;
   
   /** The Kerberos principal for the history server.*/
   public static final String MR_HISTORY_PRINCIPAL = 
@@ -116,4 +128,10 @@ public class JHAdminConfig {
    */
   public static final String MR_HS_SECURITY_SERVICE_AUTHORIZATION =
       "security.mrhs.client.protocol.acl";
+  
+  /**
+   * The HistoryStorage class used to store and query job history data.
+   */
+  public static final String MR_HISTORY_STORAGE =
+    MR_HISTORY_PREFIX + "store.class";
 }
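
Each new DEFAULT_* constant pairs with an existing configuration key, so call
sites can fall back to the compiled-in value when the key is unset. A
representative read, assuming a populated Configuration:

    // Falls back to the compiled-in default when the key is absent.
    long cleanerInterval = conf.getLong(
        JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
        JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);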

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Tue Apr 10 18:11:26 2012
@@ -31,6 +31,8 @@ import java.util.concurrent.atomic.Atomi
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
@@ -50,6 +52,8 @@ import org.apache.hadoop.yarn.api.record
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
 public class JobHistoryUtils {
   
   /**

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java?rev=1311896&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java Tue Apr 10 18:11:26 2012
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * Manages an in-memory cache of parsed Job History files.
+ */
+public class CachedHistoryStorage extends AbstractService implements
+    HistoryStorage {
+  private static final Log LOG = LogFactory.getLog(CachedHistoryStorage.class);
+
+  private Map<JobId, Job> loadedJobCache = null;
+  // The maximum number of fully parsed jobs to keep in the cache.
+  private int loadedJobCacheSize;
+
+  private HistoryFileManager hsManager;
+
+  @Override
+  public void setHistoryFileManager(HistoryFileManager hsManager) {
+    this.hsManager = hsManager;
+  }
+
+  @SuppressWarnings("serial")
+  @Override
+  public void init(Configuration conf) throws YarnException {
+    LOG.info("CachedHistoryStorage Init");
+
+    loadedJobCacheSize = conf.getInt(
+        JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
+        JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE);
+
+    loadedJobCache = Collections.synchronizedMap(new LinkedHashMap<JobId, Job>(
+        loadedJobCacheSize + 1, 0.75f, true) {
+      @Override
+      public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
+        return super.size() > loadedJobCacheSize;
+      }
+    });
+
+    super.init(conf);
+  }
+
+  public CachedHistoryStorage() {
+    super(CachedHistoryStorage.class.getName());
+  }
+
+  private Job loadJob(MetaInfo metaInfo) {
+    try {
+      Job job = hsManager.loadJob(metaInfo);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding " + job.getID() + " to loaded job cache");
+      }
+      loadedJobCache.put(job.getID(), job);
+      return job;
+    } catch (IOException e) {
+      throw new YarnException(
+          "Could not find/load job: " + metaInfo.getJobId(), e);
+    }
+  }
+
+  @Override
+  public synchronized Job getFullJob(JobId jobId) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Looking for Job " + jobId);
+    }
+    try {
+      Job result = loadedJobCache.get(jobId);
+      if (result == null) {
+        MetaInfo metaInfo = hsManager.getMetaInfo(jobId);
+        if (metaInfo != null) {
+          result = loadJob(metaInfo);
+        }
+      }
+      return result;
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  @Override
+  public Map<JobId, Job> getAllPartialJobs() {
+    LOG.debug("Called getAllPartialJobs()");
+    SortedMap<JobId, Job> result = new TreeMap<JobId, Job>();
+    try {
+      for (MetaInfo mi : hsManager.getAllMetaInfo()) {
+        if (mi != null) {
+          JobId id = mi.getJobId();
+          result.put(id, new PartialJob(mi.getJobIndexInfo(), id));
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Error trying to scan for all MetaInfos", e);
+      throw new YarnException(e);
+    }
+    return result;
+  }
+
+  @Override
+  public void jobRemovedFromHDFS(JobId jobId) {
+    loadedJobCache.remove(jobId);
+  }
+
+  @Override
+  public JobsInfo getPartialJobs(Long offset, Long count, String user,
+      String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+      JobState jobState) {
+    return getPartialJobs(getAllPartialJobs().values(), offset, count, user,
+        queue, sBegin, sEnd, fBegin, fEnd, jobState);
+  }
+
+  public static JobsInfo getPartialJobs(Collection<Job> jobs, Long offset,
+      Long count, String user, String queue, Long sBegin, Long sEnd,
+      Long fBegin, Long fEnd, JobState jobState) {
+    JobsInfo allJobs = new JobsInfo();
+
+    if (sBegin == null || sBegin < 0)
+      sBegin = 0L;
+    if (sEnd == null)
+      sEnd = Long.MAX_VALUE;
+    if (fBegin == null || fBegin < 0)
+      fBegin = 0L;
+    if (fEnd == null)
+      fEnd = Long.MAX_VALUE;
+    if (offset == null || offset < 0)
+      offset = 0L;
+    if (count == null)
+      count = Long.MAX_VALUE;
+
+    if (offset > jobs.size()) {
+      return allJobs;
+    }
+
+    long at = 0;
+    long end = offset + count - 1;
+    if (end < 0) { // due to overflow
+      end = Long.MAX_VALUE;
+    }
+    for (Job job : jobs) {
+      if (at > end) {
+        break;
+      }
+
+      // can't really validate queue is a valid one since queues could change
+      if (queue != null && !queue.isEmpty()) {
+        if (!job.getQueueName().equals(queue)) {
+          continue;
+        }
+      }
+
+      if (user != null && !user.isEmpty()) {
+        if (!job.getUserName().equals(user)) {
+          continue;
+        }
+      }
+
+      JobReport report = job.getReport();
+
+      if (report.getStartTime() < sBegin || report.getStartTime() > sEnd) {
+        continue;
+      }
+      if (report.getFinishTime() < fBegin || report.getFinishTime() > fEnd) {
+        continue;
+      }
+      if (jobState != null && jobState != report.getJobState()) {
+        continue;
+      }
+
+      at++;
+      if ((at - 1) < offset) {
+        continue;
+      }
+
+      JobInfo jobInfo = new JobInfo(job);
+
+      allJobs.add(jobInfo);
+    }
+    return allJobs;
+  }
+}
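
The loadedJobCache created in init() gets its LRU behavior from
LinkedHashMap's access-order mode combined with removeEldestEntry. A
self-contained sketch of the same idiom, outside of Hadoop:

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LruCacheDemo {
      public static void main(String[] args) {
        final int maxSize = 2;
        // accessOrder=true moves an entry to the tail on every get(), so the
        // head (the "eldest" entry) is always the least recently used one.
        Map<String, String> cache = Collections.synchronizedMap(
            new LinkedHashMap<String, String>(maxSize + 1, 0.75f, true) {
              @Override
              protected boolean removeEldestEntry(Map.Entry<String, String> e) {
                return size() > maxSize;
              }
            });
        cache.put("job1", "a");
        cache.put("job2", "b");
        cache.get("job1");       // job1 is now most recently used
        cache.put("job3", "c");  // evicts job2, the least recently used
        System.out.println(cache.keySet()); // prints [job1, job3]
      }
    }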

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java Tue Apr 10 18:11:26 2012
@@ -24,8 +24,13 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 
 public interface HistoryContext extends AppContext {
 
   Map<JobId, Job> getAllJobs(ApplicationId appID);
+
+  JobsInfo getPartialJobs(Long offset, Long count, String user,
+      String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, JobState jobState);
 }

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1311896&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Tue Apr 10 18:11:26 2012
@@ -0,0 +1,763 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobACLsManager;
+import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * This class provides a way to interact with history files in a thread-safe
+ * manner.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class HistoryFileManager extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
+  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
+
+  private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
+      .doneSubdirsBeforeSerialTail();
+
+  public static class MetaInfo {
+    private Path historyFile;
+    private Path confFile;
+    private Path summaryFile;
+    private JobIndexInfo jobIndexInfo;
+
+    public MetaInfo(Path historyFile, Path confFile, Path summaryFile,
+        JobIndexInfo jobIndexInfo) {
+      this.historyFile = historyFile;
+      this.confFile = confFile;
+      this.summaryFile = summaryFile;
+      this.jobIndexInfo = jobIndexInfo;
+    }
+
+    private Path getHistoryFile() {
+      return historyFile;
+    }
+
+    private Path getConfFile() {
+      return confFile;
+    }
+
+    private Path getSummaryFile() {
+      return summaryFile;
+    }
+
+    public JobIndexInfo getJobIndexInfo() {
+      return jobIndexInfo;
+    }
+
+    public JobId getJobId() {
+      return jobIndexInfo.getJobId();
+    }
+
+    private void setHistoryFile(Path historyFile) {
+      this.historyFile = historyFile;
+    }
+
+    private void setConfFile(Path confFile) {
+      this.confFile = confFile;
+    }
+
+    private void setSummaryFile(Path summaryFile) {
+      this.summaryFile = summaryFile;
+    }
+  }
+
+  /**
+   * Maps between a serial number (generated based on jobId) and the timestamp
+   * component(s) to which it belongs. Facilitates jobId based searches. If a
+   * jobId's serial number is not present in this map, the job will not be
+   * found by a serial-number lookup.
+   */
+  private final SortedMap<String, Set<String>> idToDateString = 
+    new TreeMap<String, Set<String>>();
+  // The number of entries in idToDateString
+  private int dateStringCacheSize;
+
+  // Maintains minimal details for recent jobs (parsed from history file name).
+  // Sorted on Job Completion Time.
+  private final SortedMap<JobId, MetaInfo> jobListCache = 
+    new ConcurrentSkipListMap<JobId, MetaInfo>();
+  // The number of jobs to maintain in the job list cache.
+  private int jobListCacheSize;
+
+  // Re-use existing MetaInfo objects if they exist for the specific JobId.
+  // (synchronization on MetaInfo)
+  // Check for existence of the object when using iterators.
+  private final SortedMap<JobId, MetaInfo> intermediateListCache = 
+    new ConcurrentSkipListMap<JobId, MetaInfo>();
+
+  // Maintains a list of known done subdirectories.
+  private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
+
+  /**
+   * Maintains a mapping between intermediate user directories and the last
+   * known modification time.
+   */
+  private Map<String, Long> userDirModificationTimeMap = 
+    new HashMap<String, Long>();
+
+  private JobACLsManager aclsMgr;
+
+  private Configuration conf;
+
+  // TODO Remove me!!!!
+  private boolean debugMode;
+  private String serialNumberFormat;
+
+  private Path doneDirPrefixPath = null; // folder for completed jobs
+  private FileContext doneDirFc; // done Dir FileContext
+
+  private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path
+  private FileContext intermediateDoneDirFc; // Intermediate Done Dir
+                                             // FileContext
+
+  public HistoryFileManager() {
+    super(HistoryFileManager.class.getName());
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    this.conf = conf;
+
+    debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
+    int serialNumberLowDigits = debugMode ? 1 : 3;
+    serialNumberFormat = ("%0"
+        + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
+        + "d");
+
+    String doneDirPrefix = JobHistoryUtils
+        .getConfiguredHistoryServerDoneDirPrefix(conf);
+    try {
+      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(doneDirPrefix));
+      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
+      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
+          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
+    } catch (IOException e) {
+      throw new YarnException("Error creating done directory: ["
+          + doneDirPrefixPath + "]", e);
+    }
+
+    String intermediateDoneDirPrefix = JobHistoryUtils
+        .getConfiguredHistoryIntermediateDoneDirPrefix(conf);
+    try {
+      intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(intermediateDoneDirPrefix));
+      intermediateDoneDirFc = FileContext.getFileContext(
+          intermediateDoneDirPath.toUri(), conf);
+      mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
+          JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
+    } catch (IOException e) {
+      LOG.info("error creating done directory on dfs " + e);
+      throw new YarnException("Error creating intermediate done directory: ["
+          + intermediateDoneDirPath + "]", e);
+    }
+
+    this.aclsMgr = new JobACLsManager(conf);
+
+    jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
+        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE);
+
+    dateStringCacheSize = conf.getInt(
+        JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
+        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE);
+
+    super.init(conf);
+  }
+
+  private void mkdir(FileContext fc, Path path, FsPermission fsp)
+      throws IOException {
+    if (!fc.util().exists(path)) {
+      try {
+        fc.mkdir(path, fsp, true);
+
+        FileStatus fsStatus = fc.getFileStatus(path);
+        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+            + ", Expected: " + fsp.toShort());
+        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
+          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+              + ", " + fsp);
+          fc.setPermission(path, fsp);
+        }
+      } catch (FileAlreadyExistsException e) {
+        LOG.info("Directory: [" + path + "] already exists.");
+      }
+    }
+  }
+
+  /**
+   * Populates index data structures. Should only be called at initialization
+   * times.
+   */
+  @SuppressWarnings("unchecked")
+  void initExisting() throws IOException {
+    LOG.info("Initializing Existing Jobs...");
+    List<FileStatus> timestampedDirList = findTimestampedDirectories();
+    Collections.sort(timestampedDirList);
+    for (FileStatus fs : timestampedDirList) {
+      // TODO Could verify the correct format for these directories.
+      addDirectoryToSerialNumberIndex(fs.getPath());
+      addDirectoryToJobListCache(fs.getPath());
+    }
+  }
+
+  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
+    String serialPart = serialDirPath.getName();
+    String timeStampPart = JobHistoryUtils
+        .getTimestampPartFromPath(serialDirPath.toString());
+    if (timeStampPart == null) {
+      LOG.warn("Could not find timestamp portion from path: "
+          + serialDirPath.toString() + ". Continuing with next");
+      return;
+    }
+    if (serialPart == null) {
+      LOG.warn("Could not find serial portion from path: "
+          + serialDirPath.toString() + ". Continuing with next");
+      return;
+    }
+    synchronized (idToDateString) {
+      // TODO make this thread safe without the synchronize
+      if (idToDateString.containsKey(serialPart)) {
+        Set<String> set = idToDateString.get(serialPart);
+        set.remove(timeStampPart);
+        if (set.isEmpty()) {
+          idToDateString.remove(serialPart);
+        }
+      }
+    }
+  }
+
+  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding " + serialDirPath + " to serial index");
+    }
+    String serialPart = serialDirPath.getName();
+    String timestampPart = JobHistoryUtils
+        .getTimestampPartFromPath(serialDirPath.toString());
+    if (timestampPart == null) {
+      LOG.warn("Could not find timestamp portion from path: " + serialDirPath
+          + ". Continuing with next");
+      return;
+    }
+    if (serialPart == null) {
+      LOG.warn("Could not find serial portion from path: "
+          + serialDirPath.toString() + ". Continuing with next");
+      return;
+    }
+    addToSerialNumberIndex(serialPart, timestampPart);
+  }
+
+  private void addToSerialNumberIndex(String serialPart, String timestampPart) {
+    synchronized (idToDateString) {
+      // TODO make this thread safe without the synchronize
+      if (!idToDateString.containsKey(serialPart)) {
+        idToDateString.put(serialPart, new HashSet<String>());
+        if (idToDateString.size() > dateStringCacheSize) {
+          idToDateString.remove(idToDateString.firstKey());
+        }
+      }
+      // Record the timestamp even when the serial number was already present,
+      // so later timestamps for the same serial are not dropped.
+      Set<String> datePartSet = idToDateString.get(serialPart);
+      datePartSet.add(timestampPart);
+    }
+  }
+
+  private void addDirectoryToJobListCache(Path path) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding " + path + " to job list cache.");
+    }
+    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
+        doneDirFc);
+    for (FileStatus fs : historyFileList) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding in history for " + fs.getPath());
+      }
+      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
+          .getName());
+      String confFileName = JobHistoryUtils
+          .getIntermediateConfFileName(jobIndexInfo.getJobId());
+      String summaryFileName = JobHistoryUtils
+          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
+      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
+          .getParent(), confFileName), new Path(fs.getPath().getParent(),
+          summaryFileName), jobIndexInfo);
+      addToJobListCache(metaInfo);
+    }
+  }
+
+  private static List<FileStatus> scanDirectory(Path path, FileContext fc,
+      PathFilter pathFilter) throws IOException {
+    path = fc.makeQualified(path);
+    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
+    RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
+    while (fileStatusIter.hasNext()) {
+      FileStatus fileStatus = fileStatusIter.next();
+      Path filePath = fileStatus.getPath();
+      if (fileStatus.isFile() && pathFilter.accept(filePath)) {
+        jhStatusList.add(fileStatus);
+      }
+    }
+    return jhStatusList;
+  }
+
+  private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
+      FileContext fc) throws IOException {
+    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
+  }
+
+  /**
+   * Finds all history directories with a timestamp component by scanning the
+   * filesystem. Used when the JobHistory server is started.
+   * 
+   * @return a list of FileStatus objects for the timestamped directories.
+   */
+  private List<FileStatus> findTimestampedDirectories() throws IOException {
+    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
+        doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
+    return fsList;
+  }
+
+  private void addToJobListCache(MetaInfo metaInfo) {
+    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding " + jobId + " to job list cache with "
+          + metaInfo.getJobIndexInfo());
+    }
+    jobListCache.put(jobId, metaInfo);
+    if (jobListCache.size() > jobListCacheSize) {
+      jobListCache.remove(jobListCache.firstKey());
+    }
+  }
+
+  /**
+   * Scans the intermediate directory to find user directories. Scans these for
+   * history files if the modification time for the directory has changed.
+   * 
+   * @throws IOException
+   */
+  private void scanIntermediateDirectory() throws IOException {
+    List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
+        intermediateDoneDirFc, intermediateDoneDirPath, "");
+
+    for (FileStatus userDir : userDirList) {
+      String name = userDir.getPath().getName();
+      long newModificationTime = userDir.getModificationTime();
+      boolean shouldScan = false;
+      synchronized (userDirModificationTimeMap) {
+        if (!userDirModificationTimeMap.containsKey(name)
+            || newModificationTime > userDirModificationTimeMap.get(name)) {
+          shouldScan = true;
+          userDirModificationTimeMap.put(name, newModificationTime);
+        }
+      }
+      if (shouldScan) {
+        scanIntermediateDirectory(userDir.getPath());
+      }
+    }
+  }
+
+  /**
+   * Scans the specified path and populates the intermediate cache.
+   * 
+   * @param absPath the directory to scan for history files.
+   * @throws IOException
+   */
+  private void scanIntermediateDirectory(final Path absPath) throws IOException {
+    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
+        intermediateDoneDirFc);
+    for (FileStatus fs : fileStatusList) {
+      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
+          .getName());
+      String confFileName = JobHistoryUtils
+          .getIntermediateConfFileName(jobIndexInfo.getJobId());
+      String summaryFileName = JobHistoryUtils
+          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
+      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
+          .getParent(), confFileName), new Path(fs.getPath().getParent(),
+          summaryFileName), jobIndexInfo);
+      if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
+        intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
+      }
+    }
+  }
+
+  /**
+   * Searches the job history file FileStatus list for the specified JobId.
+   * 
+   * @param fileStatusList
+   *          fileStatus list of Job History Files.
+   * @param jobId
+   *          The JobId to find.
+   * @return A MetaInfo object for the jobId, null if not found.
+   * @throws IOException
+   */
+  private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId)
+      throws IOException {
+    for (FileStatus fs : fileStatusList) {
+      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
+          .getName());
+      if (jobIndexInfo.getJobId().equals(jobId)) {
+        String confFileName = JobHistoryUtils
+            .getIntermediateConfFileName(jobIndexInfo.getJobId());
+        String summaryFileName = JobHistoryUtils
+            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
+        MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
+            .getParent(), confFileName), new Path(fs.getPath().getParent(),
+            summaryFileName), jobIndexInfo);
+        return metaInfo;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Scans old directories known by the idToDateString map for the specified
+   * jobId. If the number of directories is higher than the supported size of
+   * the idToDateString cache, the jobId will not be found.
+   * 
+   * @param jobId
+   *          the jobId.
+   * @return the MetaInfo for the job, or null if it was not found.
+   * @throws IOException
+   */
+  private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
+    int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
+    String boxedSerialNumber = String.valueOf(jobSerialNumber);
+    Set<String> dateStringSet;
+    synchronized (idToDateString) {
+      Set<String> found = idToDateString.get(boxedSerialNumber);
+      if (found == null) {
+        return null;
+      } else {
+        dateStringSet = new HashSet<String>(found);
+      }
+    }
+    for (String timestampPart : dateStringSet) {
+      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
+      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
+          doneDirFc);
+      MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
+      if (metaInfo != null) {
+        return metaInfo;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Checks for the existence of the job history file in the intermediate
+   * directory.
+   * 
+   * @param jobId the jobId to look for.
+   * @return the MetaInfo for the job, or null if it is not in the
+   *         intermediate directory.
+   * @throws IOException
+   */
+  private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
+    scanIntermediateDirectory();
+    return intermediateListCache.get(jobId);
+  }
+
+  /**
+   * Parse a job from the JobHistoryFile, if the underlying file is not going to
+   * be deleted.
+   * 
+   * @param metaInfo
+   *          the MetaInfo describing where the JobHistory is stored.
+   * @return the Job or null if the underlying file was deleted.
+   * @throws IOException
+   *           if there is an error trying to read the file.
+   */
+  public Job loadJob(MetaInfo metaInfo) throws IOException {
+    return new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
+        metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
+        metaInfo.getConfFile(), aclsMgr);
+  }
+
+  public Collection<MetaInfo> getAllMetaInfo() throws IOException {
+    scanIntermediateDirectory();
+    ArrayList<MetaInfo> result = new ArrayList<MetaInfo>();
+    result.addAll(intermediateListCache.values());
+    result.addAll(jobListCache.values());
+    return result;
+  }
+
+  Collection<MetaInfo> getIntermediateMetaInfos() throws IOException {
+    scanIntermediateDirectory();
+    return intermediateListCache.values();
+  }
+
+  public MetaInfo getMetaInfo(JobId jobId) throws IOException {
+    // MetaInfo available in cache.
+    MetaInfo metaInfo = jobListCache.get(jobId);
+
+    if (metaInfo != null) {
+      return metaInfo;
+    }
+
+    // MetaInfo not available. Check intermediate directory for meta info.
+    metaInfo = scanIntermediateForJob(jobId);
+    if (metaInfo != null) {
+      return metaInfo;
+    }
+
+    // Intermediate directory does not contain job. Search through older ones.
+    metaInfo = scanOldDirsForJob(jobId);
+    if (metaInfo != null) {
+      return metaInfo;
+    }
+    return null;
+  }
+
+  void moveToDone(MetaInfo metaInfo) throws IOException {
+    long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
+    if (completeTime == 0)
+      completeTime = System.currentTimeMillis();
+    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
+
+    List<Path> paths = new ArrayList<Path>();
+    Path historyFile = metaInfo.getHistoryFile();
+    if (historyFile == null) {
+      LOG.info("No file for job-history with " + jobId + " found in cache!");
+    } else {
+      paths.add(historyFile);
+    }
+
+    Path confFile = metaInfo.getConfFile();
+    if (confFile == null) {
+      LOG.info("No file for jobConf with " + jobId + " found in cache!");
+    } else {
+      paths.add(confFile);
+    }
+
+    // TODO Check all mi getters and setters for the conf path
+    Path summaryFile = metaInfo.getSummaryFile();
+    if (summaryFile == null) {
+      LOG.info("No summary file for job: " + jobId);
+    } else {
+      try {
+        String jobSummaryString = getJobSummary(intermediateDoneDirFc,
+            summaryFile);
+        SUMMARY_LOG.info(jobSummaryString);
+        LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
+        intermediateDoneDirFc.delete(summaryFile, false);
+        metaInfo.setSummaryFile(null);
+      } catch (IOException e) {
+        LOG.warn("Failed to process summary file: [" + summaryFile + "]");
+        throw e;
+      }
+    }
+
+    Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
+    addDirectoryToSerialNumberIndex(targetDir);
+    try {
+      makeDoneSubdir(targetDir);
+    } catch (IOException e) {
+      LOG.warn("Failed creating subdirectory: " + targetDir
+          + " while attempting to move files for jobId: " + jobId);
+      throw e;
+    }
+    synchronized (metaInfo) {
+      if (historyFile != null) {
+        Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
+            .getName()));
+        try {
+          moveToDoneNow(historyFile, toPath);
+        } catch (IOException e) {
+          LOG.warn("Failed to move file: " + historyFile + " for jobId: "
+              + jobId);
+          throw e;
+        }
+        metaInfo.setHistoryFile(toPath);
+      }
+      if (confFile != null) {
+        Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
+            .getName()));
+        try {
+          moveToDoneNow(confFile, toPath);
+        } catch (IOException e) {
+          LOG.warn("Failed to move file: " + historyFile + " for jobId: "
+              + jobId);
+          throw e;
+        }
+        metaInfo.setConfFile(toPath);
+      }
+    }
+    addToJobListCache(metaInfo);
+    intermediateListCache.remove(jobId);
+  }
+
+  private void moveToDoneNow(final Path src, final Path target)
+      throws IOException {
+    LOG.info("Moving " + src.toString() + " to " + target.toString());
+    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
+  }
+
+  private String getJobSummary(FileContext fc, Path path) throws IOException {
+    Path qPath = fc.makeQualified(path);
+    FSDataInputStream in = fc.open(qPath);
+    try {
+      return in.readUTF();
+    } finally {
+      in.close(); // close the stream even if readUTF throws
+    }
+  }
+
+  private void makeDoneSubdir(Path path) throws IOException {
+    boolean existsInExistingCache = false;
+    synchronized (existingDoneSubdirs) {
+      if (existingDoneSubdirs.contains(path))
+        existsInExistingCache = true;
+    }
+    try {
+      doneDirFc.getFileStatus(path);
+      if (!existsInExistingCache) {
+        existingDoneSubdirs.add(path);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("JobHistory.maybeMakeSubdirectory -- We believed " + path
+              + " already existed, but it didn't.");
+        }
+      }
+    } catch (FileNotFoundException fnfE) {
+      try {
+        FsPermission fsp = new FsPermission(
+            JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
+        doneDirFc.mkdir(path, fsp, true);
+        FileStatus fsStatus = doneDirFc.getFileStatus(path);
+        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+            + ", Expected: " + fsp.toShort());
+        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
+          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+              + ", " + fsp);
+          doneDirFc.setPermission(path, fsp);
+        }
+        synchronized (existingDoneSubdirs) {
+          existingDoneSubdirs.add(path);
+        }
+      } catch (FileAlreadyExistsException faeE) { 
+        // Nothing to do.
+      }
+    }
+  }
+
+  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
+    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
+        id, timestampComponent, serialNumberFormat));
+  }
+
+  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
+    String timestampComponent = JobHistoryUtils.timestampDirectoryComponent(
+        millisecondTime, debugMode);
+    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
+        id, timestampComponent, serialNumberFormat));
+  }
+
+  private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
+    if (finishTime == 0) {
+      return fileStatus.getModificationTime();
+    }
+    return finishTime;
+  }
+
+  private void deleteJobFromDone(MetaInfo metaInfo) throws IOException {
+    jobListCache.remove(metaInfo.getJobId());
+    doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getHistoryFile()), false);
+    doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getConfFile()), false);
+  }
+
+  @SuppressWarnings("unchecked")
+  void clean(long cutoff, HistoryStorage storage) throws IOException {
+    // TODO this should be replaced by something that knows about the directory
+    // structure and will put less of a load on HDFS.
+    boolean halted = false;
+    // TODO Delete YYYY/MM/DD directories.
+    List<FileStatus> serialDirList = findTimestampedDirectories();
+    // Sort in ascending order. Relies on YYYY/MM/DD/Serial
+    Collections.sort(serialDirList);
+    for (FileStatus serialDir : serialDirList) {
+      List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
+          serialDir.getPath(), doneDirFc);
+      for (FileStatus historyFile : historyFileList) {
+        JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
+            .getPath().getName());
+        long effectiveTimestamp = getEffectiveTimestamp(
+            jobIndexInfo.getFinishTime(), historyFile);
+        if (effectiveTimestamp <= cutoff) {
+          String confFileName = JobHistoryUtils
+              .getIntermediateConfFileName(jobIndexInfo.getJobId());
+          MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(
+              historyFile.getPath().getParent(), confFileName), null,
+              jobIndexInfo);
+          storage.jobRemovedFromHDFS(metaInfo.getJobId());
+          deleteJobFromDone(metaInfo);
+        } else {
+          halted = true;
+          break;
+        }
+      }
+      if (!halted) {
+        doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
+        removeDirectoryFromSerialNumberIndex(serialDir.getPath());
+        synchronized (existingDoneSubdirs) {
+          existingDoneSubdirs.remove(serialDir.getPath());
+        }
+      } else {
+        break; // Don't scan any more directories.
+      }
+    }
+  }
+}
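
The serialNumberFormat built in init() zero-pads job serial numbers so that a
lexicographic sort of the done-directory names matches numeric order, which
clean() relies on when it sorts the timestamped directories. A small
illustration; the total width of 9 digits is an assumption for the non-debug
case, since the real width is JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS
plus serialNumberLowDigits:

    String serialNumberFormat = "%09d"; // assumed width
    System.out.println(String.format(serialNumberFormat, 42));   // 000000042
    System.out.println(String.format(serialNumberFormat, 1234)); // 000001234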

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java?rev=1311896&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java Tue Apr 10 18:11:26 2012
@@ -0,0 +1,80 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Provides an API to query jobs that have finished. 
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface HistoryStorage {
+  
+  /**
+   * Give the Storage a reference to a class that can be used to interact with
+   * history files.
+   * @param hsManager the class that is used to interact with history files.
+   */
+  void setHistoryFileManager(HistoryFileManager hsManager);
+  
+  /**
+   * Look for a set of partial jobs.
+   * @param offset the offset into the list of jobs.
+   * @param count the maximum number of jobs to return.
+   * @param user only return jobs for the given user.
+   * @param queue only return jobs in the given queue.
+   * @param sBegin only return Jobs that started on or after the given time.
+   * @param sEnd only return Jobs that started on or before the given time.
+   * @param fBegin only return Jobs that ended on or after the given time.
+   * @param fEnd only return Jobs that ended on or before the given time.
+   * @param jobState only return Jobs that are in the given job state.
+   * @return The list of filtered jobs.
+   */
+  JobsInfo getPartialJobs(Long offset, Long count, String user, 
+      String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, 
+      JobState jobState);
+  
+  /**
+   * Get all of the cached jobs.  This only returns partial jobs and is here for
+   * legacy reasons.
+   * @return all of the cached jobs
+   */
+  Map<JobId, Job> getAllPartialJobs();
+  
+  /**
+   * Get a fully parsed job.
+   * @param jobId the id of the job
+   * @return the job, or null if it is not found.
+   */
+  Job getFullJob(JobId jobId);
+
+  /**
+   * Informs the Storage that a job has been removed from HDFS
+   * @param jobId the ID of the job that was removed.
+   */
+  void jobRemovedFromHDFS(JobId jobId);
+}
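
A do-nothing implementation shows the minimum surface a pluggable backend has
to cover. This is only a sketch, and the class name is hypothetical:

    package org.apache.hadoop.mapreduce.v2.hs;

    import java.util.Collections;
    import java.util.Map;

    import org.apache.hadoop.mapreduce.v2.api.records.JobId;
    import org.apache.hadoop.mapreduce.v2.api.records.JobState;
    import org.apache.hadoop.mapreduce.v2.app.job.Job;
    import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;

    /** Hypothetical stub: answers every query with "nothing found". */
    public class NullHistoryStorage implements HistoryStorage {
      private HistoryFileManager hsManager;

      @Override
      public void setHistoryFileManager(HistoryFileManager hsManager) {
        this.hsManager = hsManager; // kept so a real backend could use it
      }

      @Override
      public JobsInfo getPartialJobs(Long offset, Long count, String user,
          String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
          JobState jobState) {
        return new JobsInfo(); // empty result set
      }

      @Override
      public Map<JobId, Job> getAllPartialJobs() {
        return Collections.emptyMap();
      }

      @Override
      public Job getFullJob(JobId jobId) {
        return null; // not found
      }

      @Override
      public void jobRemovedFromHDFS(JobId jobId) {
        // nothing cached, so nothing to invalidate
      }
    }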


