hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tgra...@apache.org
Subject svn commit: r1311896 [2/2] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/ hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/mai...
Date Tue, 10 Apr 2012 18:11:27 GMT
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Tue Apr 10 18:11:26 2012
@@ -1,36 +1,26 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -41,26 +31,16 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
 import org.apache.hadoop.yarn.YarnException;
@@ -69,106 +49,36 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-
-/*
+/**
  * Loads and manages the Job history cache.
  */
-public class JobHistory extends AbstractService implements HistoryContext   {
-
-  private static final int DEFAULT_JOBLIST_CACHE_SIZE = 20000;
-  private static final int DEFAULT_LOADEDJOB_CACHE_SIZE = 5;
-  private static final int DEFAULT_DATESTRING_CACHE_SIZE = 200000;
-  private static final long DEFAULT_MOVE_THREAD_INTERVAL = 3 * 60 * 1000l; //3 minutes
-  private static final int DEFAULT_MOVE_THREAD_COUNT = 3;
-  
-  static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L; //1 week
-  static final long DEFAULT_RUN_INTERVAL = 1 * 24 * 60 * 60 * 1000l; //1 day
-  
+public class JobHistory extends AbstractService implements HistoryContext {
   private static final Log LOG = LogFactory.getLog(JobHistory.class);
 
-  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
-  public static final Pattern CONF_FILENAME_REGEX =
-    Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
+  public static final Pattern CONF_FILENAME_REGEX = Pattern.compile("("
+      + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
   public static final String OLD_SUFFIX = ".old";
 
-  private static String DONE_BEFORE_SERIAL_TAIL = 
-    JobHistoryUtils.doneSubdirsBeforeSerialTail();
-  
-  /**
-   * Maps between a serial number (generated based on jobId) and the timestamp
-   * component(s) to which it belongs.
-   * Facilitates jobId based searches.
-   * If a jobId is not found in this list - it will not be found.
-   */
-  private final SortedMap<String, Set<String>> idToDateString = 
-    new ConcurrentSkipListMap<String, Set<String>>();
-
-  //Maintains minimal details for recent jobs (parsed from history file name).
-  //Sorted on Job Completion Time.
-  private final SortedMap<JobId, MetaInfo> jobListCache = 
-    new ConcurrentSkipListMap<JobId, MetaInfo>();
-  
-  
-  // Re-use exisiting MetaInfo objects if they exist for the specific JobId. (synchronization on MetaInfo)
-  // Check for existance of the object when using iterators.
-  private final SortedMap<JobId, MetaInfo> intermediateListCache = 
-    new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>();
-  
-  //Maintains a list of known done subdirectories. Not currently used.
-  private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
-
-  private Map<JobId, Job> loadedJobCache = null;
-
-  /**
-   * Maintains a mapping between intermediate user directories and the last 
-   * known modification time.
-   */
-  private Map<String, Long> userDirModificationTimeMap = 
-    new HashMap<String, Long>();
-  
-  //The number of jobs to maintain in the job list cache.
-  private int jobListCacheSize;
-  
-  private JobACLsManager aclsMgr;
-  
-  //The number of loaded jobs.
-  private int loadedJobCacheSize;
-  
-  //The number of entries in idToDateString
-  private int dateStringCacheSize;
-
-  //Time interval for the move thread.
+  // Time interval for the move thread.
   private long moveThreadInterval;
-  
-  //Number of move threads.
+
+  // Number of move threads.
   private int numMoveThreads;
-  
-  private Configuration conf;
 
-  private boolean debugMode;
-  private int serialNumberLowDigits;
-  private String serialNumberFormat;
-  
-
-  private Path doneDirPrefixPath = null; // folder for completed jobs
-  private FileContext doneDirFc; // done Dir FileContext
-  
-  private Path intermediateDoneDirPath = null; //Intermediate Done Dir Path
-  private FileContext intermediateDoneDirFc; //Intermediate Done Dir FileContext
+  private Configuration conf;
 
   private Thread moveIntermediateToDoneThread = null;
   private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null;
+
   private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null;
-  
-  /**
-   * Writes out files to the path
-   * .....${DONE_DIR}/VERSION_STRING/YYYY/MM/DD/HH/SERIAL_NUM/jh{index_entries}.jhist
-   */
 
-  @SuppressWarnings("serial")
+  private HistoryStorage storage = null;
+  private HistoryFileManager hsManager = null;
+
   @Override
   public void init(Configuration conf) throws YarnException {
     LOG.info("JobHistory Init");
@@ -176,121 +86,66 @@ public class JobHistory extends Abstract
     this.appID = RecordFactoryProvider.getRecordFactory(conf)
         .newRecordInstance(ApplicationId.class);
     this.appAttemptID = RecordFactoryProvider.getRecordFactory(conf)
-    .newRecordInstance(ApplicationAttemptId.class);
-
-    debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
-    serialNumberLowDigits = debugMode ? 1 : 3;
-    serialNumberFormat = ("%0"
-        + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS 
-            + serialNumberLowDigits) + "d");
+        .newRecordInstance(ApplicationAttemptId.class);
 
-    String doneDirPrefix = null;
-    doneDirPrefix = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
-    try {
-      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
-          new Path(doneDirPrefix));
-      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
-      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
-          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
-    } catch (IOException e) {
-      throw new YarnException("Error creating done directory: [" +
-          doneDirPrefixPath + "]", e);
-    }
-
-    String intermediateDoneDirPrefix = null;
-    intermediateDoneDirPrefix = JobHistoryUtils
-        .getConfiguredHistoryIntermediateDoneDirPrefix(conf);
-    try {
-      intermediateDoneDirPath = FileContext.getFileContext(conf)
-          .makeQualified(new Path(intermediateDoneDirPrefix));
-      intermediateDoneDirFc = FileContext.getFileContext(
-          intermediateDoneDirPath.toUri(), conf);
-      mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
-          JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
-    } catch (IOException e) {
-      LOG.info("error creating done directory on dfs " + e);
-      throw new YarnException("Error creating intermediate done directory: [" 
-          + intermediateDoneDirPath + "]", e);
-    }
-    
-    this.aclsMgr = new JobACLsManager(conf);
-    
-    jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
-        DEFAULT_JOBLIST_CACHE_SIZE);
-    loadedJobCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
-        DEFAULT_LOADEDJOB_CACHE_SIZE);
-    dateStringCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
-        DEFAULT_DATESTRING_CACHE_SIZE);
-    moveThreadInterval =
-        conf.getLong(JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
-            DEFAULT_MOVE_THREAD_INTERVAL);
+    moveThreadInterval = conf.getLong(
+        JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
+        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
     numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
-        DEFAULT_MOVE_THREAD_COUNT);
-    
-    loadedJobCache =
-        Collections.synchronizedMap(new LinkedHashMap<JobId, Job>(
-            loadedJobCacheSize + 1, 0.75f, true) {
-          @Override
-          public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
-            return super.size() > loadedJobCacheSize;
-          }
-        });
-    
+        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
+
+    hsManager = new HistoryFileManager();
+    hsManager.init(conf);
     try {
-      initExisting();
+      hsManager.initExisting();
     } catch (IOException e) {
       throw new YarnException("Failed to intialize existing directories", e);
     }
