hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1327354 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/...
Date Wed, 18 Apr 2012 01:59:17 GMT
Author: sseth
Date: Wed Apr 18 01:59:16 2012
New Revision: 1327354

URL: http://svn.apache.org/viewvc?rev=1327354&view=rev
Log:
MAPREDUCE-3972. Fix locking and exception issues in JobHistory server. (Contributed by Robert Joseph Evans)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AMWebServices.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Apr 18 01:59:16 2012
@@ -268,6 +268,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4134. Remove references of mapred.child.ulimit etc. since they
     are not being used any more (Ravi Prakash via bobby)
 
+    MAPREDUCE-3972. Fix locking and exception issues in JobHistory server.
+    (Robert Joseph Evans via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Wed Apr 18 01:59:16 2012
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
@@ -72,6 +74,13 @@ public interface Job {
   Path getConfFile();
   
   /**
+   * @return a parsed version of the config files pointed to by 
+   * {@link #getConfFile()}.
+   * @throws IOException on any error trying to load the conf file. 
+   */
+  Configuration loadConfFile() throws IOException;
+  
+  /**
    * @return the ACLs for this job for each type of JobACL given. 
    */
   Map<JobACL, AccessControlList> getJobACLs();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Wed Apr 18 01:59:16 2012
@@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -1472,4 +1473,13 @@ public class JobImpl implements org.apac
       job.finished(JobState.ERROR);
     }
   }
