hadoop-mapreduce-commits mailing list archives

From sha...@apache.org
Subject svn commit: r1101385 [2/2] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job...
Date Tue, 10 May 2011 09:44:28 GMT
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Tue May 10 09:44:27 2011
@@ -18,137 +18,1097 @@
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
+
 /*
  * Loads and manages the Job history cache.
  */
 public class JobHistory implements HistoryContext {
 
-  private Map<JobId, Job> completedJobCache =
-    new ConcurrentHashMap<JobId, Job>();
-  private Configuration conf;
-  private final ApplicationId appID;
-  private final String userName;
-  private final LinkedList<Job> jobQ = new LinkedList<Job>();
+  private static final int DEFAULT_JOBLIST_CACHE_SIZE = 20000;
+  private static final int DEFAULT_LOADEDJOB_CACHE_SIZE = 2000;
+  private static final int DEFAULT_DATESTRING_CACHE_SIZE = 200000;
+  private static final long DEFAULT_MOVE_THREAD_INTERVAL = 3 * 60 * 1000l; //3 minutes
+  private static final int DEFAULT_MOVE_THREAD_COUNT = 3;
+  
+  static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L; //1 week
+  static final long DEFAULT_RUN_INTERVAL = 1 * 24 * 60 * 60 * 1000l; //1 day
+  
   private static final Log LOG = LogFactory.getLog(JobHistory.class);
-  private final int retiredJobsCacheSize = 1000; //TODO make it configurable
 
+  private static final Pattern DATE_PATTERN = Pattern
+      .compile("([0-1]?[0-9])/([0-3]?[0-9])/((?:2[0-9])[0-9][0-9])");
 
-  public JobHistory(Configuration conf) {
-    this.conf = conf;
-    userName = conf.get(MRJobConfig.USER_NAME, "history-user");
-    //TODO fixme - bogus appID for now
+  /*
+   * TODO Get rid of this once JobId has its own comparator
+   */
+  private static final Comparator<JobId> JOB_ID_COMPARATOR = new Comparator<JobId>() {
+    @Override
+    public int compare(JobId o1, JobId o2) {
+      if (o1.getAppId().getClusterTimestamp() > o2.getAppId().getClusterTimestamp()) {
+        return 1;
+      } else if (o1.getAppId().getClusterTimestamp() < o2.getAppId().getClusterTimestamp()) {
+        return -1;
+      } else {
+        return o1.getId() - o2.getId();
+      }
+    }
+  };
+  
+  private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils.doneSubdirsBeforeSerialTail();
+  
+  /**
+   * Maps between a serial number (generated based on jobId) and the timestamp
+   * component(s) to which it belongs.
+   * Facilitates jobId based searches.
+   * If a jobId's serial number is not present in this map, the job cannot be found.
+   */
+  private final SortedMap<String, Set<String>> idToDateString = new ConcurrentSkipListMap<String, Set<String>>();
+
+  //Maintains minimal details for recent jobs (parsed from history file name).
+  //Sorted on Job Completion Time.
+  private final SortedMap<JobId, MetaInfo> jobListCache = new ConcurrentSkipListMap<JobId, MetaInfo>(
+      JOB_ID_COMPARATOR);
+  
+  
+  // Re-use existing MetaInfo objects if they exist for the specific JobId. (synchronization on MetaInfo)
+  // Check for existence of the object when using iterators.
+  private final SortedMap<JobId, MetaInfo> intermediateListCache = new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>(
+      JOB_ID_COMPARATOR);
+  
+  //Maintains a list of known done subdirectories. Not currently used.
+  private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
+  
+  private final SortedMap<JobId, Job> loadedJobCache = new ConcurrentSkipListMap<JobId, Job>(
+      JOB_ID_COMPARATOR);
+
+  //The number of jobs to maintain in the job list cache.
+  private int jobListCacheSize;
+  
+  //The number of loaded jobs.
+  private int loadedJobCacheSize;
+  
+  //The number of entries in idToDateString
+  private int dateStringCacheSize;
+
+  //Time interval for the move thread.
+  private long moveThreadInterval;
+  
+  //Number of move threads.
+  private int numMoveThreads;
+  
+  private Configuration conf;
+
+  private boolean debugMode;
+  private int serialNumberLowDigits;
+  private String serialNumberFormat;
+  
+
+  private Path doneDirPrefixPath = null; // folder for completed jobs
+  private FileContext doneDirFc; // done Dir FileContext
+  
+  private Path intermediateDoneDirPath = null; //Intermediate Done Dir Path
+  private FileContext intermediaDoneDirFc; //Intermediate Done Dir FileContext
+
+  private Thread moveIntermediateToDoneThread = null;
+  private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null;
+  private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null;
+  
+  /*
+   * TODO
+   * Fix completion time in JobFinishedEvent
+   */
+  
+  /**
+   * Writes out files to the path
+   * .....${DONE_DIR}/VERSION_STRING/YYYY/MM/DD/HH/SERIAL_NUM/jh{index_entries}.jhist
+   */
+
+  public void init() throws IOException {
+    LOG.info("JobHistory Init");
+    debugMode = conf.getBoolean(YarnMRJobConfig.HISTORY_DEBUG_MODE_KEY, false);
+    serialNumberLowDigits = debugMode ? 1 : 3;
+    serialNumberFormat = ("%0"
+        + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) + "d");
+
+    
+    String doneDirPrefix = JobHistoryUtils
+        .getConfiguredHistoryServerDoneDirPrefix(conf);
+    try {
+      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(doneDirPrefix));
+      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
+      if (!doneDirFc.util().exists(doneDirPrefixPath)) {
+        try {
+          doneDirFc.mkdir(doneDirPrefixPath, new FsPermission(
+              JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
+        } catch (FileAlreadyExistsException e) {
+          LOG.info("JobHistory Done Directory: [" + doneDirPrefixPath
+              + "] already exists.");
+        }
+      }
+    } catch (IOException e) {
+      LOG.info("error creating done directory on dfs " + e);
+      throw e;
+    }
+
+    String doneDirWithVersion = JobHistoryUtils
+        .getCurrentDoneDir(doneDirPrefix);
+    try {
+      Path doneDirWithVersionPath = FileContext.getFileContext(conf)
+          .makeQualified(new Path(doneDirWithVersion));
+      if (!doneDirFc.util().exists(doneDirWithVersionPath)) {
+        try {
+          doneDirFc.mkdir(doneDirWithVersionPath, new FsPermission(
+              JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
+        } catch (FileAlreadyExistsException e) {
+          LOG.info("JobHistory Done Directory: [" + doneDirPrefixPath
+              + "] already exists.");
+        }
+      }
+    } catch (IOException e) {
+      LOG.info("error creating done_version directory on dfs " + e);
+      throw e;
+    }
+
+    String intermediateDoneDirPrefix = JobHistoryUtils
+    .getConfiguredHistoryIntermediateDoneDirPrefix(conf);
+    String intermediateDoneDir = JobHistoryUtils
+    .getCurrentDoneDir(intermediateDoneDirPrefix);
+    try {
+      intermediateDoneDirPath = FileContext.getFileContext(conf)
+          .makeQualified(new Path(intermediateDoneDir));
+      intermediaDoneDirFc = FileContext.getFileContext(
+          intermediateDoneDirPath.toUri(), conf);
+      if (!intermediaDoneDirFc.util().exists(intermediateDoneDirPath)) {
+        try {
+          intermediaDoneDirFc.mkdir(intermediateDoneDirPath,
+              new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
+        } catch (FileAlreadyExistsException e) {
+          LOG.info("Intermediate JobHistory Done Directory: ["
+              + intermediateDoneDirPath + "] already exists.");
+        }
+      }
+
+    } catch (IOException e) {
+      LOG.info("error creating done directory on dfs " + e);
+      throw e;
+    }
+    
+    
+    
+    jobListCacheSize = conf.getInt(YarnMRJobConfig.HISTORY_SERVER_JOBLIST_CACHE_SIZE_KEY, DEFAULT_JOBLIST_CACHE_SIZE);
+    loadedJobCacheSize = conf.getInt(YarnMRJobConfig.HISTORY_SERVER_LOADED_JOB_CACHE_SIZE_KEY, DEFAULT_LOADEDJOB_CACHE_SIZE);
+    dateStringCacheSize = conf.getInt(YarnMRJobConfig.HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY, DEFAULT_DATESTRING_CACHE_SIZE);
+    moveThreadInterval = conf.getLong(YarnMRJobConfig.HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY, DEFAULT_MOVE_THREAD_INTERVAL);
+    numMoveThreads = conf.getInt(YarnMRJobConfig.HISTORY_SERVER_NUM_MOVE_THREADS, DEFAULT_MOVE_THREAD_COUNT);
+
+    initExisting();
+  }
+  
+  public void start() {
+    //Start moveIntermediateToDoneThread
+    moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads);
+    moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
+    moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
+    moveIntermediateToDoneThread.start();
+    
+    //Start historyCleaner
+    long maxAgeOfHistoryFiles = conf.getLong(
+        YarnMRJobConfig.HISTORY_MAXAGE, DEFAULT_HISTORY_MAX_AGE);
+    cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1);
+    long runInterval = conf.getLong(YarnMRJobConfig.HISTORY_CLEANER_RUN_INTERVAL, DEFAULT_RUN_INTERVAL);
+    cleanerScheduledExecutor.scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles), 30*1000l, runInterval, TimeUnit.MILLISECONDS);
+  }
+  
+  public void stop() {
+    LOG.info("Stopping JobHistory");
+    if (moveIntermediateToDoneThread != null) {
+      LOG.info("Stopping move thread");
+      moveIntermediateToDoneRunnable.stop();
+      moveIntermediateToDoneThread.interrupt();
+      try {
+        LOG.info("Joining on move thread");
+        moveIntermediateToDoneThread.join();
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted while stopping move thread");
+      }
+    }
+
+    if (cleanerScheduledExecutor != null) {
+      LOG.info("Stopping History Cleaner");
+      cleanerScheduledExecutor.shutdown();
+      boolean interrupted = false;
+      long currentTime = System.currentTimeMillis();
+      while (!cleanerScheduledExecutor.isShutdown()
+          && System.currentTimeMillis() > currentTime + 1000l && !interrupted) {
+        try {
+          Thread.sleep(20);
+        } catch (InterruptedException e) {
+          interrupted = true;
+        }
+      }
+      if (!cleanerScheduledExecutor.isShutdown()) {
+        LOG.warn("HistoryCleanerService shutdown may not have succeeded");
+      }
+    }
+  }
+  
+  public JobHistory(Configuration conf) throws IOException {
     this.appID = RecordFactoryProvider.getRecordFactory(conf)
         .newRecordInstance(ApplicationId.class);
+    this.conf = conf;
+    init();
+  }
+  
+  /**
+   * Populates index data structures.
+   * Should only be called at initialization times.
+   */
+  @SuppressWarnings("unchecked")
+  private void initExisting() throws IOException {
+    List<FileStatus> timestampedDirList = findTimestampedDirectories();
+    Collections.sort(timestampedDirList);
+    for (FileStatus fs : timestampedDirList) {
+      //TODO Could verify the correct format for these directories.
+      addDirectoryToSerialNumberIndex(fs.getPath());
+      addDirectoryToJobListCache(fs.getPath());
+    }
+  }
+  
+  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
+    String serialPart = serialDirPath.getName();
+    String timeStampPart = JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
+    if (timeStampPart == null) {
+      LOG.warn("Could not find timestamp portion from path: " + serialDirPath.toString() +". Continuing with next");
+      return;
+    }
+    if (serialPart == null) {
+      LOG.warn("Could not find serial portion from path: " + serialDirPath.toString() + ". Continuing with next");
+      return;
+    }
+    synchronized (idToDateString) {
+      if (idToDateString.containsKey(serialPart)) {
+        Set<String> set = idToDateString.get(serialPart);
+        set.remove(timeStampPart);
+        if (set.isEmpty()) {
+          idToDateString.remove(serialPart);
+        }
+      }
+    }
+  }
+  
+  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
+    String serialPart = serialDirPath.getName();
+    String timestampPart = JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
+    if (timestampPart == null) {
+      LOG.warn("Could not find timestamp portion from path: " + serialDirPath.toString() +". Continuing with next");
+      return;
+    }
+    if (serialPart == null) {
+      LOG.warn("Could not find serial portion from path: " + serialDirPath.toString() + ". Continuing with next");
+      return;
+    }
+    addToSerialNumberIndex(serialPart, timestampPart);
+  }
+
+  private void addToSerialNumberIndex(String serialPart, String timestampPart) {
+    synchronized (idToDateString) {
+      if (!idToDateString.containsKey(serialPart)) {
+        idToDateString.put(serialPart, new HashSet<String>());
+        if (idToDateString.size() > dateStringCacheSize) {
+          idToDateString.remove(idToDateString.firstKey());
+        }
+      }
+      Set<String> datePartSet = idToDateString.get(serialPart);
+      datePartSet.add(timestampPart);
+    }
+  }
+  
+  private void addDirectoryToJobListCache(Path path) throws IOException {
+    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
+        doneDirFc);
+    for (FileStatus fs : historyFileList) {
+      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
+          .getName());
+      String confFileName = JobHistoryUtils
+          .getIntermediateConfFileName(jobIndexInfo.getJobId());
+      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
+          .getParent(), confFileName), jobIndexInfo);
+      addToJobListCache(jobIndexInfo.getJobId(), metaInfo);
+    }
+  }
+  
+  private static List<FileStatus> scanDirectory(Path path, FileContext fc, PathFilter pathFilter) throws IOException {
+    path = fc.makeQualified(path);
+    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
+      RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
+      while (fileStatusIter.hasNext()) {
+        FileStatus fileStatus = fileStatusIter.next();
+        Path filePath = fileStatus.getPath();
+        if (fileStatus.isFile() && pathFilter.accept(filePath)) {
+          jhStatusList.add(fileStatus);
+        }
+      }    
+    return jhStatusList;
+  }
+  
+  private static List<FileStatus> scanDirectoryForHistoryFiles(Path path, FileContext fc) throws IOException {
+    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
   }
+  
+  /**
+   * Finds all history directories with a timestamp component by scanning 
+   * the filesystem.
+   * Used when the JobHistory server is started.
+   * @return a list of done directories that include a timestamp component.
+   */
+  private List<FileStatus> findTimestampedDirectories() throws IOException {
+    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
+    return fsList;
+  }
+    
+  /**
+   * Adds an entry to the job list cache. Maintains the size.
+   */
+  private void addToJobListCache(JobId jobId, MetaInfo metaInfo) {
+    jobListCache.put(jobId, metaInfo);
+    if (jobListCache.size() > jobListCacheSize) {
+      jobListCache.remove(jobListCache.firstKey());
+    }
+  }
+
+  /**
+   * Adds an entry to the loaded job cache. Maintains the size.
+   */
+  private void  addToLoadedJobCache(Job job) {
+    synchronized(loadedJobCache) {
+      loadedJobCache.put(job.getID(), job);
+      if (loadedJobCache.size() > loadedJobCacheSize ) {
+        loadedJobCache.remove(loadedJobCache.firstKey());
+      }
+    }
+  }
+  
+  /**
+   * Populates files from the intermediate directory into the intermediate cache.
+   * @throws IOException
+   */
+  private void scanIntermediateDirectory() throws IOException {
+    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(intermediateDoneDirPath, intermediaDoneDirFc);
+    for (FileStatus fs : fileStatusList) {
+      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
+      String doneFileName = JobHistoryUtils.getIntermediateDoneFileName(jobIndexInfo.getJobId());
+      if (intermediaDoneDirFc.util().exists(intermediaDoneDirFc.makeQualified(new Path(intermediateDoneDirPath, doneFileName)))) {
+        String confFileName = JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
+        MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath().getParent(), confFileName), jobIndexInfo);
+        if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
+          intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
+        }  
+      }
+    }
+  }
+
+  /**
+   * Checks for the existence of the done file in the intermediate done
+   * directory for the specified jobId.
+   * 
+   * @param jobId the jobId.
+   * @return true if a done file exists for the specified jobId.
+   * @throws IOException
+   */
+  private boolean doneFileExists(JobId jobId) throws IOException {
+    String doneFileName = JobHistoryUtils.getIntermediateDoneFileName(jobId);
+    Path qualifiedDoneFilePath = intermediaDoneDirFc.makeQualified(new Path(intermediateDoneDirPath, doneFileName));
+    if (intermediaDoneDirFc.util().exists(qualifiedDoneFilePath)) {
+      return true;
+    }
+    return false;
+  }
+  
+  /**
+   * Searches the job history file FileStatus list for the specified JobId.
+   * 
+   * @param fileStatusList fileStatus list of Job History Files.
+   * @param jobId The JobId to find.
+   * @param checkForDoneFile whether to check for the existence of a done file.
+   * @return A MetaInfo object for the jobId, null if not found.
+   * @throws IOException
+   */
+  private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId, boolean checkForDoneFile) throws IOException {
+    for (FileStatus fs : fileStatusList) {
+      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
+      if (jobIndexInfo.getJobId().equals(jobId)) {
+        if (checkForDoneFile) {
+          if (!doneFileExists(jobIndexInfo.getJobId())) {
+            return null;
+          }
+        }
+        String confFileName = JobHistoryUtils
+            .getIntermediateConfFileName(jobIndexInfo.getJobId());
+        MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
+            .getParent(), confFileName), jobIndexInfo);
+        return metaInfo;
+      }
+    }
+    return null;
+  }
+  
+  /**
+   * Scans old directories known by the idToDateString map for the specified 
+   * jobId.
+   * If the number of directories is higher than the supported size of the
+   * idToDateString cache, the jobId will not be found.
+   * @param jobId the jobId.
+   * @return a MetaInfo object for the job if found, null otherwise.
+   * @throws IOException
+   */
+  private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
+    int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
+    Integer boxedSerialNumber = jobSerialNumber;
+    Set<String> dateStringSet = idToDateString.get(boxedSerialNumber);
+    if (dateStringSet == null) {
+      return null;
+    }
+    for (String timestampPart : dateStringSet) {
+      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
+      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, doneDirFc);
+      MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId, false);
+      if (metaInfo != null) {
+        return metaInfo;
+      }
+    }
+    return null;
+  }
+  
+  /**
+   * Checks for the existence of the job history file in the intermediate directory.
+   * @param jobId the jobId.
+   * @return a MetaInfo object for the job if its history file exists in the intermediate directory, null otherwise.
+   * @throws IOException
+   */
+  private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
+    MetaInfo matchedMi = null;
+    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(intermediateDoneDirPath, intermediaDoneDirFc);
+    
+    MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId, true);
+    if (metaInfo == null) {
+      return null;
+    }
+    JobIndexInfo jobIndexInfo = metaInfo.getJobIndexInfo();
+    if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
+      intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
+      matchedMi = metaInfo;
+    } else {
+      matchedMi = intermediateListCache.get(jobId);
+    }
+    return matchedMi;
+  }
+  
+  
+  
+  private class MoveIntermediateToDoneRunnable implements Runnable {
+
+    private long sleepTime;
+    private ThreadPoolExecutor moveToDoneExecutor = null;
+    private volatile boolean running = false;
+    
+    public void stop() {
+      running = false;
+    }
+    
+    MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
+      this.sleepTime = sleepTime;
+      moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
+      running = true;
+    }
+  
   @Override
-  public synchronized Job getJob(JobId jobId) {
-    Job job = completedJobCache.get(jobId);
-    if (job == null) {
+    public void run() {
+      Thread.currentThread().setName("IntermediateHistoryScanner");
+      try {
+        while (running) {
+          LOG.info("Starting scan to move intermediate done files");
+          scanIntermediateDirectory();
+          for (final MetaInfo metaInfo : intermediateListCache.values()) {
+            moveToDoneExecutor.execute(new Runnable() {
+              @Override
+              public void run() {
+                moveToDone(metaInfo);
+              }
+            });
+
+          }
+          synchronized (this) { // TODO Is this really required?
+            try {
+              this.wait(sleepTime);
+            } catch (InterruptedException e) {
+              LOG.info("IntermediateHistoryScannerThread interrupted");
+            }
+          }
+        }
+      } catch (IOException e) {
+        LOG.warn("Unable to get a list of intermediate files to be moved from: "
+            + intermediateDoneDirPath);
+      }
+    }
+  }
+  
+  private Job loadJob(MetaInfo metaInfo) {
+    synchronized(metaInfo) {
       try {
-        job = new CompletedJob(conf, jobId);
+        Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(), metaInfo.getHistoryFile(), true);
+        addToLoadedJobCache(job);
+        return job;
       } catch (IOException e) {
-        LOG.warn("HistoryContext getJob failed " + e);
         throw new YarnException(e);
       }
-      completedJobCache.put(jobId, job);
-      jobQ.add(job);
-      if (jobQ.size() > retiredJobsCacheSize) {
-         Job removed = jobQ.remove();
-         completedJobCache.remove(removed.getID());
+    }
+  }
+  
+  private SortedMap<JobId, JobIndexInfo> getAllJobsMetaInfo() {
+    SortedMap<JobId, JobIndexInfo> result = new TreeMap<JobId, JobIndexInfo>(JOB_ID_COMPARATOR);
+    try {
+      scanIntermediateDirectory();
+    } catch (IOException e) {
+      LOG.warn("Failed to scan intermediate directory", e);
+      throw new YarnException(e);
+    }
+    for (JobId jobId : intermediateListCache.keySet()) {
+      MetaInfo mi = intermediateListCache.get(jobId);
+      if (mi != null) {
+        result.put(jobId, mi.getJobIndexInfo());
+      }
+    }
+    for (JobId jobId : jobListCache.keySet()) {
+      MetaInfo mi = jobListCache.get(jobId);
+      if (mi != null) {
+        result.put(jobId, mi.getJobIndexInfo());
+      }
+    }
+    return result;
+  }
+  
+  private Map<JobId, Job> getAllJobsInternal() {
+    //TODO This should ideally be using getAllJobsMetaInfo
+    // or get rid of that method once Job has APIs for user, finishTime etc.
+    SortedMap<JobId, Job> result = new TreeMap<JobId, Job>(JOB_ID_COMPARATOR);
+    try {
+      scanIntermediateDirectory();
+    } catch (IOException e) {
+      LOG.warn("Failed to scan intermediate directory", e);
+      throw new YarnException(e);
+    }
+    for (JobId jobId : intermediateListCache.keySet()) {
+      MetaInfo mi = intermediateListCache.get(jobId);
+      if (mi != null) {
+        result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi
+            .getJobIndexInfo().getJobId()));
+      }
+    }
+    for (JobId jobId : jobListCache.keySet()) {
+      MetaInfo mi = jobListCache.get(jobId);
+      if (mi != null) {
+        result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi
+            .getJobIndexInfo().getJobId()));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Helper method for test cases.
+   */
+  MetaInfo getJobMetaInfo(JobId jobId) throws IOException {
+    //MetaInfo available in cache.
+    MetaInfo metaInfo = null;
+    if (jobListCache.containsKey(jobId)) {
+      metaInfo = jobListCache.get(jobId);
+    }
+
+    if (metaInfo != null) {
+      return metaInfo;
+    }
+    
+    //MetaInfo not available. Check intermediate directory for meta info.
+    metaInfo = scanIntermediateForJob(jobId);
+    if (metaInfo != null) {
+      return metaInfo;
+    }
+    
+    //Intermediate directory does not contain job. Search through older ones.
+    metaInfo = scanOldDirsForJob(jobId);
+    if (metaInfo != null) {
+      return metaInfo;
+    }
+    return null;
+  }
+  
+  private Job findJob(JobId jobId) throws IOException {
+    //Job already loaded.
+    synchronized (loadedJobCache) {
+      if (loadedJobCache.containsKey(jobId)) {
+        return loadedJobCache.get(jobId);
       }
     }
+    
+    //MetaInfo available in cache.
+    MetaInfo metaInfo = null;
+    if (jobListCache.containsKey(jobId)) {
+      metaInfo = jobListCache.get(jobId);
+    }
+
+    if (metaInfo != null) {
+      return loadJob(metaInfo);
+    }
+    
+    //MetaInfo not available. Check intermediate directory for meta info.
+    metaInfo = scanIntermediateForJob(jobId);
+    if (metaInfo != null) {
+      return loadJob(metaInfo);
+    }
+    
+    //Intermediate directory does not contain job. Search through older ones.
+    metaInfo = scanOldDirsForJob(jobId);
+    if (metaInfo != null) {
+      return loadJob(metaInfo);
+    }
+    return null;
+  }
+  
+  /**
+   * Searches cached jobs for the specified criteria (ANDed). Null criteria are ignored.
+   * @param soughtUser
+   * @param soughtJobNameSubstring
+   * @param soughtDateStrings
+   * @return a map of jobs matching all of the non-null criteria.
+   */
+  private Map<JobId, Job> findJobs(String soughtUser, String soughtJobNameSubstring, String[] soughtDateStrings) {
+    boolean searchUser = true;
+    boolean searchJobName = true;
+    boolean searchDates = true;
+    List<Calendar> soughtCalendars = null;
+    
+    if (soughtUser == null) {
+      searchUser = false;
+    }
+    if (soughtJobNameSubstring == null) {
+      searchJobName = false; 
+    }
+    if (soughtDateStrings == null) {
+      searchDates = false;
+    } else {
+      soughtCalendars = getSoughtDateAsCalendar(soughtDateStrings);
+    }
+    
+    Map<JobId, Job> resultMap = new TreeMap<JobId, Job>();
+    
+    SortedMap<JobId, JobIndexInfo> allJobs = getAllJobsMetaInfo();
+    for (JobId jobId : allJobs.keySet()) {
+      JobIndexInfo indexInfo = allJobs.get(jobId);
+      String jobName = indexInfo.getJobName();
+      String jobUser = indexInfo.getUser();
+      long finishTime = indexInfo.getFinishTime();
+    
+      if (searchUser) {
+        if (!soughtUser.equals(jobUser)) {
+          continue;
+        }
+      }
+      
+      if (searchJobName) {
+        if (!jobName.contains(soughtJobNameSubstring)) {
+          continue;
+        }
+      }
+      
+      if (searchDates) {
+        boolean matchedDate = false;
+        Calendar jobCal = Calendar.getInstance();
+        jobCal.setTimeInMillis(finishTime);
+        for (Calendar cal : soughtCalendars) {
+          if (jobCal.get(Calendar.YEAR) == cal.get(Calendar.YEAR) &&
+              jobCal.get(Calendar.MONTH) == cal.get(Calendar.MONTH) &&
+              jobCal.get(Calendar.DAY_OF_MONTH) == cal.get(Calendar.DAY_OF_MONTH)) {
+            matchedDate = true;
+            break;
+          }
+        }
+        if (!matchedDate) {
+          continue;
+        }
+      }
+      resultMap.put(jobId, new PartialJob(indexInfo, jobId));
+    }
+    return resultMap;
+  }
+  
+  private List<Calendar> getSoughtDateAsCalendar(String [] soughtDateStrings) {
+    List<Calendar> soughtCalendars = new ArrayList<Calendar>();
+    for (int i = 0 ; i < soughtDateStrings.length ; i++) {
+      String soughtDate = soughtDateStrings[i];
+      if (soughtDate.length() != 0) {
+        Matcher m = DATE_PATTERN.matcher(soughtDate);
+        if (m.matches()) {
+          String yyyyPart = m.group(3);
+          String mmPart = m.group(1);
+          String ddPart = m.group(2);
+          
+          if (yyyyPart.length() == 2) {
+            yyyyPart = "20" + yyyyPart;
+          }
+          if (mmPart.length() == 1) {
+            mmPart = "0" + mmPart;
+          }
+          if (ddPart.length() == 1) {
+            ddPart = "0" + ddPart;
+          }
+          Calendar soughtCal = Calendar.getInstance();
+          soughtCal.set(Calendar.YEAR, Integer.parseInt(yyyyPart));
+          soughtCal.set(Calendar.MONTH, Integer.parseInt(mmPart) - 1);
+          soughtCal.set(Calendar.DAY_OF_MONTH, Integer.parseInt(ddPart));
+          soughtCalendars.add(soughtCal);
+        }
+      }
+    }
+    return soughtCalendars;
+  }
+  
+  
+
+  
+  private void moveToDone(MetaInfo metaInfo) {
+    long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
+    if (completeTime == 0) completeTime = System.currentTimeMillis();
+    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
+    
+    List<Path> paths = new ArrayList<Path>();
+    Path historyFile = metaInfo.getHistoryFile();
+    if (historyFile == null) {
+      LOG.info("No file for job-history with " + jobId + " found in cache!");
+    } else {
+      paths.add(historyFile);
+    }
+    
+    Path confFile = metaInfo.getConfFile();
+    if (confFile == null) {
+      LOG.info("No file for jobConf with " + jobId + " found in cache!");
+    } else {
+      paths.add(confFile);
+    }
+    
+    Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
+    addDirectoryToSerialNumberIndex(targetDir);
+    try {
+      maybeMakeSubdirectory(targetDir);
+    } catch (IOException e) {
+      LOG.info("Failed creating subdirectory: " + targetDir + " while attempting to move files for jobId: " + jobId);
+      return;
+    }
+    synchronized (metaInfo) {
+      if (historyFile != null) {
+        Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile.getName()));
+        try {
+          moveToDoneNow(historyFile, toPath);
+        } catch (IOException e) {
+          LOG.info("Failed to move file: " + historyFile + " for jobId: " + jobId);
+          return;
+        }
+        metaInfo.setHistoryFile(toPath);
+      }
+      if (confFile != null) {
+        Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile.getName()));
+        try {
+          moveToDoneNow(confFile, toPath);
+        } catch (IOException e) {
+          LOG.info("Failed to move file: " + historyFile + " for jobId: " + jobId);
+          return;
+        }
+        metaInfo.setConfFile(toPath);
+      }
+    }
+    //TODO Does this need to be synchronized ?
+    Path doneFileToDelete = intermediaDoneDirFc.makeQualified(new Path(intermediateDoneDirPath, JobHistoryUtils.getIntermediateDoneFileName(jobId)));
+    try {
+      intermediaDoneDirFc.delete(doneFileToDelete, false);
+    } catch (IOException e) {
+      LOG.info("Unable to remove done file: " + doneFileToDelete);
+    }
+    addToJobListCache(jobId, metaInfo);
+    intermediateListCache.remove(jobId);
+  }
+  
+  private void moveToDoneNow(Path src, Path target) throws IOException {
+    LOG.info("Moving " + src.toString() + " to " + target.toString());
+    intermediaDoneDirFc.util().copy(src, target);
+    intermediaDoneDirFc.delete(src, false);
+    doneDirFc.setPermission(target,
+        new FsPermission(JobHistoryUtils.HISTORY_FILE_PERMISSION));
+  }
+  
+  private void maybeMakeSubdirectory(Path path) throws IOException {
+    boolean existsInExistingCache = false;
+    synchronized(existingDoneSubdirs) {
+      if (existingDoneSubdirs.contains(path)) existsInExistingCache = true;
+    }
+    try {
+      doneDirFc.getFileStatus(path);
+      if (!existsInExistingCache) {
+        existingDoneSubdirs.add(path);
+        if (debugMode) {
+          LOG.info("JobHistory.maybeMakeSubdirectory -- We believed "
+                             + path + " already existed, but it didn't.");
+        }
+      }
+    } catch (FileNotFoundException fnfE) {
+      try {
+        doneDirFc.mkdir(path, new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
+        synchronized(existingDoneSubdirs) {
+          existingDoneSubdirs.add(path);
+        }
+      } catch (FileAlreadyExistsException faeE) { //Nothing to do.
+      }
+    }
+  }
+  
+  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
+    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
+  }
+  
+  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
+    String timestampComponent = JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode);
+    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
+  }  
+  
+  @Override
+  public synchronized Job getJob(JobId jobId) {
+    Job job = null;
+    try {
+      job = findJob(jobId);
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
     return job;
   }
 
   @Override
   public Map<JobId, Job> getAllJobs(ApplicationId appID) {
-    //currently there is 1 to 1 mapping between app and job id
+    LOG.info("Called getAllJobs(AppId): " + appID);
+//    currently there is 1 to 1 mapping between app and job id
     org.apache.hadoop.mapreduce.JobID oldJobID = TypeConverter.fromYarn(appID);
     Map<JobId, Job> jobs = new HashMap<JobId, Job>();
     JobId jobID = TypeConverter.toYarn(oldJobID);
     jobs.put(jobID, getJob(jobID));
     return jobs;
+//    return getAllJobs();
   }
   
-  //TODO FIX ME use indexed search so we do not reload the 
-  // previously processed files
-  @Override
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.v2.hs.HistoryContext#getAllJobs()
+   * 
+   * Returns a recent list of jobs. This may not be the complete set.
+   * If a previous jobId is known, it can be queried via the getJob(JobId)
+   * method.
+   * Size of this list is determined by the size of the job list cache.
+   * This can be fixed when pagination is implemented - return the first set of
+   * jobs via the cache, go to DFS only when an attempt is made to navigate
+   * past the cached list.
+   * This does involve a DFS operation to scan the intermediate directory.
+   */
   public Map<JobId, Job> getAllJobs() {
-    //currently there is 1 to 1 mapping between app and job id
-    Map<JobId, Job> jobs = new HashMap<JobId, Job>();
-    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryDoneDirPrefix(conf);
+    return getAllJobsInternal();
+  }
+
+  
+  
+  
+  
+  
+  static class MetaInfo {
+    private Path historyFile;
+    private Path confFile; 
+    JobIndexInfo jobIndexInfo;
+
+    MetaInfo(Path historyFile, Path confFile, JobIndexInfo jobIndexInfo) {
+      this.historyFile = historyFile;
+      this.confFile = confFile;
+      this.jobIndexInfo = jobIndexInfo;
+    }
+
+    Path getHistoryFile() { return historyFile; }
+    Path getConfFile() { return confFile; }
+    JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
+    
+    void setHistoryFile(Path historyFile) { this.historyFile = historyFile; }
+    void setConfFile(Path confFile) {this.confFile = confFile; }
+  }
+  
+
+  public class HistoryCleaner implements Runnable {
+    private long currentTime;
     
-    String currentJobHistoryDoneDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
+    long maxAgeMillis;
+    long filesDeleted = 0;
+    long dirsDeleted = 0;
     
-    try {
-      Path done = FileContext.getFileContext(conf).makeQualified(
-          new Path(currentJobHistoryDoneDir));
-      FileContext doneDirFc = FileContext.getFileContext(done.toUri(), conf);
-      RemoteIterator<LocatedFileStatus> historyFiles = doneDirFc.util()
-          .listFiles(done, true);
-      if (historyFiles != null) {
-        FileStatus f;
-        while (historyFiles.hasNext()) {
-          f = historyFiles.next();
-          if (f.isDirectory()) continue;
-          if (!f.getPath().getName().endsWith(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION)) continue;
-          //TODO_JH_Change to parse the name properly
-          String fileName = f.getPath().getName();
-          String jobName = fileName.substring(0, fileName.indexOf(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
-          LOG.info("Processing job: " + jobName);
-          org.apache.hadoop.mapreduce.JobID oldJobID = JobID.forName(jobName);
-          JobId jobID = TypeConverter.toYarn(oldJobID);
-          Job job = new CompletedJob(conf, jobID, false);
-          jobs.put(jobID, job);
-          // completedJobCache.put(jobID, job);
-        }
+    public HistoryCleaner(long maxAge) {
+      this.maxAgeMillis = maxAge;
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void run() {
+      LOG.info("History Cleaner started");
+      currentTime = System.currentTimeMillis();
+      boolean halted = false;
+      //TODO Delete YYYY/MM/DD directories.
+      try {
+        List<FileStatus> serialDirList = findTimestampedDirectories();
+        //Sort in ascending order. Relies on YYYY/MM/DD/Serial
+        Collections.sort(serialDirList);
+        for (FileStatus serialDir : serialDirList) {
+          List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc);
+          for (FileStatus historyFile : historyFileList) {
+            JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName());
+            long effectiveTimestamp = getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile);
+            if (shouldDelete(effectiveTimestamp)) {
+              String confFileName = JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
+              MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(historyFile.getPath().getParent(), confFileName), jobIndexInfo);
+              delete(metaInfo);
+            } else {
+              halted = true;
+              break;
+            }
+          }
+          if (!halted) {
+            deleteDir(serialDir.getPath());
+            removeDirectoryFromSerialNumberIndex(serialDir.getPath());
+            synchronized (existingDoneSubdirs) {
+              existingDoneSubdirs.remove(serialDir.getPath());  
+            }
+            
+          } else {
+            break; //Don't scan any more directories.
+    }
+          }
+        }
+        LOG.warn("Error in History cleaner run", e);
       }
-    } catch (IOException ie) {
-      LOG.info("Error while creating historyFileMap" + ie);
+      LOG.info("History Cleaner complete");
+      LOG.info("FilesDeleted: " + filesDeleted);
+      LOG.info("Directories Deleted: " + dirsDeleted);
     }
-    return jobs;
+    
+    private boolean shouldDelete(long ts) {
+      return ((ts + maxAgeMillis) <= currentTime);
+    }
+    
+    private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
+      if (finishTime == 0) {
+        return fileStatus.getModificationTime();
+      }
+      return finishTime;
+    }
+    
+    private void delete(MetaInfo metaInfo) throws IOException {
+      deleteFile(metaInfo.getHistoryFile());
+      deleteFile(metaInfo.getConfFile());
+      jobListCache.remove(metaInfo.getJobIndexInfo().getJobId());
+      loadedJobCache.remove(metaInfo.getJobIndexInfo().getJobId());
+      //TODO Get rid of entries in the cache.
+    }
+    
+    private void deleteFile(Path path) throws IOException {
+      delete(path, false);
+      filesDeleted++;
+    }
+    
+    private void deleteDir(Path path) throws IOException {
+      delete(path, true);
+      dirsDeleted++;
+    }
+    
+    private void delete(Path path, boolean recursive) throws IOException {
+      doneDirFc.delete(doneDirFc.makeQualified(path), recursive);
+    }
+    
   }
+  
+  
+  
+  
+  
+  
+  
+  
+  //TODO AppContext - Not Required
+  private final ApplicationId appID;
   @Override
   public ApplicationId getApplicationID() {
+  //TODO fixme - bogus appID for now
     return appID;
   }
+  
+  //TODO AppContext - Not Required
   @Override
   public EventHandler getEventHandler() {
     // TODO Auto-generated method stub
     return null;
   }
+  
+  //TODO AppContext - Not Required
+  private String userName;
   @Override
   public CharSequence getUser() {
+    if (userName == null) {
+      userName = conf.get(MRJobConfig.USER_NAME, "history-user");
+    }
     return userName;
   }
   
-  
+  //TODO AppContext - Not Required
  @Override
  public Clock getClock() {
    return null;

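The job list, loaded job, and date-string caches introduced above all use the same bounded-map idiom: entries live in a ConcurrentSkipListMap ordered by JOB_ID_COMPARATOR (or key order), and once a configured limit is exceeded the entry with the smallest key is evicted. Below is a minimal standalone sketch of that idiom; the class and method names are illustrative only and not part of the commit.

import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListMap;

// Illustrative sketch only -- mirrors the eviction logic in
// addToJobListCache()/addToLoadedJobCache(): a sorted concurrent map bounded
// to maxSize entries by removing the smallest key on overflow.
class BoundedSortedCache<K, V> {
  private final ConcurrentSkipListMap<K, V> map;
  private final int maxSize;

  BoundedSortedCache(Comparator<K> comparator, int maxSize) {
    this.map = new ConcurrentSkipListMap<K, V>(comparator);
    this.maxSize = maxSize;
  }

  void put(K key, V value) {
    map.put(key, value);
    if (map.size() > maxSize) {
      map.remove(map.firstKey()); // evict the entry that sorts first (e.g. the oldest JobId)
    }
  }

  V get(K key) {
    return map.get(key);
  }
}

Because the map is ordered by the comparator, eviction always drops the entry that sorts first, which for JobId keys means the job with the oldest cluster timestamp and lowest id.
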
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Tue May 10 09:44:27 2011
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.service.CompositeService;
 
@@ -33,8 +36,9 @@ import org.apache.hadoop.yarn.service.Co
  *****************************************************************/
 public class JobHistoryServer extends CompositeService {
   private static final Log LOG = LogFactory.getLog(JobHistoryServer.class);
+  private HistoryContext historyContext;
   private HistoryClientService clientService;
-  private HistoryCleanerService cleanerService;
+//  private HistoryCleanerService cleanerService;
 
   static{
     Configuration.addDefaultResource("mapred-default.xml");
@@ -47,14 +51,25 @@ public class JobHistoryServer extends Co
 
   public synchronized void init(Configuration conf) {
     Configuration config = new YarnConfiguration(conf);
-    HistoryContext history = new JobHistory(conf);
-    clientService = new HistoryClientService(history);
-    cleanerService = new HistoryCleanerService(config);
+    historyContext = null;
+    try {
+      historyContext = new JobHistory(conf);
+      ((JobHistory)historyContext).start();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+    clientService = new HistoryClientService(historyContext);
+//    cleanerService = new HistoryCleanerService(config);
     addService(clientService);
-    addService(cleanerService);
+//    addService(cleanerService);
     super.init(config);
   }
 
+  public void stop() {
+    ((JobHistory)historyContext).stop();
+    super.stop();
+  }
+
   public static void main(String[] args) {
     StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
     JobHistoryServer server = null;

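For reference, findJobs() in the JobHistory diff above filters by date using DATE_PATTERN and getSoughtDateAsCalendar(): an MM/DD/YYYY search string is matched against the regular expression and converted into a Calendar that is compared field by field with each job's finish time. Below is a small self-contained sketch of that matching step, using only the JDK; the class name is illustrative and not part of the commit.

import java.util.Calendar;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch only -- shows how a MM/DD/YYYY search string is matched
// by a pattern like JobHistory.DATE_PATTERN and converted to a Calendar,
// roughly as getSoughtDateAsCalendar() does.
public class DateFilterSketch {
  private static final Pattern DATE_PATTERN = Pattern
      .compile("([0-1]?[0-9])/([0-3]?[0-9])/((?:2[0-9])[0-9][0-9])");

  public static void main(String[] args) {
    Matcher m = DATE_PATTERN.matcher("5/10/2011");
    if (m.matches()) {
      Calendar cal = Calendar.getInstance();
      cal.set(Calendar.YEAR, Integer.parseInt(m.group(3)));
      cal.set(Calendar.MONTH, Integer.parseInt(m.group(1)) - 1); // Calendar months are 0-based
      cal.set(Calendar.DAY_OF_MONTH, Integer.parseInt(m.group(2)));
      System.out.println("Matched date: " + cal.getTime());
    }
  }
}
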
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1101385&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Tue May 10 09:44:27 2011
@@ -0,0 +1,113 @@
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
+
+  private JobIndexInfo jobIndexInfo = null;
+  private JobId jobId = null;
+  private JobReport jobReport = null;
+  
+  public PartialJob(JobIndexInfo jobIndexInfo, JobId jobId) {
+    this.jobIndexInfo = jobIndexInfo;
+    this.jobId = jobId;
+    jobReport = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
+  }
+  
+  @Override
+  public JobId getID() {
+//    return jobIndexInfo.getJobId();
+    return this.jobId;
+  }
+
+  @Override
+  public String getName() {
+    return jobIndexInfo.getJobName();
+  }
+
+  @Override
+  public JobState getState() {
+    return JobState.SUCCEEDED;
+  }
+
+  @Override
+  public JobReport getReport() {
+    return jobReport;
+  }
+
+  @Override
+  public Counters getCounters() {
+    return null;
+  }
+
+  @Override
+  public Map<TaskId, Task> getTasks() {
+    return null;
+  }
+
+  @Override
+  public Map<TaskId, Task> getTasks(TaskType taskType) {
+    return null;
+  }
+
+  @Override
+  public Task getTask(TaskId taskID) {
+    return null;
+  }
+
+  @Override
+  public List<String> getDiagnostics() {
+    return null;
+  }
+
+  @Override
+  public int getTotalMaps() {
+    return jobIndexInfo.getNumMaps();
+  }
+
+  @Override
+  public int getTotalReduces() {
+    return jobIndexInfo.getNumReduces();
+  }
+
+  @Override
+  public int getCompletedMaps() {
+    return jobIndexInfo.getNumMaps();
+  }
+
+  @Override
+  public int getCompletedReduces() {
+    return jobIndexInfo.getNumReduces();
+  }
+
+  @Override
+  public boolean isUber() {
+    return false;
+  }
+
+  @Override
+  public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
+      int fromEventId, int maxEvents) {
+    return null;
+  }
+
+  @Override
+  public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
+    return false;
+  }
+
+}

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java Tue May 10 09:44:27 2011
@@ -52,7 +52,7 @@ public class TestJobHistoryEvents {
   public void testHistoryEvents() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.USER_NAME, "test");
-    MRApp app = new MRApp(2, 1, true);
+    MRApp app = new MRApp(2, 1, true, this.getClass().getName(), true);
     app.submit(conf);
     Job job = app.getContext().getAllJobs().values().iterator().next();
     JobId jobId = job.getID();

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1101385&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Tue May 10 09:44:27 2011
@@ -0,0 +1,128 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.junit.Test;
+
+public class TestJobHistoryParsing {
+  private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
+  //TODO FIX once final CompletedStatusStore is available
+//  private static final String STATUS_STORE_DIR_KEY =
+//    "yarn.server.nodemanager.jobstatus";
+  @Test
+  public void testHistoryParsing() throws Exception {
+    Configuration conf = new Configuration();
+    MRApp app = new MRApp(2, 1, true, this.getClass().getName(), true);
+    app.submit(conf);
+    Job job = app.getContext().getAllJobs().values().iterator().next();
+    JobId jobId = job.getID();
+    LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.stop();
+    
+    
+    String user =
+      conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
+    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
+    JobHistory jobHistory = new JobHistory(conf);
+    
+    String currentJobHistoryDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
+    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId).getJobIndexInfo();
+    String jobhistoryFileName = FileNameIndexUtils.getDoneFileName(jobIndexInfo);
+    
+    Path historyFilePath = new Path(currentJobHistoryDir, jobhistoryFileName);
+    FSDataInputStream in = null;
+    LOG.info("JobHistoryFile is: " + historyFilePath);
+    try {
+      FileContext fc = FileContext.getFileContext(conf);
+      in = fc.open(fc.makeQualified(historyFilePath));
+    } catch (IOException ioe) {
+      LOG.info("Can not open history file: " + historyFilePath, ioe);
+      throw (new Exception("Can not open History File"));
+    }
+    
+    JobHistoryParser parser = new JobHistoryParser(in);
+    JobInfo jobInfo = parser.parse();
+    
+    Assert.assertTrue ("Incorrect username ",
+        jobInfo.getUsername().equals("mapred"));
+    Assert.assertTrue("Incorrect jobName ",
+        jobInfo.getJobname().equals("test"));
+    Assert.assertTrue("Incorrect queuename ",
+        jobInfo.getJobQueueName().equals("default"));
+    Assert.assertTrue("incorrect conf path",
+        jobInfo.getJobConfPath().equals("test"));
+    Assert.assertTrue("incorrect finishedMap ",
+        jobInfo.getFinishedMaps() == 2);
+    Assert.assertTrue("incorrect finishedReduces ",
+        jobInfo.getFinishedReduces() == 1);
+    int totalTasks = jobInfo.getAllTasks().size();
+    Assert.assertTrue("total number of tasks is incorrect  ", totalTasks == 3);
+
+    // Assert at the task-attempt level
+    for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) {
+      int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
+      Assert.assertEquals("Incorrect number of task attempts", 1, taskAttemptCount);
+    }
+//
+//   // Test for checking jobstats for job status store
+//    Path statusFilePath = new Path(jobstatusDir, "jobstats");
+//    try {
+//      FileContext fc = FileContext.getFileContext(statusFilePath.toUri());
+//      in = fc.open(statusFilePath);
+//    } catch (IOException ioe) {
+//      LOG.info("Can not open status file "+ ioe);
+//      throw (new Exception("Can not open status File"));
+//    }
+//    parser = new JobHistoryParser(in);
+//    jobInfo = parser.parse();
+//    Assert.assertTrue("incorrect finishedMap in job stats file ",
+//        jobInfo.getFinishedMaps() == 2);
+//    Assert.assertTrue("incorrect finishedReduces in job stats file ",
+//        jobInfo.getFinishedReduces() == 1);
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestJobHistoryParsing t = new TestJobHistoryParsing();
+    t.testHistoryParsing();
+  }
+}
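
A minimal sketch of the parsing flow the test above exercises, for readers following along; it is not part of this commit, the history file path is a placeholder, and the printed summary is only illustrative of the JobHistoryParser API used in the test.

    // Sketch: parse an existing .jhist file with the same API the test uses.
    Configuration conf = new Configuration();
    Path historyFile = new Path("/tmp/example-job.jhist");  // hypothetical path
    FileContext fc = FileContext.getFileContext(conf);
    FSDataInputStream in = fc.open(fc.makeQualified(historyFile));
    JobHistoryParser parser = new JobHistoryParser(in);
    JobHistoryParser.JobInfo jobInfo = parser.parse();
    System.out.println("Job: " + jobInfo.getJobname()
        + ", finished maps: " + jobInfo.getFinishedMaps()
        + ", finished reduces: " + jobInfo.getFinishedReduces()
        + ", tasks: " + jobInfo.getAllTasks().size());
    in.close();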

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Tue May 10 09:44:27 2011
@@ -69,6 +69,9 @@ public class MiniMRYarnCluster extends M
     conf.set(YARNApplicationConstants.APPS_STAGING_DIR_KEY, new File(
         getTestWorkDir(),
         "apps_staging_dir/${user.name}/").getAbsolutePath());
+    conf.set(YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY, new File(
+        getTestWorkDir(), "history_staging_dir/${user.name}/")
+        .getAbsolutePath());
     conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                              // which shuffle doesn't happen
     //configure the shuffle service in NM

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java Tue May 10 09:44:27 2011
@@ -20,21 +20,11 @@ package org.apache.hadoop.mapreduce.v2;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.Exception;
-import java.security.PrivilegedExceptionAction;
 
 import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.FailingMapper;
-import org.apache.hadoop.RandomTextWriterJob;
-import org.apache.hadoop.SleepJob;
-import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -42,20 +32,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.v2.TestMRJobs;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.YarnServerConfig;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
-import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
-import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 public class TestUberAM extends TestMRJobs {
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YARNApplicationConstants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YARNApplicationConstants.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YARNApplicationConstants.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YARNApplicationConstants.java Tue May 10 09:44:27 2011
@@ -38,6 +38,8 @@ public class YARNApplicationConstants {
 
   public static final String APPS_STAGING_DIR_KEY = "yarn.apps.stagingDir";
 
+  public static final String APPS_HISTORY_STAGING_DIR_KEY = "yarn.apps.history.stagingDir";
+  
   public static final String YARN_MAPREDUCE_APP_JAR_PATH =
       "$YARN_HOME/modules/" + HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME;
 

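As a sketch of how the new key might be consumed (an assumed usage, not part of this commit), a client would typically read it from the Configuration and fall back to the default that the yarn-default.xml change below introduces:

    // Hypothetical usage: resolve the history staging directory from configuration.
    // The fallback literal mirrors the yarn-default.xml entry added in this commit,
    // with ${user.name} expanded explicitly here for clarity.
    Configuration conf = new Configuration();
    String historyStagingDir = conf.get(
        YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY,
        "/tmp/hadoop-yarn/" + System.getProperty("user.name") + "/staging");
    Path historyStagingPath = new Path(historyStagingDir);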
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml Tue May 10 09:44:27 2011
@@ -57,6 +57,10 @@
     <value>/tmp/hadoop-yarn/${user.name}/staging</value>
    </property>
 
+  <property>
+    <name>yarn.apps.history.stagingDir</name>
+    <value>/tmp/hadoop-yarn/${user.name}/staging</value>
+   </property>
 
   <property>
     <name>yarn.server.nodemanager.keytab</name>