-    super.init(conf);
-  }
-  
-  private void mkdir(FileContext fc, Path path, FsPermission fsp)
-      throws IOException {
-    if (!fc.util().exists(path)) {
-      try {
-        fc.mkdir(path, fsp, true);
 
-        FileStatus fsStatus = fc.getFileStatus(path);
-        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
-            + ", Expected: " + fsp.toShort());
-        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
-          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
-              + ", " + fsp);
-          fc.setPermission(path, fsp);
-        }
-      } catch (FileAlreadyExistsException e) {
-        LOG.info("Directory: [" + path + "] already exists.");
-      }
+    storage = ReflectionUtils.newInstance(conf.getClass(
+        JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
+        HistoryStorage.class), conf);
+    if (storage instanceof Service) {
+      ((Service) storage).init(conf);
     }
+    storage.setHistoryFileManager(hsManager);
+
+    super.init(conf);
   }
 
   @Override
   public void start() {
-    //Start moveIntermediatToDoneThread
-    moveIntermediateToDoneRunnable = 
-      new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads);
+    hsManager.start();
+    if (storage instanceof Service) {
+      ((Service) storage).start();
+    }
+
+    // Start moveIntermediatToDoneThread
+    moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable(
+        moveThreadInterval, numMoveThreads);
     moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
     moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
     moveIntermediateToDoneThread.start();
-    
-    //Start historyCleaner
+
+    // Start historyCleaner
     boolean startCleanerService = conf.getBoolean(
         JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
     if (startCleanerService) {
       long maxAgeOfHistoryFiles = conf.getLong(
-          JHAdminConfig.MR_HISTORY_MAX_AGE_MS, DEFAULT_HISTORY_MAX_AGE);
+          JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
+          JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
       cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1,
-          new ThreadFactoryBuilder().setNameFormat("LogCleaner").build()
-      );
+          new ThreadFactoryBuilder().setNameFormat("LogCleaner").build());
       long runInterval = conf.getLong(
-          JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, DEFAULT_RUN_INTERVAL);
+          JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
+          JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
       cleanerScheduledExecutor
           .scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
               30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
     }
     super.start();
   }
-  
+
   @Override
   public void stop() {
     LOG.info("Stopping JobHistory");
@@ -323,281 +178,16 @@ public class JobHistory extends Abstract
         LOG.warn("HistoryCleanerService shutdown may not have succeeded");
       }
     }
+    if (storage instanceof Service) {
+      ((Service) storage).stop();
+    }
+    hsManager.stop();
     super.stop();
   }
-  
+
   public JobHistory() {
     super(JobHistory.class.getName());
   }