+
+  @Override
+  public Configuration loadConfFile() throws IOException {
+    Path confPath = getConfFile();
+    FileContext fc = FileContext.getFileContext(confPath.toUri(), conf);
+    Configuration jobConf = new Configuration(false);
+    jobConf.addResource(fc.open(confPath));
+    return jobConf;
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AMWebServices.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AMWebServices.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AMWebServices.java Wed Apr 18 01:59:16 2012
@@ -31,7 +31,6 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -68,14 +67,11 @@ import com.google.inject.Inject;
 public class AMWebServices {
   private final AppContext appCtx;
   private final App app;
-  private final Configuration conf;
-
+  
   @Inject
-  public AMWebServices(final App app, final AppContext context,
-      final Configuration conf) {
+  public AMWebServices(final App app, final AppContext context) {
     this.appCtx = context;
     this.app = app;
-    this.conf = conf;
   }
 
   Boolean hasAccess(Job job, HttpServletRequest request) {
@@ -272,7 +268,7 @@ public class AMWebServices {
     checkAccess(job, hsr);
     ConfInfo info;
     try {
-      info = new ConfInfo(job, this.conf);
+      info = new ConfInfo(job);
     } catch (IOException e) {
       throw new NotFoundException("unable to load configuration for job: "
           + jid);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java Wed Apr 18 01:59:16 2012
@@ -23,7 +23,6 @@ import static org.apache.hadoop.yarn.web
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -44,11 +43,9 @@ import com.google.inject.Inject;
  */
 public class ConfBlock extends HtmlBlock {
   final AppContext appContext;
-  final Configuration conf;
 
-  @Inject ConfBlock(AppContext appctx, Configuration conf) {
+  @Inject ConfBlock(AppContext appctx) {
     appContext = appctx;
-    this.conf = conf;
   }
 
   /*
@@ -71,7 +68,7 @@ public class ConfBlock extends HtmlBlock
     }
     Path confPath = job.getConfFile();
     try {
-      ConfInfo info = new ConfInfo(job, this.conf);
+      ConfInfo info = new ConfInfo(job);
 
       html.div().h3(confPath.toString())._();
       TBODY<TABLE<Hamlet>> tbody = html.

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java Wed Apr 18 01:59:16 2012
@@ -40,15 +40,11 @@ public class ConfInfo {
   public ConfInfo() {
   }
 
-  public ConfInfo(Job job, Configuration conf) throws IOException {
+  public ConfInfo(Job job) throws IOException {
 
-    Path confPath = job.getConfFile();
     this.property = new ArrayList<ConfEntryInfo>();
-    // Read in the configuration file and put it in a key/value table.
-    FileContext fc = FileContext.getFileContext(confPath.toUri(), conf);
-    Configuration jobConf = new Configuration(false);
-    jobConf.addResource(fc.open(confPath));
-    this.path = confPath.toString();
+    Configuration jobConf = job.loadConfFile();
+    this.path = job.getConfFile().toString();
     for (Map.Entry<String, String> entry : jobConf) {
       this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue()));
     }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Wed Apr 18 01:59:16 2012
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -27,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.ShuffleHandler;
@@ -442,7 +444,7 @@ public class MockJobs extends MockApps {
     final Path configFile = confFile;
 
     Map<JobACL, AccessControlList> tmpJobACLs = new HashMap<JobACL, AccessControlList>();
-    Configuration conf = new Configuration();
+    final Configuration conf = new Configuration();
     conf.set(JobACL.VIEW_JOB.getAclName(), "testuser");
     conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
 
@@ -564,6 +566,14 @@ public class MockJobs extends MockApps {
         amInfoList.add(createAMInfo(2));
         return amInfoList;
       }
+
+      @Override
+      public Configuration loadConfFile() throws IOException {
+        FileContext fc = FileContext.getFileContext(configFile.toUri(), conf);
+        Configuration jobConf = new Configuration(false);
+        jobConf.addResource(fc.open(configFile));
+        return jobConf;
+      }
     };
   }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Wed Apr 18 01:59:16 2012
@@ -489,6 +489,11 @@ public class TestRuntimeEstimators {
     public List<AMInfo> getAMInfos() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
+    
+    @Override
+    public Configuration loadConfFile() {
+      throw new UnsupportedOperationException();
+    }
   }
 
   /*

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java Wed Apr 18 01:59:16 2012
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
-import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.yarn.YarnException;
@@ -82,32 +82,41 @@ public class CachedHistoryStorage extend
     super(CachedHistoryStorage.class.getName());
   }
 
-  private Job loadJob(MetaInfo metaInfo) {
+  private Job loadJob(HistoryFileInfo fileInfo) {
     try {
-      Job job = hsManager.loadJob(metaInfo);
+      Job job = fileInfo.loadJob();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Adding " + job.getID() + " to loaded job cache");
       }
+      // We can clobber results here, but that should be OK, because it only
+      // means that we may have two identical copies of the same job floating
+      // around for a while.
       loadedJobCache.put(job.getID(), job);
       return job;
     } catch (IOException e) {
       throw new YarnException(
-          "Could not find/load job: " + metaInfo.getJobId(), e);
+          "Could not find/load job: " + fileInfo.getJobId(), e);
     }
   }
 
   @Override
-  public synchronized Job getFullJob(JobId jobId) {
+  public Job getFullJob(JobId jobId) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Looking for Job " + jobId);
     }
     try {
-      Job result = loadedJobCache.get(jobId);
-      if (result == null) {
-        MetaInfo metaInfo = hsManager.getMetaInfo(jobId);
-        if (metaInfo != null) {
-          result = loadJob(metaInfo);
+      HistoryFileInfo fileInfo = hsManager.getFileInfo(jobId);
+      Job result = null;
+      if (fileInfo != null) {
+        result = loadedJobCache.get(jobId);
+        if (result == null) {
+          result = loadJob(fileInfo);
+        } else if(fileInfo.isDeleted()) {
+          loadedJobCache.remove(jobId);
+          result = null;
         }
+      } else {
+        loadedJobCache.remove(jobId);
       }
       return result;
     } catch (IOException e) {
@@ -120,25 +129,20 @@ public class CachedHistoryStorage extend
     LOG.debug("Called getAllPartialJobs()");
     SortedMap<JobId, Job> result = new TreeMap<JobId, Job>();
     try {
-      for (MetaInfo mi : hsManager.getAllMetaInfo()) {
+      for (HistoryFileInfo mi : hsManager.getAllFileInfo()) {
         if (mi != null) {
           JobId id = mi.getJobId();
           result.put(id, new PartialJob(mi.getJobIndexInfo(), id));
         }
       }
     } catch (IOException e) {
-      LOG.warn("Error trying to scan for all MetaInfos", e);
+      LOG.warn("Error trying to scan for all FileInfos", e);
       throw new YarnException(e);
     }
     return result;
   }
 
   @Override
-  public void jobRemovedFromHDFS(JobId jobId) {
-    loadedJobCache.remove(jobId);
-  }
-
-  @Override
   public JobsInfo getPartialJobs(Long offset, Long count, String user,
       String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
       JobState jobState) {
@@ -173,6 +177,7 @@ public class CachedHistoryStorage extend
     if (end < 0) { // due to overflow
       end = Long.MAX_VALUE;
     }
+
     for (Job job : jobs) {
       if (at > end) {
         break;

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Wed Apr 18 01:59:16 2012
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -71,7 +72,7 @@ public class CompletedJob implements org
   private final Configuration conf;
   private final JobId jobId; //Can be picked from JobInfo with a conversion.
   private final String user; //Can be picked up from JobInfo
-  private final Path confFile;
+  private final HistoryFileInfo info;
   private JobInfo jobInfo;
   private JobReport report;
   AtomicBoolean tasksLoaded = new AtomicBoolean(false);
@@ -84,13 +85,14 @@ public class CompletedJob implements org
   
   
   public CompletedJob(Configuration conf, JobId jobId, Path historyFile, 
-      boolean loadTasks, String userName, Path confFile, JobACLsManager aclsMgr) 
+      boolean loadTasks, String userName, HistoryFileInfo info,
+      JobACLsManager aclsMgr) 
           throws IOException {
     LOG.info("Loading job: " + jobId + " from file: " + historyFile);
     this.conf = conf;
     this.jobId = jobId;
     this.user = userName;
-    this.confFile = confFile;
+    this.info = info;
     this.aclsMgr = aclsMgr;
     loadFullHistoryData(loadTasks, historyFile);
   }
@@ -134,7 +136,7 @@ public class CompletedJob implements org
     report.setUser(jobInfo.getUsername());
     report.setMapProgress((float) getCompletedMaps() / getTotalMaps());
     report.setReduceProgress((float) getCompletedReduces() / getTotalReduces());
-    report.setJobFile(confFile.toString());
+    report.setJobFile(getConfFile().toString());
     String historyUrl = "N/A";
     try {
       historyUrl = JobHistoryUtils.getHistoryUrl(conf, jobId.getAppId());
@@ -392,7 +394,16 @@ public class CompletedJob implements org
    */
   @Override
   public Path getConfFile() {
-    return confFile;
+    return info.getConfFile();
+  }
+  
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hadoop.mapreduce.v2.app.job.Job#loadConfFile()
+   */
+  @Override
+  public Configuration loadConfFile() throws IOException {
+    return info.loadConfFile();
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Wed Apr 18 01:59:16 2012
@@ -25,12 +25,17 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -57,6 +62,8 @@ import org.apache.hadoop.mapreduce.v2.jo
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.service.AbstractService;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * This class provides a way to interact with history files in a thread safe
 * manner.
@@ -67,33 +74,251 @@ public class HistoryFileManager extends 
   private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
   private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
 
+  private static enum HistoryInfoState {
+    IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
+  };
+
   private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
       .doneSubdirsBeforeSerialTail();
 
-  public static class MetaInfo {
+  /**
+   * Maps between a serial number (generated based on jobId) and the timestamp
+   * component(s) to which it belongs. Facilitates jobId based searches. If a
+   * jobId is not found in this list - it will not be found.
+   */
+  private static class SerialNumberIndex {
+    private SortedMap<String, Set<String>> cache;
+    private int maxSize;
+
+    public SerialNumberIndex(int maxSize) {
+      this.cache = new TreeMap<String, Set<String>>();
+      this.maxSize = maxSize;
+    }
+
+    public synchronized void add(String serialPart, String timestampPart) {
+      if (!cache.containsKey(serialPart)) {
+        cache.put(serialPart, new HashSet<String>());
+        if (cache.size() > maxSize) {
+          String key = cache.firstKey();
+          LOG.error("Dropping " + key
+              + " from the SerialNumberIndex. We will no "
+              + "longer be able to see jobs that are in that serial index for "
+              + cache.get(key));
+          cache.remove(key);
+        }
+      }
+      Set<String> datePartSet = cache.get(serialPart);
+      datePartSet.add(timestampPart);
+    }
+
+    public synchronized void remove(String serialPart, String timeStampPart) {
+      if (cache.containsKey(serialPart)) {
+        Set<String> set = cache.get(serialPart);
+        set.remove(timeStampPart);
+        if (set.isEmpty()) {
+          cache.remove(serialPart);
+        }
+      }
+    }
+
+    public synchronized Set<String> get(String serialPart) {
+      Set<String> found = cache.get(serialPart);
+      if (found != null) {
+        return new HashSet<String>(found);
+      }
+      return null;
+    }
+  }
+
+  private static class JobListCache {
+    private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
+    private int maxSize;
+    private long maxAge;
+
+    public JobListCache(int maxSize, long maxAge) {
+      this.maxSize = maxSize;
+      this.maxAge = maxAge;
+      this.cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
+    }
+
+    public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
+      JobId jobId = fileInfo.getJobIndexInfo().getJobId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding " + jobId + " to job list cache with "
+            + fileInfo.getJobIndexInfo());
+      }
+      HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo);
+      if (cache.size() > maxSize) {
+        //There is a race here, where more than one thread could be trying to
+        // remove entries.  This could result in too many entries being removed
+        // from the cache.  This is considered OK as the size of the cache
+        // should be rather large, and we would rather have performance over
+        // keeping the cache size exactly at the maximum.
+        Iterator<JobId> keys = cache.navigableKeySet().iterator();
+        long cutoff = System.currentTimeMillis() - maxAge;
+        while(cache.size() > maxSize && keys.hasNext()) {
+          JobId key = keys.next();
+          HistoryFileInfo firstValue = cache.get(key);
+          if(firstValue != null) {
+            synchronized(firstValue) {
+              if (firstValue.isMovePending()) {
+                if(firstValue.didMoveFail() && 
+                    firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
+                  cache.remove(key);
+                  //Now lets try to delete it
+                  try {
+                    firstValue.delete();
+                  } catch (IOException e) {
+                    LOG.error("Error while trying to delete history files" +
+                    		" that could not be moved to done.", e);
+                  }
+                } else {
+                  LOG.warn("Waiting to remove " + key
+                      + " from JobListCache because it is not in done yet.");
+                }
+              } else {
+                cache.remove(key);
+              }
+            }
+          }
+        }
+      }
+      return old;
+    }
+
+    public void delete(HistoryFileInfo fileInfo) {
+      cache.remove(fileInfo.getJobId());
+    }
+
+    public Collection<HistoryFileInfo> values() {
+      return new ArrayList<HistoryFileInfo>(cache.values());
+    }
+
+    public HistoryFileInfo get(JobId jobId) {
+      return cache.get(jobId);
+    }
+  }
+
+  public class HistoryFileInfo {
     private Path historyFile;
     private Path confFile;
     private Path summaryFile;
     private JobIndexInfo jobIndexInfo;
+    private HistoryInfoState state;
 
-    public MetaInfo(Path historyFile, Path confFile, Path summaryFile,
-        JobIndexInfo jobIndexInfo) {
+    private HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
+        JobIndexInfo jobIndexInfo, boolean isInDone) {
       this.historyFile = historyFile;
       this.confFile = confFile;
       this.summaryFile = summaryFile;
       this.jobIndexInfo = jobIndexInfo;
+      state = isInDone ? HistoryInfoState.IN_DONE
+          : HistoryInfoState.IN_INTERMEDIATE;
     }
 
-    private Path getHistoryFile() {
-      return historyFile;
+    private synchronized boolean isMovePending() {
+      return state == HistoryInfoState.IN_INTERMEDIATE
+          || state == HistoryInfoState.MOVE_FAILED;
     }
 
-    private Path getConfFile() {
-      return confFile;
+    private synchronized boolean didMoveFail() {
+      return state == HistoryInfoState.MOVE_FAILED;
+    }
+    
+    /**
+     * @return true if the files backed by this were deleted.
+     */
+    public synchronized boolean isDeleted() {
+      return state == HistoryInfoState.DELETED;
     }
 
-    private Path getSummaryFile() {
-      return summaryFile;
+    private synchronized void moveToDone() throws IOException {
+      if (!isMovePending()) {
+        // It was either deleted or is already in done. Either way do nothing
+        return;
+      }
+      try {
+        long completeTime = jobIndexInfo.getFinishTime();
+        if (completeTime == 0) {
+          completeTime = System.currentTimeMillis();
+        }
+        JobId jobId = jobIndexInfo.getJobId();
+
+        List<Path> paths = new ArrayList<Path>(2);
+        if (historyFile == null) {
+          LOG.info("No file for job-history with " + jobId + " found in cache!");
+        } else {
+          paths.add(historyFile);
+        }
+
+        if (confFile == null) {
+          LOG.info("No file for jobConf with " + jobId + " found in cache!");
+        } else {
+          paths.add(confFile);
+        }
+
+        if (summaryFile == null) {
+          LOG.info("No summary file for job: " + jobId);
+        } else {
+          String jobSummaryString = getJobSummary(intermediateDoneDirFc,
+              summaryFile);
+          SUMMARY_LOG.info(jobSummaryString);
+          LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
+          intermediateDoneDirFc.delete(summaryFile, false);
+          summaryFile = null;
+        }
+
+        Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
+        addDirectoryToSerialNumberIndex(targetDir);
+        makeDoneSubdir(targetDir);
+        if (historyFile != null) {
+          Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
+              .getName()));
+          if (!toPath.equals(historyFile)) {
+            moveToDoneNow(historyFile, toPath);
+            historyFile = toPath;
+          }
+        }
+        if (confFile != null) {
+          Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
+              .getName()));
+          if (!toPath.equals(confFile)) {
+            moveToDoneNow(confFile, toPath);
+            confFile = toPath;
+          }
+        }
+        state = HistoryInfoState.IN_DONE;
+      } catch (Throwable t) {
+        LOG.error("Error while trying to move a job to done", t);
+        this.state = HistoryInfoState.MOVE_FAILED;
+      }
+    }
+
+    /**
+     * Parse a job from the JobHistoryFile, if the underlying file is not going
+     * to be deleted.
+     * 
+     * @return the Job or null if the underlying file was deleted.
+     * @throws IOException
+     *           if there is an error trying to read the file.
+     */
+    public synchronized Job loadJob() throws IOException {
+      return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
+          false, jobIndexInfo.getUser(), this, aclsMgr);
+    }
+
+    /**
+     * Return the history file.  This should only be used for testing.
+     * @return the history file.
+     */
+    synchronized Path getHistoryFile() {
+      return historyFile;
+    }
+    
+    private synchronized void delete() throws IOException {
+      state = HistoryInfoState.DELETED;
+      doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
+      doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
     }
 
     public JobIndexInfo getJobIndexInfo() {
@@ -104,57 +329,35 @@ public class HistoryFileManager extends 
       return jobIndexInfo.getJobId();
     }
 
-    private void setHistoryFile(Path historyFile) {
-      this.historyFile = historyFile;
-    }
-
-    private void setConfFile(Path confFile) {
-      this.confFile = confFile;
+    public synchronized Path getConfFile() {
+      return confFile;
     }
-
-    private void setSummaryFile(Path summaryFile) {
-      this.summaryFile = summaryFile;
+    
+    public synchronized Configuration loadConfFile() throws IOException {
+      FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
+      Configuration jobConf = new Configuration(false);
+      jobConf.addResource(fc.open(confFile));
+      return jobConf;
     }
   }
 
-  /**
-   * Maps between a serial number (generated based on jobId) and the timestamp
-   * component(s) to which it belongs. Facilitates jobId based searches. If a
-   * jobId is not found in this list - it will not be found.
-   */
-  private final SortedMap<String, Set<String>> idToDateString = 
-    new TreeMap<String, Set<String>>();
-  // The number of entries in idToDateString
-  private int dateStringCacheSize;
-
-  // Maintains minimal details for recent jobs (parsed from history file name).
-  // Sorted on Job Completion Time.
-  private final SortedMap<JobId, MetaInfo> jobListCache = 
-    new ConcurrentSkipListMap<JobId, MetaInfo>();
-  // The number of jobs to maintain in the job list cache.
-  private int jobListCacheSize;
-
-  // Re-use existing MetaInfo objects if they exist for the specific JobId.
-  // (synchronization on MetaInfo)
-  // Check for existence of the object when using iterators.
-  private final SortedMap<JobId, MetaInfo> intermediateListCache = 
-    new ConcurrentSkipListMap<JobId, MetaInfo>();
+  private SerialNumberIndex serialNumberIndex = null;
+  private JobListCache jobListCache = null;
 
   // Maintains a list of known done subdirectories.
-  private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
+  private final Set<Path> existingDoneSubdirs = Collections
+      .synchronizedSet(new HashSet<Path>());
 
   /**
    * Maintains a mapping between intermediate user directories and the last
    * known modification time.
    */
-  private Map<String, Long> userDirModificationTimeMap = 
-    new HashMap<String, Long>();
+  private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
 
   private JobACLsManager aclsMgr;
 
   private Configuration conf;
 
-  // TODO Remove me!!!!
   private boolean debugMode;
   private String serialNumberFormat;
 
@@ -165,6 +368,9 @@ public class HistoryFileManager extends 
   private FileContext intermediateDoneDirFc; // Intermediate Done Dir
                                              // FileContext
 
+  private ThreadPoolExecutor moveToDoneExecutor = null;
+  private long maxHistoryAge = 0;
+  
   public HistoryFileManager() {
     super(HistoryFileManager.class.getName());
   }
@@ -211,12 +417,25 @@ public class HistoryFileManager extends 
 
     this.aclsMgr = new JobACLsManager(conf);
 
-    jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
-        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE);
+    maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
+        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
+    
+    jobListCache = new JobListCache(conf.getInt(
+        JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
+        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE),
+        maxHistoryAge);
 
-    dateStringCacheSize = conf.getInt(
+    serialNumberIndex = new SerialNumberIndex(conf.getInt(
         JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
-        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE);
+        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));
+
+    int numMoveThreads = conf.getInt(
+        JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
+        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+        "MoveIntermediateToDone Thread #%d").build();
+    moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
+        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
 
     super.init(conf);
   }
@@ -249,6 +468,7 @@ public class HistoryFileManager extends 
   void initExisting() throws IOException {
     LOG.info("Initializing Existing Jobs...");
     List<FileStatus> timestampedDirList = findTimestampedDirectories();
+    // Sort first just so insertion is in a consistent order
     Collections.sort(timestampedDirList);
     for (FileStatus fs : timestampedDirList) {
       // TODO Could verify the correct format for these directories.
@@ -271,16 +491,7 @@ public class HistoryFileManager extends 
           + serialDirPath.toString() + ". Continuing with next");
       return;
     }
-    synchronized (idToDateString) {
-      // TODO make this thread safe without the synchronize
-      if (idToDateString.containsKey(serialPart)) {
-        Set<String> set = idToDateString.get(serialPart);
-        set.remove(timeStampPart);
-        if (set.isEmpty()) {
-          idToDateString.remove(serialPart);
-        }
-      }
-    }
+    serialNumberIndex.remove(serialPart, timeStampPart);
   }
 
   private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
@@ -299,21 +510,7 @@ public class HistoryFileManager extends 
       LOG.warn("Could not find serial portion from path: "
           + serialDirPath.toString() + ". Continuing with next");
     }
-    addToSerialNumberIndex(serialPart, timestampPart);
-  }
-
-  private void addToSerialNumberIndex(String serialPart, String timestampPart) {
-    synchronized (idToDateString) {
-      // TODO make this thread safe without the synchronize
-      if (!idToDateString.containsKey(serialPart)) {
-        idToDateString.put(serialPart, new HashSet<String>());
-        if (idToDateString.size() > dateStringCacheSize) {
-          idToDateString.remove(idToDateString.firstKey());
-        }
-        Set<String> datePartSet = idToDateString.get(serialPart);
-        datePartSet.add(timestampPart);
-      }
-    }
+    serialNumberIndex.add(serialPart, timestampPart);
   }
 
   private void addDirectoryToJobListCache(Path path) throws IOException {
@@ -332,10 +529,10 @@ public class HistoryFileManager extends 
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), new Path(fs.getPath().getParent(),
-          summaryFileName), jobIndexInfo);
-      addToJobListCache(metaInfo);
+      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+          .getPath().getParent(), confFileName), new Path(fs.getPath()
+          .getParent(), summaryFileName), jobIndexInfo, true);
+      jobListCache.addIfAbsent(fileInfo);
     }
   }
 