-  
-  /**
-   * Populates index data structures.
-   * Should only be called at initialization times.
-   */
-  @SuppressWarnings("unchecked")
-  private void initExisting() throws IOException {
-    LOG.info("Initializing Existing Jobs...");
-    List<FileStatus> timestampedDirList = findTimestampedDirectories();
-    Collections.sort(timestampedDirList);
-    for (FileStatus fs : timestampedDirList) {
-      //TODO Could verify the correct format for these directories.
-      addDirectoryToSerialNumberIndex(fs.getPath());
-      addDirectoryToJobListCache(fs.getPath());
-    }
-  }
-  
-  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
-    String serialPart = serialDirPath.getName();
-    String timeStampPart = 
-      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
-    if (timeStampPart == null) {
-      LOG.warn("Could not find timestamp portion from path: " + 
-          serialDirPath.toString() +". Continuing with next");
-      return;
-    }
-    if (serialPart == null) {
-      LOG.warn("Could not find serial portion from path: " + 
-          serialDirPath.toString() + ". Continuing with next");
-      return;
-    }
-    if (idToDateString.containsKey(serialPart)) {
-      Set<String> set = idToDateString.get(serialPart);
-      set.remove(timeStampPart);
-      if (set.isEmpty()) {
-        idToDateString.remove(serialPart);
-      }
-    }
-
-  }
-  
-  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Adding "+serialDirPath+" to serial index");
-    }
-    String serialPart = serialDirPath.getName();
-    String timestampPart = 
-      JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
-    if (timestampPart == null) {
-      LOG.warn("Could not find timestamp portion from path: " + 
-          serialDirPath.toString() +". Continuing with next");
-      return;
-    }
-    if (serialPart == null) {
-      LOG.warn("Could not find serial portion from path: " + 
-          serialDirPath.toString() + ". Continuing with next");
-    }
-    addToSerialNumberIndex(serialPart, timestampPart);
-  }
-
-  private void addToSerialNumberIndex(String serialPart, String timestampPart) {
-      if (!idToDateString.containsKey(serialPart)) {
-        idToDateString.put(serialPart, new HashSet<String>());
-        if (idToDateString.size() > dateStringCacheSize) {
-          idToDateString.remove(idToDateString.firstKey());
-        }
-      Set<String> datePartSet = idToDateString.get(serialPart);
-      datePartSet.add(timestampPart);
-    }
-  }
-  
-  private void addDirectoryToJobListCache(Path path) throws IOException {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Adding "+path+" to job list cache.");
-    }
-    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
-        doneDirFc);
-    for (FileStatus fs : historyFileList) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Adding in history for "+fs.getPath());
-      }
-      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
-          .getName());
-      String confFileName = JobHistoryUtils
-          .getIntermediateConfFileName(jobIndexInfo.getJobId());
-      String summaryFileName = JobHistoryUtils
-          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), new Path(fs.getPath().getParent(),
-          summaryFileName), jobIndexInfo);
-      addToJobListCache(jobIndexInfo.getJobId(), metaInfo);
-    }
-  }
-  
-  private static List<FileStatus> scanDirectory(Path path, FileContext fc,
-      PathFilter pathFilter) throws IOException {
-    path = fc.makeQualified(path);
-    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
-      RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
-      while (fileStatusIter.hasNext()) {
-        FileStatus fileStatus = fileStatusIter.next();
-        Path filePath = fileStatus.getPath();
-        if (fileStatus.isFile() && pathFilter.accept(filePath)) {
-          jhStatusList.add(fileStatus);
-        }
-      }    
-    return jhStatusList;
-  }
-  
-  private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
-      FileContext fc) throws IOException {
-    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
-  }
-  
-  /**
-   * Finds all history directories with a timestamp component by scanning 
-   * the filesystem.
-   * Used when the JobHistory server is started.
-   * @return
-   */
-  private List<FileStatus> findTimestampedDirectories() throws IOException {
-    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, 
-        doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
-    return fsList;
-  }
-    
-  /**
-   * Adds an entry to the job list cache. Maintains the size.
-   */
-  private void addToJobListCache(JobId jobId, MetaInfo metaInfo) {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Adding "+jobId+" to job list cache with "
-          +metaInfo.getJobIndexInfo());
-    }
-    jobListCache.put(jobId, metaInfo);
-    if (jobListCache.size() > jobListCacheSize) {
-      jobListCache.remove(jobListCache.firstKey());
-    }
-  }
-
-  /**
-   * Adds an entry to the loaded job cache. Maintains the size.
-   */
-  private void addToLoadedJobCache(Job job) {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Adding "+job.getID()+" to loaded job cache");
-    }
-    loadedJobCache.put(job.getID(), job);
-  }
-  
-  
-  /**
-   * Scans the intermediate directory to find user directories. Scans these
-   * for history files if the modification time for the directory has changed.
-   * @throws IOException
-   */
-  private void scanIntermediateDirectory() throws IOException {
-    List<FileStatus> userDirList = 
-      JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
-    
-    for (FileStatus userDir : userDirList) {
-      String name = userDir.getPath().getName();
-      long newModificationTime = userDir.getModificationTime();
-      boolean shouldScan = false;
-      synchronized (userDirModificationTimeMap) {
-        if (!userDirModificationTimeMap.containsKey(name) || newModificationTime
-            > userDirModificationTimeMap.get(name)) {
-            shouldScan = true;
-            userDirModificationTimeMap.put(name, newModificationTime);
-        }  
-        }  
-      if (shouldScan) {
-        scanIntermediateDirectory(userDir.getPath());
-      }
-    }
-  }
-
-  /**
-   * Scans the specified path and populates the intermediate cache.
-   * @param absPath
-   * @throws IOException
-   */
-  private void scanIntermediateDirectory(final Path absPath)
-      throws IOException {
-    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
-        intermediateDoneDirFc);
-    for (FileStatus fs : fileStatusList) {
-      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
-          .getName());
-      String confFileName = JobHistoryUtils
-          .getIntermediateConfFileName(jobIndexInfo.getJobId());
-      String summaryFileName = JobHistoryUtils
-          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), new Path(fs.getPath().getParent(),
-          summaryFileName), jobIndexInfo);
-      if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
-        intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
-      }
-    }
-  }
-  
-  /**
-   * Searches the job history file FileStatus list for the specified JobId.
-   * 
-   * @param fileStatusList fileStatus list of Job History Files.
-   * @param jobId The JobId to find.
-   * @param checkForDoneFile whether to check for the existance of a done file.
-   * @return A MetaInfo object for the jobId, null if not found.
-   * @throws IOException
-   */
-  private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId) 
-  throws IOException {
-    for (FileStatus fs : fileStatusList) {
-      JobIndexInfo jobIndexInfo = 
-        FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
-      if (jobIndexInfo.getJobId().equals(jobId)) {
-        String confFileName = JobHistoryUtils
-            .getIntermediateConfFileName(jobIndexInfo.getJobId());
-        String summaryFileName = JobHistoryUtils
-            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-        MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-            .getParent(), confFileName), new Path(fs.getPath().getParent(),
-            summaryFileName), jobIndexInfo);
-        return metaInfo;
-      }
-    }
-    return null;
-  }
-  
-  /**
-   * Scans old directories known by the idToDateString map for the specified 
-   * jobId.
-   * If the number of directories is higher than the supported size of the
-   * idToDateString cache, the jobId will not be found.
-   * @param jobId the jobId.
-   * @return
-   * @throws IOException
-   */
-  private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
-    int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
-    String boxedSerialNumber = String.valueOf(jobSerialNumber);
-    Set<String> dateStringSet = idToDateString.get(boxedSerialNumber);
-    if (dateStringSet == null) {
-      return null;
-    }
-    for (String timestampPart : dateStringSet) {
-      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
-      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
-          doneDirFc);
-      MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
-      if (metaInfo != null) {
-        return metaInfo;
-      }
-    }
-    return null;
-  }
-  
-  /**
-   * Checks for the existence of the job history file in the intermediate 
-   * directory.
-   * @param jobId
-   * @return
-   * @throws IOException
-   */
-  private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
-    scanIntermediateDirectory();
-    return intermediateListCache.get(jobId);
-  }
 
   @Override
   public String getApplicationName() {
@@ -609,486 +199,167 @@ public class JobHistory extends Abstract
     private long sleepTime;
     private ThreadPoolExecutor moveToDoneExecutor = null;
     private boolean running = false;
-    
-    public void stop() {
+
+    public synchronized void stop() {
       running = false;
+      notify();
     }
-    
+
     MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
       this.sleepTime = sleepTime;
-      ThreadFactory tf = new ThreadFactoryBuilder()
-        .setNameFormat("MoveIntermediateToDone Thread #%d")
-        .build();
-      moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1, 
+      ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+          "MoveIntermediateToDone Thread #%d").build();
+      moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
           TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
       running = true;
     }
-  
-  @Override
+
+    @Override
     public void run() {
       Thread.currentThread().setName("IntermediateHistoryScanner");
       try {
-        while (running) {
+        while (true) {
           LOG.info("Starting scan to move intermediate done files");
-          scanIntermediateDirectory();
-          for (final MetaInfo metaInfo : intermediateListCache.values()) {
+          for (final MetaInfo metaInfo : hsManager.getIntermediateMetaInfos()) {
             moveToDoneExecutor.execute(new Runnable() {
               @Override
               public void run() {
                 try {
-                moveToDone(metaInfo);
+                  hsManager.moveToDone(metaInfo);
                 } catch (IOException e) {
-                  LOG.info("Failed to process metaInfo for job: " + 
-                      metaInfo.jobIndexInfo.getJobId(), e);
+                  LOG.info(
+                      "Failed to process metaInfo for job: "
+                          + metaInfo.getJobId(), e);
                 }
               }
             });
-
           }
-          synchronized (this) { // TODO Is this really required.
+          synchronized (this) {
             try {
               this.wait(sleepTime);
             } catch (InterruptedException e) {
               LOG.info("IntermediateHistoryScannerThread interrupted");
             }
+            if (!running) {
+              break;
+            }
           }
         }
       } catch (IOException e) {
-        LOG.warn("Unable to get a list of intermediate files to be moved from: "
-            + intermediateDoneDirPath);
+        LOG.warn("Unable to get a list of intermediate files to be moved");
+        // TODO Shut down the entire process!!!!
       }
     }
   }
-  
-  private Job loadJob(MetaInfo metaInfo) {
-    synchronized(metaInfo) {
-      try {
-        Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(), 
-            metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
-            metaInfo.getConfFile(), this.aclsMgr);
-        addToLoadedJobCache(job);
-        return job;
-      } catch (IOException e) {
-        throw new YarnException("Could not find/load job: " + 
-            metaInfo.getJobIndexInfo().getJobId(), e);
-      }
-    }
-  }
-  
-  private Map<JobId, Job> getAllJobsInternal() {
-    //TODO This should ideally be using getAllJobsMetaInfo
-    // or get rid of that method once Job has APIs for user, finishTime etc.
-    SortedMap<JobId, Job> result = new TreeMap<JobId, Job>();
-    try {
-      scanIntermediateDirectory();
-    } catch (IOException e) {
-      LOG.warn("Failed to scan intermediate directory", e);
-      throw new YarnException(e);
-    }
-    for (JobId jobId : intermediateListCache.keySet()) {
-      MetaInfo mi = intermediateListCache.get(jobId);
-      if (mi != null) {
-        result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi
-            .getJobIndexInfo().getJobId()));
-      }
-    }
-    for (JobId jobId : jobListCache.keySet()) {
-      MetaInfo mi = jobListCache.get(jobId);
-      if (mi != null) {
-        result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi
-            .getJobIndexInfo().getJobId()));
-      }
-    }
-    return result;
-  }
 
   /**
    * Helper method for test cases.
    */
   MetaInfo getJobMetaInfo(JobId jobId) throws IOException {
-    //MetaInfo available in cache.
-    MetaInfo metaInfo = null;
-    if (jobListCache.containsKey(jobId)) {
-      metaInfo = jobListCache.get(jobId);
-    }
-
-    if (metaInfo != null) {
-      return metaInfo;
-    }
-    
-    //MetaInfo not available. Check intermediate directory for meta info.
-    metaInfo = scanIntermediateForJob(jobId);
-    if (metaInfo != null) {
-      return metaInfo;
-    }
-    
-    //Intermediate directory does not contain job. Search through older ones.
-    metaInfo = scanOldDirsForJob(jobId);
-    if (metaInfo != null) {
-      return metaInfo;
-    }
-    return null;
-  }
-  
-  private Job findJob(JobId jobId) throws IOException {
-    //Job already loaded.
-    if (loadedJobCache.containsKey(jobId)) {
-      return loadedJobCache.get(jobId);        
-    }
-    
-    //MetaInfo available in cache.
-    MetaInfo metaInfo = null;
-    if (jobListCache.containsKey(jobId)) {
-      metaInfo = jobListCache.get(jobId);
-    }
-
-    if (metaInfo != null) {
-      return loadJob(metaInfo);
-    }
-    
-    //MetaInfo not available. Check intermediate directory for meta info.
-    metaInfo = scanIntermediateForJob(jobId);
-    if (metaInfo != null) {
-      return loadJob(metaInfo);
-    }
-    
-    //Intermediate directory does not contain job. Search through older ones.
-    metaInfo = scanOldDirsForJob(jobId);
-    if (metaInfo != null) {
-      return loadJob(metaInfo);
-    }
-    return null;
-  }
-  
-  private void moveToDone(MetaInfo metaInfo) throws IOException {
-    long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
-    if (completeTime == 0) completeTime = System.currentTimeMillis();
-    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
-    
-    List<Path> paths = new ArrayList<Path>();
-    Path historyFile = metaInfo.getHistoryFile();
-    if (historyFile == null) {
-      LOG.info("No file for job-history with " + jobId + " found in cache!");
-    } else {
-      paths.add(historyFile);
-    }
-    
-    Path confFile = metaInfo.getConfFile();
-    if (confFile == null) {
-      LOG.info("No file for jobConf with " + jobId + " found in cache!");
-    } else {
-      paths.add(confFile);
-    }
-    
-    //TODO Check all mi getters and setters for the conf path
-    Path summaryFile = metaInfo.getSummaryFile();
-    if (summaryFile == null) {
-      LOG.info("No summary file for job: " + jobId);
-    } else {
-      try {
-        String jobSummaryString = getJobSummary(intermediateDoneDirFc, summaryFile);
-        SUMMARY_LOG.info(jobSummaryString);
-        LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
-        intermediateDoneDirFc.delete(summaryFile, false);
-        metaInfo.setSummaryFile(null);
-      } catch (IOException e) {
-        LOG.warn("Failed to process summary file: [" + summaryFile + "]");
-        throw e;
-      }
-    }
-
-    Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
-    addDirectoryToSerialNumberIndex(targetDir);
-    try {
-      maybeMakeSubdirectory(targetDir);
-    } catch (IOException e) {
-      LOG.warn("Failed creating subdirectory: " + targetDir + 
-          " while attempting to move files for jobId: " + jobId);
-      throw e;
-    }
-    synchronized (metaInfo) {
-      if (historyFile != null) {
-        Path toPath = doneDirFc.makeQualified(new Path(targetDir, 
-            historyFile.getName()));
-        try {
-          moveToDoneNow(historyFile, toPath);
-        } catch (IOException e) {
-          LOG.warn("Failed to move file: " + historyFile + " for jobId: "
-              + jobId);
-          throw e;
-        }
-        metaInfo.setHistoryFile(toPath);
-      }
-      if (confFile != null) {
-        Path toPath = doneDirFc.makeQualified(new Path(targetDir, 
-            confFile.getName()));
-        try {
-          moveToDoneNow(confFile, toPath);
-        } catch (IOException e) {
-          LOG.warn("Failed to move file: " + historyFile + " for jobId: " 
-              + jobId);
-          throw e;
-        }
-        metaInfo.setConfFile(toPath);
-      }
-    }
-    addToJobListCache(jobId, metaInfo);
-    intermediateListCache.remove(jobId);
+    return hsManager.getMetaInfo(jobId);
   }
-  
-  private void moveToDoneNow(final Path src, final Path target)
-      throws IOException {
-    LOG.info("Moving " + src.toString() + " to " + target.toString());
-    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
-    // fc.util().copy(src, target);
-    //fc.delete(src, false);
-    //intermediateDoneDirFc.setPermission(target, new FsPermission(
-    //JobHistoryUtils.HISTORY_DONE_FILE_PERMISSION));
-  }
-  
-  String getJobSummary(FileContext fc, Path path) throws IOException {
-    Path qPath = fc.makeQualified(path);
-    FSDataInputStream in = fc.open(qPath);
-    String jobSummaryString = in.readUTF();
-    in.close();
-    return jobSummaryString;
-  }
-  
-  private void maybeMakeSubdirectory(Path path) throws IOException {
-    boolean existsInExistingCache = false;
-    synchronized(existingDoneSubdirs) {
-      if (existingDoneSubdirs.contains(path)) existsInExistingCache = true;
-    }
-    try {
-      doneDirFc.getFileStatus(path);
-      if (!existsInExistingCache) {
-        existingDoneSubdirs.add(path);
-        if (debugMode) {
-          LOG.info("JobHistory.maybeMakeSubdirectory -- We believed "
-                             + path + " already existed, but it didn't.");
-        }
-      }
-    } catch (FileNotFoundException fnfE) {
-      try {
-        FsPermission fsp = 
-          new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
-        doneDirFc.mkdir(path, fsp, true);
-        FileStatus fsStatus = doneDirFc.getFileStatus(path);
-        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
-            + ", Expected: " + fsp.toShort());
-        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
-          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
-              + ", " + fsp);
-          doneDirFc.setPermission(path, fsp);
-        }
-        synchronized(existingDoneSubdirs) {
-          existingDoneSubdirs.add(path);
-        }
-      } catch (FileAlreadyExistsException faeE) { //Nothing to do.
-      }
-    }
-  }
-  
-  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
-    return new Path(doneDirPrefixPath, 
-        JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
-  }
-  
-  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
-    String timestampComponent = 
-      JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode);
-    return new Path(doneDirPrefixPath, 
-        JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
-  }  
-  
 
   @Override