@@ -371,25 +568,18 @@ public class HistoryFileManager extends 
     return fsList;
   }
 
-  private void addToJobListCache(MetaInfo metaInfo) {
-    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Adding " + jobId + " to job list cache with "
-          + metaInfo.getJobIndexInfo());
-    }
-    jobListCache.put(jobId, metaInfo);
-    if (jobListCache.size() > jobListCacheSize) {
-      jobListCache.remove(jobListCache.firstKey());
-    }
-  }
-
   /**
    * Scans the intermediate directory to find user directories. Scans these for
-   * history files if the modification time for the directory has changed.
+   * history files if the modification time for the directory has changed. Once
+   * it finds history files it starts the process of moving them to the done 
+   * directory.
    * 
    * @throws IOException
+   *           if there was a error while scanning
    */
-  private void scanIntermediateDirectory() throws IOException {
+  void scanIntermediateDirectory() throws IOException {
+    // TODO it would be great to limit how often this happens, except in the
+    // case where we are looking for a particular job.
     List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
         intermediateDoneDirFc, intermediateDoneDirPath, "");
 
@@ -405,7 +595,12 @@ public class HistoryFileManager extends 
         }
       }
       if (shouldScan) {
-        scanIntermediateDirectory(userDir.getPath());
+        try {
+          scanIntermediateDirectory(userDir.getPath());
+        } catch (IOException e) {
+          LOG.error("Error while trying to scan the directory " 
+              + userDir.getPath(), e);
+        }
       }
     }
   }
@@ -426,11 +621,33 @@ public class HistoryFileManager extends 
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), new Path(fs.getPath().getParent(),
-          summaryFileName), jobIndexInfo);
-      if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
-        intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
+      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+          .getPath().getParent(), confFileName), new Path(fs.getPath()
+          .getParent(), summaryFileName), jobIndexInfo, false);
+
+      final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
+      if (old == null || old.didMoveFail()) {
+        final HistoryFileInfo found = (old == null) ? fileInfo : old;
+        long cutoff = System.currentTimeMillis() - maxHistoryAge;
+        if(found.getJobIndexInfo().getFinishTime() <= cutoff) {
+          try {
+            found.delete();
+          } catch (IOException e) {
+            LOG.warn("Error cleaning up a HistoryFile that is out of date.", e);
+          }
+        } else {
+          moveToDoneExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                found.moveToDone();
+              } catch (IOException e) {
+                LOG.info("Failed to process fileInfo for job: " + 
+                    found.getJobId(), e);
+              }
+            }
+          });
+        }
       }
     }
   }
@@ -442,11 +659,11 @@ public class HistoryFileManager extends 
    *          fileStatus list of Job History Files.
    * @param jobId
    *          The JobId to find.
-   * @return A MetaInfo object for the jobId, null if not found.
+   * @return A FileInfo object for the jobId, null if not found.
    * @throws IOException
    */
-  private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId)
-      throws IOException {
+  private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
+      JobId jobId) throws IOException {
     for (FileStatus fs : fileStatusList) {
       JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
           .getName());
@@ -455,10 +672,10 @@ public class HistoryFileManager extends 
             .getIntermediateConfFileName(jobIndexInfo.getJobId());
         String summaryFileName = JobHistoryUtils
             .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-        MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-            .getParent(), confFileName), new Path(fs.getPath().getParent(),
-            summaryFileName), jobIndexInfo);
-        return metaInfo;
+        HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
+            fs.getPath().getParent(), confFileName), new Path(fs.getPath()
+            .getParent(), summaryFileName), jobIndexInfo, true);
+        return fileInfo;
       }
     }
     return null;
@@ -474,175 +691,51 @@ public class HistoryFileManager extends 
    * @return
    * @throws IOException
    */
-  private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
+  private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
     int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
     String boxedSerialNumber = String.valueOf(jobSerialNumber);
-    Set<String> dateStringSet;
-    synchronized (idToDateString) {
-      Set<String> found = idToDateString.get(boxedSerialNumber);
-      if (found == null) {
-        return null;
-      } else {
-        dateStringSet = new HashSet<String>(found);
-      }
+    Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
+    if (dateStringSet == null) {
+      return null;
     }
     for (String timestampPart : dateStringSet) {
       Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
       List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
           doneDirFc);
-      MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
-      if (metaInfo != null) {
-        return metaInfo;
+      HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId);
+      if (fileInfo != null) {
+        return fileInfo;
       }
     }
     return null;
   }
 