-  public synchronized Job getJob(JobId jobId) {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Looking for Job "+jobId);
-    }
-    Job job = null;
-    try {
-      job = findJob(jobId);
-      //This could return a null job.
-    } catch (IOException e) {
-      throw new YarnException(e);
-    }
-    return job;
+  public Job getJob(JobId jobId) {
+    return storage.getFullJob(jobId);
   }
 
   @Override
   public Map<JobId, Job> getAllJobs(ApplicationId appID) {
-    if(LOG.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       LOG.debug("Called getAllJobs(AppId): " + appID);
     }
-//    currently there is 1 to 1 mapping between app and job id
+    // currently there is 1 to 1 mapping between app and job id
     org.apache.hadoop.mapreduce.JobID oldJobID = TypeConverter.fromYarn(appID);
     Map<JobId, Job> jobs = new HashMap<JobId, Job>();
     JobId jobID = TypeConverter.toYarn(oldJobID);
     jobs.put(jobID, getJob(jobID));
     return jobs;
-//    return getAllJobs();
   }
-  
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.mapreduce.v2.hs.HistoryContext#getAllJobs()
-   * 
-   * Returns a recent list of jobs. This may not be the complete set.
-   * If a previous jobId is known - it can be queries via the getJob(JobId)
-   * method.
-   * Size of this list is determined by the size of the job list cache.
-   * This can be fixed when pagination is implemented - return the first set of
-   * jobs via the cache, go to DFS only when an attempt is made to navigate
-   * past the cached list.
-   * This does involve a DFS oepration of scanning the intermediate directory.
-   */
+
+  @Override
   public Map<JobId, Job> getAllJobs() {
-    LOG.debug("Called getAllJobs()");
-    return getAllJobsInternal();
+    return storage.getAllPartialJobs();
   }
 
-  static class MetaInfo {
-    private Path historyFile;
-    private Path confFile; 
-    private Path summaryFile;
-    JobIndexInfo jobIndexInfo;
-
-    MetaInfo(Path historyFile, Path confFile, Path summaryFile, 
-        JobIndexInfo jobIndexInfo) {
-      this.historyFile = historyFile;
-      this.confFile = confFile;
-      this.summaryFile = summaryFile;
-      this.jobIndexInfo = jobIndexInfo;
-    }
-
-    Path getHistoryFile() { return historyFile; }
-    Path getConfFile() { return confFile; }
-    Path getSummaryFile() { return summaryFile; }
-    JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
-    
-    void setHistoryFile(Path historyFile) { this.historyFile = historyFile; }
-    void setConfFile(Path confFile) {this.confFile = confFile; }
-    void setSummaryFile(Path summaryFile) { this.summaryFile = summaryFile; }
+  /**
+   * Look for a set of partial jobs.
+   * 
+   * @param offset
+   *          the offset into the list of jobs.
+   * @param count
+   *          the maximum number of jobs to return.
+   * @param user
+   *          only return jobs for the given user.
+   * @param queue
+   *          only return jobs for in the given queue.
+   * @param sBegin
+   *          only return Jobs that started on or after the given time.
+   * @param sEnd
+   *          only return Jobs that started on or before the given time.
+   * @param fBegin
+   *          only return Jobs that ended on or after the given time.
+   * @param fEnd
+   *          only return Jobs that ended on or before the given time.
+   * @param jobState
+   *          only return jobs that are in the give job state.
+   * @return The list of filtered jobs.
+   */
+  @Override
+  public JobsInfo getPartialJobs(Long offset, Long count, String user,
+      String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+      JobState jobState) {
+    return storage.getPartialJobs(offset, count, user, queue, sBegin, sEnd,
+        fBegin, fEnd, jobState);
   }
-  
 
   public class HistoryCleaner implements Runnable {
-    private long currentTime;
-    
     long maxAgeMillis;
-    long filesDeleted = 0;
-    long dirsDeleted = 0;
-    
+
     public HistoryCleaner(long maxAge) {
       this.maxAgeMillis = maxAge;
     }
-    
-    @SuppressWarnings("unchecked")
+
     public void run() {
       LOG.info("History Cleaner started");
-      currentTime = System.currentTimeMillis();
-      boolean halted = false;
-      //TODO Delete YYYY/MM/DD directories.
+      long cutoff = System.currentTimeMillis() - maxAgeMillis;
       try {
-        List<FileStatus> serialDirList = findTimestampedDirectories();
-        //Sort in ascending order. Relies on YYYY/MM/DD/Serial
-        Collections.sort(serialDirList);
-        for (FileStatus serialDir : serialDirList) {
-          List<FileStatus> historyFileList = 
-            scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc);
-          for (FileStatus historyFile : historyFileList) {
-            JobIndexInfo jobIndexInfo = 
-              FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName());
-            long effectiveTimestamp = 
-              getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile);
-            if (shouldDelete(effectiveTimestamp)) {
-              String confFileName = 
-                JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
-              MetaInfo metaInfo = new MetaInfo(historyFile.getPath(),
-                  new Path(historyFile.getPath().getParent(), confFileName), 
-                  null, jobIndexInfo);
-              delete(metaInfo);
-            } else {
-              halted = true;
-              break;
-            }
-          }
-          if (!halted) {
-            deleteDir(serialDir.getPath());
-            removeDirectoryFromSerialNumberIndex(serialDir.getPath());
-            synchronized (existingDoneSubdirs) {
-              existingDoneSubdirs.remove(serialDir.getPath());  
-            }
-            
-          } else {
-            break; //Don't scan any more directories.
-    }
-  }
+        hsManager.clean(cutoff, storage);
       } catch (IOException e) {
-        LOG.warn("Error in History cleaner run", e);
+        LOG.warn("Error trying to clean up ", e);
       }
       LOG.info("History Cleaner complete");