-  /**
-   * Checks for the existence of the job history file in the intermediate
-   * directory.
-   * 
-   * @param jobId
-   * @return
-   * @throws IOException
-   */
-  private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
-    scanIntermediateDirectory();
-    return intermediateListCache.get(jobId);
-  }
-
-  /**
-   * Parse a job from the JobHistoryFile, if the underlying file is not going to
-   * be deleted.
-   * 
-   * @param metaInfo
-   *          the where the JobHistory is stored.
-   * @return the Job or null if the underlying file was deleted.
-   * @throws IOException
-   *           if there is an error trying to read the file.
-   */
-  public Job loadJob(MetaInfo metaInfo) throws IOException {
-    return new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
-        metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
-        metaInfo.getConfFile(), aclsMgr);
-  }
-
-  public Collection<MetaInfo> getAllMetaInfo() throws IOException {
+  public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
     scanIntermediateDirectory();
-    ArrayList<MetaInfo> result = new ArrayList<MetaInfo>();
-    result.addAll(intermediateListCache.values());
-    result.addAll(jobListCache.values());
-    return result;
+    return jobListCache.values();
   }
 
-  Collection<MetaInfo> getIntermediateMetaInfos() throws IOException {
-    scanIntermediateDirectory();
-    return intermediateListCache.values();
-  }
-
-  public MetaInfo getMetaInfo(JobId jobId) throws IOException {
-    // MetaInfo available in cache.
-    MetaInfo metaInfo = null;
-    if (jobListCache.containsKey(jobId)) {
-      metaInfo = jobListCache.get(jobId);
-    }
-
-    if (metaInfo != null) {
-      return metaInfo;
+  public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
+    // FileInfo available in cache.
+    HistoryFileInfo fileInfo = jobListCache.get(jobId);
+    if (fileInfo != null) {
+      return fileInfo;
     }
-
-    // MetaInfo not available. Check intermediate directory for meta info.
-    metaInfo = scanIntermediateForJob(jobId);
-    if (metaInfo != null) {
-      return metaInfo;
+    // OK so scan the intermediate to be sure we did not lose it that way
+    scanIntermediateDirectory();
+    fileInfo = jobListCache.get(jobId);
+    if (fileInfo != null) {
+      return fileInfo;
     }
 
     // Intermediate directory does not contain job. Search through older ones.
-    metaInfo = scanOldDirsForJob(jobId);
-    if (metaInfo != null) {
-      return metaInfo;
+    fileInfo = scanOldDirsForJob(jobId);
+    if (fileInfo != null) {
+      return fileInfo;
     }
     return null;
   }
 
-  void moveToDone(MetaInfo metaInfo) throws IOException {
-    long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
-    if (completeTime == 0)
-      completeTime = System.currentTimeMillis();
-    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
-
-    List<Path> paths = new ArrayList<Path>();
-    Path historyFile = metaInfo.getHistoryFile();
-    if (historyFile == null) {
-      LOG.info("No file for job-history with " + jobId + " found in cache!");
-    } else {
-      paths.add(historyFile);
-    }
-
-    Path confFile = metaInfo.getConfFile();
-    if (confFile == null) {
-      LOG.info("No file for jobConf with " + jobId + " found in cache!");
-    } else {
-      paths.add(confFile);
-    }
-
-    // TODO Check all mi getters and setters for the conf path
-    Path summaryFile = metaInfo.getSummaryFile();
-    if (summaryFile == null) {
-      LOG.info("No summary file for job: " + jobId);
-    } else {
-      try {
-        String jobSummaryString = getJobSummary(intermediateDoneDirFc,
-            summaryFile);
-        SUMMARY_LOG.info(jobSummaryString);
-        LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
-        intermediateDoneDirFc.delete(summaryFile, false);
-        metaInfo.setSummaryFile(null);
-      } catch (IOException e) {
-        LOG.warn("Failed to process summary file: [" + summaryFile + "]");
-        throw e;
-      }
-    }
-
-    Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
-    addDirectoryToSerialNumberIndex(targetDir);
-    try {
-      makeDoneSubdir(targetDir);
-    } catch (IOException e) {
-      LOG.warn("Failed creating subdirectory: " + targetDir
-          + " while attempting to move files for jobId: " + jobId);
-      throw e;
-    }
-    synchronized (metaInfo) {
-      if (historyFile != null) {
-        Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
-            .getName()));
-        try {
-          moveToDoneNow(historyFile, toPath);
-        } catch (IOException e) {
-          LOG.warn("Failed to move file: " + historyFile + " for jobId: "
-              + jobId);
-          throw e;
-        }
-        metaInfo.setHistoryFile(toPath);
-      }
-      if (confFile != null) {
-        Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
-            .getName()));
-        try {
-          moveToDoneNow(confFile, toPath);
-        } catch (IOException e) {
-          LOG.warn("Failed to move file: " + historyFile + " for jobId: "
-              + jobId);
-          throw e;
-        }
-        metaInfo.setConfFile(toPath);
-      }
-    }
-    addToJobListCache(metaInfo);
-    intermediateListCache.remove(jobId);
-  }
-
   private void moveToDoneNow(final Path src, final Path target)
       throws IOException {
     LOG.info("Moving " + src.toString() + " to " + target.toString());
@@ -658,20 +751,9 @@ public class HistoryFileManager extends 
   }
 
   private void makeDoneSubdir(Path path) throws IOException {
-    boolean existsInExistingCache = false;
-    synchronized (existingDoneSubdirs) {
-      if (existingDoneSubdirs.contains(path))
-        existsInExistingCache = true;
-    }
     try {
       doneDirFc.getFileStatus(path);
-      if (!existsInExistingCache) {
-        existingDoneSubdirs.add(path);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("JobHistory.maybeMakeSubdirectory -- We believed " + path
-              + " already existed, but it didn't.");
-        }
-      }
+      existingDoneSubdirs.add(path);
     } catch (FileNotFoundException fnfE) {
       try {
         FsPermission fsp = new FsPermission(
@@ -685,11 +767,8 @@ public class HistoryFileManager extends 
               + ", " + fsp);
           doneDirFc.setPermission(path, fsp);
         }
-        synchronized (existingDoneSubdirs) {
-          existingDoneSubdirs.add(path);
-        }
-      } catch (FileAlreadyExistsException faeE) { 
-        // Nothing to do.
+        existingDoneSubdirs.add(path);
+      } catch (FileAlreadyExistsException faeE) { // Nothing to do.
       }
     }
   }
@@ -713,16 +792,22 @@ public class HistoryFileManager extends 
     return finishTime;
   }
 
-  private void deleteJobFromDone(MetaInfo metaInfo) throws IOException {
-    jobListCache.remove(metaInfo.getJobId());
-    doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getHistoryFile()), false);
-    doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getConfFile()), false);
+  private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
+    jobListCache.delete(fileInfo);
+    fileInfo.delete();
   }
 
+  /**
+   * Clean up older history files.
+   * 
+   * @throws IOException
+   *           on any error trying to remove the entries.
+   */
   @SuppressWarnings("unchecked")