-      LOG.info("FilesDeleted: " + filesDeleted);
-      LOG.info("Directories Deleted: " + dirsDeleted);
-    }
-    
-    private boolean shouldDelete(long ts) {
-      return ((ts + maxAgeMillis) <= currentTime);
-    }
-    
-    private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
-      if (finishTime == 0) {
-        return fileStatus.getModificationTime();
-      }
-      return finishTime;
-    }
-    
-    private void delete(MetaInfo metaInfo) throws IOException {
-      deleteFile(metaInfo.getHistoryFile());
-      deleteFile(metaInfo.getConfFile());
-      jobListCache.remove(metaInfo.getJobIndexInfo().getJobId());
-      loadedJobCache.remove(metaInfo.getJobIndexInfo().getJobId());
-    }
-    
-    private void deleteFile(final Path path) throws IOException {
-      doneDirFc.delete(doneDirFc.makeQualified(path), false);
-      filesDeleted++;
-    }
-    
-    private void deleteDir(Path path) throws IOException {
-      doneDirFc.delete(doneDirFc.makeQualified(path), true);
-      dirsDeleted++;
-    }
     }
-    
-  
-  
-  //TODO AppContext - Not Required
-  private  ApplicationAttemptId appAttemptID;
+  }
+
+  // TODO AppContext - Not Required
+  private ApplicationAttemptId appAttemptID;
+
   @Override
   public ApplicationAttemptId getApplicationAttemptId() {
-  //TODO fixme - bogus appAttemptID for now
+    // TODO fixme - bogus appAttemptID for now
     return appAttemptID;
-  }  
-  
-  //TODO AppContext - Not Required
+  }
+
+  // TODO AppContext - Not Required
   private ApplicationId appID;
+
   @Override
   public ApplicationId getApplicationID() {
-  //TODO fixme - bogus appID for now
+    // TODO fixme - bogus appID for now
     return appID;
   }
-  
-  //TODO AppContext - Not Required
+
+  // TODO AppContext - Not Required
   @Override
   public EventHandler getEventHandler() {
     // TODO Auto-generated method stub
     return null;
   }
-  
-  //TODO AppContext - Not Required
+
+  // TODO AppContext - Not Required
   private String userName;
+
   @Override
   public CharSequence getUser() {
     if (userName != null) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Tue Apr 10 18:11:26 2012
@@ -51,6 +51,7 @@ public class PartialJob implements org.a
     jobReport = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
     jobReport.setStartTime(jobIndexInfo.getSubmitTime());
     jobReport.setFinishTime(jobIndexInfo.getFinishTime());
+    jobReport.setJobState(getState());
   }
   
   @Override

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java Tue Apr 10 18:11:26 2012
@@ -44,6 +44,7 @@ public class HsWebApp extends WebApp imp
     bind(JAXBContextResolver.class);
     bind(GenericExceptionHandler.class);
     bind(AppContext.class).toInstance(history);
+    bind(HistoryContext.class).toInstance(history);
     route("/", HsController.class);
     route("/app", HsController.class);
     route(pajoin("/job", JOB_ID), HsController.class, "job");

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java Tue Apr 10 18:11:26 2012
@@ -32,10 +32,8 @@ import javax.ws.rs.core.UriInfo;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
-import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -49,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptsInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TasksInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptsInfo;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.HistoryInfo;
@@ -64,7 +63,7 @@ import com.google.inject.Inject;
 
 @Path("/ws/v1/history")
 public class HsWebServices {
-  private final AppContext appCtx;
+  private final HistoryContext ctx;
   private WebApp webapp;
   private final Configuration conf;
 
@@ -72,9 +71,9 @@ public class HsWebServices {
   UriInfo uriInfo;
 
   @Inject
-  public HsWebServices(final AppContext appCtx, final Configuration conf,
+  public HsWebServices(final HistoryContext ctx, final Configuration conf,
       final WebApp webapp) {
-    this.appCtx = appCtx;
+    this.ctx = ctx;
     this.conf = conf;
     this.webapp = webapp;
   }
@@ -103,33 +102,22 @@ public class HsWebServices {
       @QueryParam("startedTimeEnd") String startedEnd,
       @QueryParam("finishedTimeBegin") String finishBegin,
       @QueryParam("finishedTimeEnd") String finishEnd) {
-    JobsInfo allJobs = new JobsInfo();
-    long num = 0;
-    boolean checkCount = false;
-    boolean checkStart = false;
-    boolean checkEnd = false;
-    long countNum = 0;
-
-    // set values suitable in case both of begin/end not specified
-    long sBegin = 0;
-    long sEnd = Long.MAX_VALUE;
-    long fBegin = 0;
-    long fEnd = Long.MAX_VALUE;
 
+    Long countParam = null;
+    
     if (count != null && !count.isEmpty()) {
-      checkCount = true;
       try {
-        countNum = Long.parseLong(count);
+        countParam = Long.parseLong(count);
       } catch (NumberFormatException e) {
         throw new BadRequestException(e.getMessage());
       }
-      if (countNum <= 0) {
+      if (countParam <= 0) {
         throw new BadRequestException("limit value must be greater then 0");
       }
     }
 
+    Long sBegin = null;
     if (startedBegin != null && !startedBegin.isEmpty()) {
-      checkStart = true;
       try {
         sBegin = Long.parseLong(startedBegin);
       } catch (NumberFormatException e) {
@@ -139,8 +127,9 @@ public class HsWebServices {
         throw new BadRequestException("startedTimeBegin must be greater than 0");
       }
     }
+    
+    Long sEnd = null;
     if (startedEnd != null && !startedEnd.isEmpty()) {
-      checkStart = true;
       try {
         sEnd = Long.parseLong(startedEnd);
       } catch (NumberFormatException e) {
@@ -150,13 +139,13 @@ public class HsWebServices {
         throw new BadRequestException("startedTimeEnd must be greater than 0");
       }
     }
-    if (sBegin > sEnd) {
+    if (sBegin != null && sEnd != null && sBegin > sEnd) {
       throw new BadRequestException(
           "startedTimeEnd must be greater than startTimeBegin");
     }
 
+    Long fBegin = null;
     if (finishBegin != null && !finishBegin.isEmpty()) {
-      checkEnd = true;
       try {
         fBegin = Long.parseLong(finishBegin);
       } catch (NumberFormatException e) {
@@ -166,8 +155,8 @@ public class HsWebServices {
         throw new BadRequestException("finishedTimeBegin must be greater than 0");
       }
     }
+    Long fEnd = null;
     if (finishEnd != null && !finishEnd.isEmpty()) {
-      checkEnd = true;
       try {
         fEnd = Long.parseLong(finishEnd);
       } catch (NumberFormatException e) {
@@ -177,53 +166,18 @@ public class HsWebServices {
         throw new BadRequestException("finishedTimeEnd must be greater than 0");
       }
     }
-    if (fBegin > fEnd) {
+    if (fBegin != null && fEnd != null && fBegin > fEnd) {
       throw new BadRequestException(
           "finishedTimeEnd must be greater than finishedTimeBegin");
     }
-
-    for (Job job : appCtx.getAllJobs().values()) {
-      if (checkCount && num == countNum) {
-        break;
-      }
-
-      if (stateQuery != null && !stateQuery.isEmpty()) {
-        JobState.valueOf(stateQuery);
-        if (!job.getState().toString().equalsIgnoreCase(stateQuery)) {
-          continue;
-        }
-      }
-
-      // can't really validate queue is a valid one since queues could change
-      if (queueQuery != null && !queueQuery.isEmpty()) {
-        if (!job.getQueueName().equals(queueQuery)) {
-          continue;
-        }
-      }
-
-      if (userQuery != null && !userQuery.isEmpty()) {
-        if (!job.getUserName().equals(userQuery)) {
-          continue;
-        }
-      }
-
-      JobReport report = job.getReport();
-      
-      if (checkStart
-          && (report.getStartTime() < sBegin || report.getStartTime() > sEnd)) {
-        continue;
-      }
-      if (checkEnd
-          && (report.getFinishTime() < fBegin || report.getFinishTime() > fEnd)) {
-        continue;
-      }
-      
-      JobInfo jobInfo = new JobInfo(job);
-      
-      allJobs.add(jobInfo);
-      num++;
+    
+    JobState jobState = null;
+    if (stateQuery != null) {
+      jobState = JobState.valueOf(stateQuery);
     }
-    return allJobs;
+
+    return ctx.getPartialJobs(0l, countParam, userQuery, queueQuery, 
+        sBegin, sEnd, fBegin, fEnd, jobState);
   }
 
   @GET
@@ -231,7 +185,7 @@ public class HsWebServices {
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public JobInfo getJob(@PathParam("jobid") String jid) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     return new JobInfo(job);
   }
 
@@ -240,7 +194,7 @@ public class HsWebServices {
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public AMAttemptsInfo getJobAttempts(@PathParam("jobid") String jid) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     AMAttemptsInfo amAttempts = new AMAttemptsInfo();
     for (AMInfo amInfo : job.getAMInfos()) {
       AMAttemptInfo attempt = new AMAttemptInfo(amInfo, MRApps.toString(job
@@ -256,8 +210,8 @@ public class HsWebServices {
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public JobCounterInfo getJobCounters(@PathParam("jobid") String jid) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
-    return new JobCounterInfo(this.appCtx, job);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
+    return new JobCounterInfo(this.ctx, job);
   }
 
   @GET
@@ -265,7 +219,7 @@ public class HsWebServices {
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public ConfInfo getJobConf(@PathParam("jobid") String jid) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     ConfInfo info;
     try {
       info = new ConfInfo(job, this.conf);
@@ -282,7 +236,7 @@ public class HsWebServices {
   public TasksInfo getJobTasks(@PathParam("jobid") String jid,
       @QueryParam("type") String type) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     TasksInfo allTasks = new TasksInfo();
     for (Task task : job.getTasks().values()) {
       TaskType ttype = null;
@@ -307,7 +261,7 @@ public class HsWebServices {
   public TaskInfo getJobTask(@PathParam("jobid") String jid,
       @PathParam("taskid") String tid) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
     return new TaskInfo(task);
 
@@ -319,7 +273,7 @@ public class HsWebServices {
   public JobTaskCounterInfo getSingleTaskCounters(
       @PathParam("jobid") String jid, @PathParam("taskid") String tid) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     TaskId taskID = MRApps.toTaskID(tid);
     if (taskID == null) {
       throw new NotFoundException("taskid " + tid + " not found or invalid");
@@ -338,7 +292,7 @@ public class HsWebServices {
       @PathParam("taskid") String tid) {
 
     TaskAttemptsInfo attempts = new TaskAttemptsInfo();
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
     for (TaskAttempt ta : task.getAttempts().values()) {
       if (ta != null) {
@@ -358,7 +312,7 @@ public class HsWebServices {
   public TaskAttemptInfo getJobTaskAttemptId(@PathParam("jobid") String jid,
       @PathParam("taskid") String tid, @PathParam("attemptid") String attId) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
     TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId,
         task);
@@ -376,7 +330,7 @@ public class HsWebServices {
       @PathParam("jobid") String jid, @PathParam("taskid") String tid,
       @PathParam("attemptid") String attId) {
 
-    Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+    Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
     TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId,
         task);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Tue Apr 10 18:11:26 2012
@@ -92,6 +92,14 @@ public class TestJobHistoryParsing {
     checkHistoryParsing(3, 0, 2);
   }
   
+  private static String getJobSummary(FileContext fc, Path path) throws IOException {
+    Path qPath = fc.makeQualified(path);
+    FSDataInputStream in = fc.open(qPath);
+    String jobSummaryString = in.readUTF();
+    in.close();
+    return jobSummaryString;
+  }
+  
   private void checkHistoryParsing(final int numMaps, final int numReduces,
       final int numSuccessfulMaps) 
   throws Exception {
@@ -244,7 +252,7 @@ public class TestJobHistoryParsing {
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobId);
       Path summaryFile = new Path(jobhistoryDir, summaryFileName);
-      String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
+      String jobSummaryString = getJobSummary(fc, summaryFile);
       Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
       Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
       Assert.assertNotNull(jobSummaryString);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java Tue Apr 10 18:11:26 2012
@@ -30,11 +30,13 @@ import javax.xml.parsers.DocumentBuilder
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MockJobs;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
 import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
@@ -77,7 +79,7 @@ public class TestHsWebServices extends J
   private static TestAppContext appContext;
   private static HsWebApp webApp;
 
-  static class TestAppContext implements AppContext {
+  static class TestAppContext implements HistoryContext {
     final ApplicationAttemptId appAttemptID;
     final ApplicationId appID;
     final String user = MockJobs.newUserName();
@@ -144,6 +146,20 @@ public class TestHsWebServices extends J
     public ClusterInfo getClusterInfo() {
       return null;
     }
+
+    @Override
+    public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public JobsInfo getPartialJobs(Long offset, Long count, String user,
+        String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+        JobState jobState) {
+      // TODO Auto-generated method stub
+      return null;
+    }
   }
 
   private Injector injector = Guice.createInjector(new ServletModule() {
@@ -160,6 +176,7 @@ public class TestHsWebServices extends J
       bind(GenericExceptionHandler.class);
       bind(WebApp.class).toInstance(webApp);
       bind(AppContext.class).toInstance(appContext);
+      bind(HistoryContext.class).toInstance(appContext);
       bind(Configuration.class).toInstance(conf);
 
       serve("/*").with(GuiceContainer.class);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java Tue Apr 10 18:11:26 2012
@@ -35,6 +35,7 @@ import javax.xml.parsers.DocumentBuilder
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -42,6 +43,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
@@ -89,7 +92,7 @@ public class TestHsWebServicesAttempts e
   private static TestAppContext appContext;
   private static HsWebApp webApp;
 
-  static class TestAppContext implements AppContext {
+  static class TestAppContext implements HistoryContext {
     final ApplicationAttemptId appAttemptID;
     final ApplicationId appID;
     final String user = MockJobs.newUserName();
@@ -156,6 +159,20 @@ public class TestHsWebServicesAttempts e
     public ClusterInfo getClusterInfo() {
       return null;
     }
+
+    @Override
+    public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public JobsInfo getPartialJobs(Long offset, Long count, String user,
+        String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+        JobState jobState) {
+      // TODO Auto-generated method stub
+      return null;
+    }
   }
 
   private Injector injector = Guice.createInjector(new ServletModule() {
@@ -171,6 +188,7 @@ public class TestHsWebServicesAttempts e
       bind(GenericExceptionHandler.class);
       bind(WebApp.class).toInstance(webApp);
       bind(AppContext.class).toInstance(appContext);
+      bind(HistoryContext.class).toInstance(appContext);
       bind(Configuration.class).toInstance(conf);
 
       serve("/*").with(GuiceContainer.class);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java Tue Apr 10 18:11:26 2012
@@ -41,9 +41,12 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MockJobs;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
@@ -90,7 +93,7 @@ public class TestHsWebServicesJobConf ex
   private static File testConfDir = new File("target",
       TestHsWebServicesJobConf.class.getSimpleName() + "confDir");
 
-  static class TestAppContext implements AppContext {
+  static class TestAppContext implements HistoryContext {
     final ApplicationAttemptId appAttemptID;
     final ApplicationId appID;
     final String user = MockJobs.newUserName();
@@ -156,6 +159,20 @@ public class TestHsWebServicesJobConf ex
     public ClusterInfo getClusterInfo() {
       return null;
     }
+
+    @Override
+    public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public JobsInfo getPartialJobs(Long offset, Long count, String user,
+        String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+        JobState jobState) {
+      // TODO Auto-generated method stub
+      return null;
+    }
   }
 
   private Injector injector = Guice.createInjector(new ServletModule() {
@@ -195,6 +212,7 @@ public class TestHsWebServicesJobConf ex
       bind(GenericExceptionHandler.class);
       bind(WebApp.class).toInstance(webApp);
       bind(AppContext.class).toInstance(appContext);
+      bind(HistoryContext.class).toInstance(appContext);
       bind(Configuration.class).toInstance(conf);
 
       serve("/*").with(GuiceContainer.class);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java Tue Apr 10 18:11:26 2012
@@ -38,11 +38,15 @@ import javax.xml.parsers.DocumentBuilder
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MockJobs;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
 import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs;
 import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs.JobsPair;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
@@ -90,7 +94,7 @@ public class TestHsWebServicesJobs exten
   private static TestAppContext appContext;
   private static HsWebApp webApp;
 
-  static class TestAppContext implements AppContext {
+  static class TestAppContext implements HistoryContext {
     final ApplicationAttemptId appAttemptID;
     final ApplicationId appID;
     final String user = MockJobs.newUserName();
@@ -169,6 +173,20 @@ public class TestHsWebServicesJobs exten
     public ClusterInfo getClusterInfo() {
       return null;
     }
+
+    @Override
+    public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public JobsInfo getPartialJobs(Long offset, Long count, String user,
+        String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+        JobState jobState) {
+      return CachedHistoryStorage.getPartialJobs(this.partialJobs.values(), 
+          offset, count, user, queue, sBegin, sEnd, fBegin, fEnd, jobState);
+    }
   }
 
   private Injector injector = Guice.createInjector(new ServletModule() {
@@ -184,6 +202,7 @@ public class TestHsWebServicesJobs exten
       bind(GenericExceptionHandler.class);
       bind(WebApp.class).toInstance(webApp);
       bind(AppContext.class).toInstance(appContext);
+      bind(HistoryContext.class).toInstance(appContext);
       bind(Configuration.class).toInstance(conf);
 
       serve("/*").with(GuiceContainer.class);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java Tue Apr 10 18:11:26 2012
@@ -36,8 +36,11 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MockJobs;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
 import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs;
 import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs.JobsPair;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
@@ -77,7 +80,7 @@ public class TestHsWebServicesJobsQuery 
   private static TestAppContext appContext;
   private static HsWebApp webApp;
 
-  static class TestAppContext implements AppContext {
+  static class TestAppContext implements HistoryContext {
     final String user = MockJobs.newUserName();
     final Map<JobId, Job> fullJobs;
     final Map<JobId, Job> partialJobs;
@@ -152,6 +155,20 @@ public class TestHsWebServicesJobsQuery 
     public ClusterInfo getClusterInfo() {
       return null;
     }
+
+    @Override
+    public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public JobsInfo getPartialJobs(Long offset, Long count, String user,
+        String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+        JobState jobState) {
+      return CachedHistoryStorage.getPartialJobs(this.partialJobs.values(), 
+          offset, count, user, queue, sBegin, sEnd, fBegin, fEnd, jobState);
+    }
   }
 
   private Injector injector = Guice.createInjector(new ServletModule() {
@@ -167,6 +184,7 @@ public class TestHsWebServicesJobsQuery 
       bind(GenericExceptionHandler.class);
       bind(WebApp.class).toInstance(webApp);
       bind(AppContext.class).toInstance(appContext);
+      bind(HistoryContext.class).toInstance(appContext);
       bind(Configuration.class).toInstance(conf);
 
       serve("/*").with(GuiceContainer.class);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java Tue Apr 10 18:11:26 2012
@@ -34,12 +34,15 @@ import javax.xml.parsers.DocumentBuilder
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MockJobs;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
@@ -85,7 +88,7 @@ public class TestHsWebServicesTasks exte
   private static TestAppContext appContext;
   private static HsWebApp webApp;
 
-  static class TestAppContext implements AppContext {
+  static class TestAppContext implements HistoryContext {
     final ApplicationAttemptId appAttemptID;
     final ApplicationId appID;
     final String user = MockJobs.newUserName();
@@ -152,6 +155,20 @@ public class TestHsWebServicesTasks exte
     public ClusterInfo getClusterInfo() {
       return null;
     }
+
+    @Override
+    public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public JobsInfo getPartialJobs(Long offset, Long count, String user,
+        String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+        JobState jobState) {
+      // TODO Auto-generated method stub
+      return null;
+    }
   }
 
   private Injector injector = Guice.createInjector(new ServletModule() {
@@ -167,6 +184,7 @@ public class TestHsWebServicesTasks exte
       bind(GenericExceptionHandler.class);
       bind(WebApp.class).toInstance(webApp);
       bind(AppContext.class).toInstance(appContext);
+      bind(HistoryContext.class).toInstance(appContext);
       bind(Configuration.class).toInstance(conf);
 
       serve("/*").with(GuiceContainer.class);



Mime
View raw message