-  void clean(long cutoff, HistoryStorage storage) throws IOException {
+  void clean() throws IOException {
     // TODO this should be replaced by something that knows about the directory
     // structure and will put less of a load on HDFS.
+    long cutoff = System.currentTimeMillis() - maxHistoryAge;
     boolean halted = false;
     // TODO Delete YYYY/MM/DD directories.
     List<FileStatus> serialDirList = findTimestampedDirectories();
@@ -737,13 +822,17 @@ public class HistoryFileManager extends 
         long effectiveTimestamp = getEffectiveTimestamp(
             jobIndexInfo.getFinishTime(), historyFile);
         if (effectiveTimestamp <= cutoff) {
-          String confFileName = JobHistoryUtils
-              .getIntermediateConfFileName(jobIndexInfo.getJobId());
-          MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(
-              historyFile.getPath().getParent(), confFileName), null,
-              jobIndexInfo);
-          storage.jobRemovedFromHDFS(metaInfo.getJobId());
-          deleteJobFromDone(metaInfo);
+          HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
+              .getJobId());
+          if (fileInfo == null) {
+            String confFileName = JobHistoryUtils
+                .getIntermediateConfFileName(jobIndexInfo.getJobId());
+
+            fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
+                historyFile.getPath().getParent(), confFileName), null,
+                jobIndexInfo, true);
+          }
+          deleteJobFromDone(fileInfo);
         } else {
           halted = true;
           break;
@@ -752,9 +841,7 @@ public class HistoryFileManager extends 
       if (!halted) {
         doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
         removeDirectoryFromSerialNumberIndex(serialDir.getPath());
-        synchronized (existingDoneSubdirs) {
-          existingDoneSubdirs.remove(serialDir.getPath());
-        }
+        existingDoneSubdirs.remove(serialDir.getPath());
       } else {
         break; // Don't scan any more directories.
       }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java Wed Apr 18 01:59:16 2012
@@ -28,7 +28,12 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * Provides an API to query jobs that have finished. 
+ * Provides an API to query jobs that have finished.
+ * 
+ * For those implementing this API be aware that there is no feedback when
+ * files are removed from HDFS.  You may rely on HistoryFileManager to help
+ * you know when that has happened if you have not made a complete backup of
+ * the data stored on HDFS.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
@@ -71,10 +76,4 @@ public interface HistoryStorage {
    * @return the job, or null if it is not found.
    */
   Job getFullJob(JobId jobId);
-
-  /**
-   * Informs the Storage that a job has been removed from HDFS
-   * @param jobId the ID of the job that was removed.
-   */
-  void jobRemovedFromHDFS(JobId jobId);
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Wed Apr 18 01:59:16 2012
@@ -21,10 +21,7 @@ package org.apache.hadoop.mapreduce.v2.h
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
@@ -37,7 +34,7 @@ import org.apache.hadoop.mapreduce.TypeC
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -66,15 +63,9 @@ public class JobHistory extends Abstract
   // Time interval for the move thread.
   private long moveThreadInterval;
 
-  // Number of move threads.
-  private int numMoveThreads;
-
   private Configuration conf;
 
-  private Thread moveIntermediateToDoneThread = null;
-  private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null;
-
-  private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null;
+  private ScheduledThreadPoolExecutor scheduledExecutor = null;
 
   private HistoryStorage storage = null;
   private HistoryFileManager hsManager = null;
@@ -91,8 +82,6 @@ public class JobHistory extends Abstract
     moveThreadInterval = conf.getLong(
         JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
         JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
-    numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
-        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
 
     hsManager = new HistoryFileManager();
     hsManager.init(conf);
@@ -120,27 +109,22 @@ public class JobHistory extends Abstract
       ((Service) storage).start();
     }
 
-    // Start moveIntermediatToDoneThread
-    moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable(
-        moveThreadInterval, numMoveThreads);
-    moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
-    moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
-    moveIntermediateToDoneThread.start();
+    scheduledExecutor = new ScheduledThreadPoolExecutor(2,
+        new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
+            .build());
+
+    scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(),
+        moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);
 
     // Start historyCleaner
     boolean startCleanerService = conf.getBoolean(
         JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
     if (startCleanerService) {
-      long maxAgeOfHistoryFiles = conf.getLong(
-          JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
-          JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
-      cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1,
-          new ThreadFactoryBuilder().setNameFormat("LogCleaner").build());
       long runInterval = conf.getLong(
           JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
           JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
-      cleanerScheduledExecutor
-          .scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
+      scheduledExecutor
+          .scheduleAtFixedRate(new HistoryCleaner(),
               30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
     }
     super.start();
@@ -149,24 +133,12 @@ public class JobHistory extends Abstract
   @Override
   public void stop() {
     LOG.info("Stopping JobHistory");
-    if (moveIntermediateToDoneThread != null) {
-      LOG.info("Stopping move thread");
-      moveIntermediateToDoneRunnable.stop();
-      moveIntermediateToDoneThread.interrupt();
-      try {
-        LOG.info("Joining on move thread");
-        moveIntermediateToDoneThread.join();
-      } catch (InterruptedException e) {
-        LOG.info("Interrupted while stopping move thread");
-      }
-    }
-
-    if (cleanerScheduledExecutor != null) {
-      LOG.info("Stopping History Cleaner");
-      cleanerScheduledExecutor.shutdown();
+    if (scheduledExecutor != null) {
+      LOG.info("Stopping History Cleaner/Move To Done");
+      scheduledExecutor.shutdown();
       boolean interrupted = false;
       long currentTime = System.currentTimeMillis();
-      while (!cleanerScheduledExecutor.isShutdown()
+      while (!scheduledExecutor.isShutdown()
           && System.currentTimeMillis() > currentTime + 1000l && !interrupted) {
         try {
           Thread.sleep(20);
@@ -174,8 +146,10 @@ public class JobHistory extends Abstract
           interrupted = true;
         }
       }
-      if (!cleanerScheduledExecutor.isShutdown()) {
-        LOG.warn("HistoryCleanerService shutdown may not have succeeded");
+      if (!scheduledExecutor.isShutdown()) {
+        LOG.warn("HistoryCleanerService/move to done shutdown may not have " +
+        		"succeeded, Forcing a shutdown");
+        scheduledExecutor.shutdownNow();
       }
     }
     if (storage instanceof Service) {
@@ -195,68 +169,34 @@ public class JobHistory extends Abstract
   }
 
   private class MoveIntermediateToDoneRunnable implements Runnable {
-
-    private long sleepTime;
-    private ThreadPoolExecutor moveToDoneExecutor = null;
-    private boolean running = false;
-
-    public synchronized void stop() {
-      running = false;
-      notify();
-    }
-
-    MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
-      this.sleepTime = sleepTime;
-      ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
-          "MoveIntermediateToDone Thread #%d").build();
-      moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
-          TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
-      running = true;
-    }
-
     @Override
     public void run() {
-      Thread.currentThread().setName("IntermediateHistoryScanner");
       try {
-        while (true) {
-          LOG.info("Starting scan to move intermediate done files");
-          for (final MetaInfo metaInfo : hsManager.getIntermediateMetaInfos()) {
-            moveToDoneExecutor.execute(new Runnable() {
-              @Override
-              public void run() {
-                try {
-                  hsManager.moveToDone(metaInfo);
-                } catch (IOException e) {
-                  LOG.info(
-                      "Failed to process metaInfo for job: "
-                          + metaInfo.getJobId(), e);
-                }
-              }
-            });
-          }
-          synchronized (this) {
-            try {
-              this.wait(sleepTime);
-            } catch (InterruptedException e) {
-              LOG.info("IntermediateHistoryScannerThread interrupted");
-            }
-            if (!running) {
-              break;
-            }
-          }
-        }
+        LOG.info("Starting scan to move intermediate done files");
+        hsManager.scanIntermediateDirectory();
       } catch (IOException e) {
-        LOG.warn("Unable to get a list of intermediate files to be moved");
-        // TODO Shut down the entire process!!!!
+        LOG.error("Error while scanning intermediate done dir ", e);
       }
     }
   }
+  
+  private class HistoryCleaner implements Runnable {
+    public void run() {
+      LOG.info("History Cleaner started");
+      try {
+        hsManager.clean();
+      } catch (IOException e) {
+        LOG.warn("Error trying to clean up ", e);
+      }
+      LOG.info("History Cleaner complete");
+    }
+  }
 
   /**
    * Helper method for test cases.
    */
-  MetaInfo getJobMetaInfo(JobId jobId) throws IOException {
-    return hsManager.getMetaInfo(jobId);
+  HistoryFileInfo getJobFileInfo(JobId jobId) throws IOException {
+    return hsManager.getFileInfo(jobId);
   }
 
   @Override
@@ -313,25 +253,6 @@ public class JobHistory extends Abstract
         fBegin, fEnd, jobState);
   }
 
-  public class HistoryCleaner implements Runnable {
-    long maxAgeMillis;
-
-    public HistoryCleaner(long maxAge) {
-      this.maxAgeMillis = maxAge;
-    }
-
-    public void run() {
-      LOG.info("History Cleaner started");
-      long cutoff = System.currentTimeMillis() - maxAgeMillis;
-      try {
-        hsManager.clean(cutoff, storage);
-      } catch (IOException e) {
-        LOG.warn("Error trying to clean up ", e);
-      }
-      LOG.info("History Cleaner complete");
-    }
-  }
-
   // TODO AppContext - Not Required
   private ApplicationAttemptId appAttemptID;
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Wed Apr 18 01:59:16 2012
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
@@ -166,6 +167,11 @@ public class PartialJob implements org.a
   public Path getConfFile() {
     throw new IllegalStateException("Not implemented yet");
   }
+  
+  @Override
+  public Configuration loadConfFile() {
+    throw new IllegalStateException("Not implemented yet");
+  }
 
   @Override
   public Map<JobACL, AccessControlList> getJobACLs() {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java Wed Apr 18 01:59:16 2012
@@ -65,7 +65,6 @@ import com.google.inject.Inject;
 public class HsWebServices {
   private final HistoryContext ctx;
   private WebApp webapp;
-  private final Configuration conf;
 
   @Context
   UriInfo uriInfo;
@@ -74,7 +73,6 @@ public class HsWebServices {
   public HsWebServices(final HistoryContext ctx, final Configuration conf,
       final WebApp webapp) {
     this.ctx = ctx;
-    this.conf = conf;
     this.webapp = webapp;
   }
 
@@ -222,7 +220,7 @@ public class HsWebServices {
     Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     ConfInfo info;
     try {
-      info = new ConfInfo(job, this.conf);
+      info = new ConfInfo(job);
     } catch (IOException e) {
       throw new NotFoundException("unable to load configuration for job: "
           + jid);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java Wed Apr 18 01:59:16 2012
@@ -22,12 +22,15 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import static org.mockito.Mockito.*;
+
 @RunWith(value = Parameterized.class)
 public class TestJobHistoryEntities {
 
@@ -61,10 +64,12 @@ public class TestJobHistoryEntities {
   /* Verify some expected values based on the history file */
   @Test
   public void testCompletedJob() throws Exception {
+    HistoryFileInfo info = mock(HistoryFileInfo.class);
+    when(info.getConfFile()).thenReturn(fullConfPath);
     //Re-initialize to verify the delayed load.
     completedJob =
       new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
-          fullConfPath, jobAclsManager);
+          info, jobAclsManager);
     //Verify tasks loaded based on loadTask parameter.
     assertEquals(loadTasks, completedJob.tasksLoaded.get());
     assertEquals(1, completedJob.getAMInfos().size());
@@ -84,9 +89,11 @@ public class TestJobHistoryEntities {
   
   @Test
   public void testCompletedTask() throws Exception {
+    HistoryFileInfo info = mock(HistoryFileInfo.class);
+    when(info.getConfFile()).thenReturn(fullConfPath);
     completedJob =
       new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
-          fullConfPath, jobAclsManager);
+          info, jobAclsManager);
     TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
     TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
     
@@ -111,9 +118,11 @@ public class TestJobHistoryEntities {
   
   @Test
   public void testCompletedTaskAttempt() throws Exception {
+    HistoryFileInfo info = mock(HistoryFileInfo.class);
+    when(info.getConfFile()).thenReturn(fullConfPath);
     completedJob =
       new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
-          fullConfPath, jobAclsManager);
+          info, jobAclsManager);
     TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
     TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
     TaskAttemptId mta1Id = MRBuilderUtils.newTaskAttemptId(mt1Id, 0);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1327354&r1=1327353&r2=1327354&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Wed Apr 18 01:59:16 2012
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@@ -84,12 +85,22 @@ public class TestJobHistoryParsing {
 
   @Test
   public void testHistoryParsing() throws Exception {
-    checkHistoryParsing(2, 1, 2);
+    LOG.info("STARTING testHistoryParsing()");
+    try {
+      checkHistoryParsing(2, 1, 2);
+    } finally {
+      LOG.info("FINISHED testHistoryParsing()");
+    }
   }
   
   @Test
   public void testHistoryParsingWithParseErrors() throws Exception {
-    checkHistoryParsing(3, 0, 2);
+    LOG.info("STARTING testHistoryParsingWithParseErrors()");
+    try {
+      checkHistoryParsing(3, 0, 2);
+    } finally {
+      LOG.info("FINISHED testHistoryParsingWithParseErrors()");
+    }
   }
   
   private static String getJobSummary(FileContext fc, Path path) throws IOException {
@@ -124,61 +135,112 @@ public class TestJobHistoryParsing {
 
     String jobhistoryDir = JobHistoryUtils
         .getHistoryIntermediateDoneDirForUser(conf);
-    JobHistory jobHistory = new JobHistory();
-    jobHistory.init(conf);
-
-    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
-        .getJobIndexInfo();
-    String jobhistoryFileName = FileNameIndexUtils
-        .getDoneFileName(jobIndexInfo);
-
-    Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
-    FSDataInputStream in = null;
-    LOG.info("JobHistoryFile is: " + historyFilePath);
+    
     FileContext fc = null;
     try {
       fc = FileContext.getFileContext(conf);
-      in = fc.open(fc.makeQualified(historyFilePath));
     } catch (IOException ioe) {
-      LOG.info("Can not open history file: " + historyFilePath, ioe);
-      throw (new Exception("Can not open History File"));
+      LOG.info("Can not get FileContext", ioe);
+      throw (new Exception("Can not get File Context"));
     }
-
-    JobHistoryParser parser = new JobHistoryParser(in);
-    final EventReader realReader = new EventReader(in);
-    EventReader reader = Mockito.mock(EventReader.class);
+    
     if (numMaps == numSuccessfulMaps) {
-      reader = realReader;
-    } else {
-      final AtomicInteger numFinishedEvents = new AtomicInteger(0);  // Hack!
-      Mockito.when(reader.getNextEvent()).thenAnswer(
-          new Answer<HistoryEvent>() {
-            public HistoryEvent answer(InvocationOnMock invocation) 
-                throws IOException {
-              HistoryEvent event = realReader.getNextEvent();
-              if (event instanceof TaskFinishedEvent) {
-                numFinishedEvents.incrementAndGet();
-              }
-              
-              if (numFinishedEvents.get() <= numSuccessfulMaps) {
-                return event;
-              } else {
-                throw new IOException("test");
+      String summaryFileName = JobHistoryUtils
+          .getIntermediateSummaryFileName(jobId);
+      Path summaryFile = new Path(jobhistoryDir, summaryFileName);
+      String jobSummaryString = getJobSummary(fc, summaryFile);
+      Assert.assertNotNull(jobSummaryString);
+      Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
+      Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
+
+      Map<String, String> jobSummaryElements = new HashMap<String, String>();
+      StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
+      while (strToken.hasMoreTokens()) {
+        String keypair = strToken.nextToken();
+        jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
+      }
+
+      Assert.assertEquals("JobId does not match", jobId.toString(),
+          jobSummaryElements.get("jobId"));
+      Assert.assertEquals("JobName does not match", "test",
+          jobSummaryElements.get("jobName"));
+      Assert.assertTrue("submitTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
+      Assert.assertTrue("launchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
+      Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
+      Assert
+      .assertTrue(
+          "firstReduceTaskLaunchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
+      Assert.assertTrue("finishTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
+      Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
+          Integer.parseInt(jobSummaryElements.get("numMaps")));
+      Assert.assertEquals("Mismatch in num reduce slots", numReduces,
+          Integer.parseInt(jobSummaryElements.get("numReduces")));
+      Assert.assertEquals("User does not match", System.getProperty("user.name"),
+          jobSummaryElements.get("user"));
+      Assert.assertEquals("Queue does not match", "default",
+          jobSummaryElements.get("queue"));
+      Assert.assertEquals("Status does not match", "SUCCEEDED",
+          jobSummaryElements.get("status"));
+    }
+
+    JobHistory jobHistory = new JobHistory();
+    jobHistory.init(conf);
+    HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
+    JobInfo jobInfo;
+    long numFinishedMaps;
+    
+    synchronized(fileInfo) {
+      Path historyFilePath = fileInfo.getHistoryFile();
+      FSDataInputStream in = null;
+      LOG.info("JobHistoryFile is: " + historyFilePath);
+      try {
+        in = fc.open(fc.makeQualified(historyFilePath));
+      } catch (IOException ioe) {
+        LOG.info("Can not open history file: " + historyFilePath, ioe);
+        throw (new Exception("Can not open History File"));
+      }
+
+      JobHistoryParser parser = new JobHistoryParser(in);
+      final EventReader realReader = new EventReader(in);
+      EventReader reader = Mockito.mock(EventReader.class);
+      if (numMaps == numSuccessfulMaps) {
+        reader = realReader;
+      } else {
+        final AtomicInteger numFinishedEvents = new AtomicInteger(0);  // Hack!
+        Mockito.when(reader.getNextEvent()).thenAnswer(
+            new Answer<HistoryEvent>() {
+              public HistoryEvent answer(InvocationOnMock invocation) 
+              throws IOException {
+                HistoryEvent event = realReader.getNextEvent();
+                if (event instanceof TaskFinishedEvent) {
+                  numFinishedEvents.incrementAndGet();
+                }
+
+                if (numFinishedEvents.get() <= numSuccessfulMaps) {
+                  return event;
+                } else {
+                  throw new IOException("test");
+                }
               }
             }
-          }
         );
-    }
-    
-    JobInfo jobInfo = parser.parse(reader);
-    
-    long numFinishedMaps = 
+      }
+
+      jobInfo = parser.parse(reader);
+
+      numFinishedMaps = 
         computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
-    
-    if (numFinishedMaps != numMaps) {
-      Exception parseException = parser.getParseException();
-      Assert.assertNotNull("Didn't get expected parse exception", 
-          parseException);
+
+      if (numFinishedMaps != numMaps) {
+        Exception parseException = parser.getParseException();
+        Assert.assertNotNull("Didn't get expected parse exception", 
+            parseException);
+      }
     }
     
     Assert.assertEquals("Incorrect username ", System.getProperty("user.name"),
@@ -246,52 +308,6 @@ public class TestJobHistoryParsing {
         }
       }
     }
-
-    if (numMaps == numSuccessfulMaps) {
-
-      String summaryFileName = JobHistoryUtils
-          .getIntermediateSummaryFileName(jobId);
-      Path summaryFile = new Path(jobhistoryDir, summaryFileName);
-      String jobSummaryString = getJobSummary(fc, summaryFile);
-      Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
-      Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
-      Assert.assertNotNull(jobSummaryString);
-
-      Map<String, String> jobSummaryElements = new HashMap<String, String>();
-      StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
-      while (strToken.hasMoreTokens()) {
-        String keypair = strToken.nextToken();
-        jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
-
-      }
-
-      Assert.assertEquals("JobId does not match", jobId.toString(),
-          jobSummaryElements.get("jobId"));
-      Assert.assertEquals("JobName does not match", "test",
-          jobSummaryElements.get("jobName"));
-      Assert.assertTrue("submitTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
-      Assert.assertTrue("launchTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
-      Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
-      Assert
-      .assertTrue(
-          "firstReduceTaskLaunchTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
-      Assert.assertTrue("finishTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
-      Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
-          Integer.parseInt(jobSummaryElements.get("numMaps")));
-      Assert.assertEquals("Mismatch in num reduce slots", numReduces,
-          Integer.parseInt(jobSummaryElements.get("numReduces")));
-      Assert.assertEquals("User does not match", System.getProperty("user.name"),
-          jobSummaryElements.get("user"));
-      Assert.assertEquals("Queue does not match", "default",
-          jobSummaryElements.get("queue"));
-      Assert.assertEquals("Status does not match", "SUCCEEDED",
-          jobSummaryElements.get("status"));
-    }
   }
   
   // Computes finished maps similar to RecoveryService...
@@ -314,6 +330,8 @@ public class TestJobHistoryParsing {
   
   @Test
   public void testHistoryParsingForFailedAttempts() throws Exception {
+    LOG.info("STARTING testHistoryParsingForFailedAttempts");
+    try {
     Configuration conf = new Configuration();
     conf
         .setClass(
@@ -335,7 +353,7 @@ public class TestJobHistoryParsing {
     JobHistory jobHistory = new JobHistory();
     jobHistory.init(conf);
 
-    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
+    JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
         .getJobIndexInfo();
     String jobhistoryFileName = FileNameIndexUtils
         .getDoneFileName(jobIndexInfo);
@@ -372,6 +390,9 @@ public class TestJobHistoryParsing {
       }
     }
     Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts);
+    } finally {
+      LOG.info("FINISHED testHistoryParsingForFailedAttempts");
+    }
   }
   
   static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {



Mime
View raw message