hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r1127298 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ ...
Date Tue, 24 May 2011 21:27:23 GMT
Author: mahadev
Date: Tue May 24 21:27:23 2011
New Revision: 1127298

URL: http://svn.apache.org/viewvc?rev=1127298&view=rev
Log:
MAPREDUCE-2522. Security for JobHistory service. (Siddharth Seth via mahadev)

Added:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHConfig.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobACLsManager.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Tue May 24 21:27:23 2011
@@ -3,6 +3,9 @@ Hadoop MapReduce Change Log
 Trunk (unreleased changes)
 
   MAPREDUCE-279
+   
+    MAPREDUCE-2522. Security for JobHistory service. (Siddharth Seth via 
+    mahadev)
 
     Added metrics for tracking reservations in CapacityScheduler. (Luke Lu via
     acmurthy)

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue May 24 21:27:23 2011
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -40,8 +41,10 @@ import org.apache.hadoop.mapreduce.TypeC
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -54,12 +57,14 @@ import org.apache.hadoop.yarn.service.Ab
  * JobHistory implementation is in this package to access package private 
  * classes.
  */
+
 public class JobHistoryEventHandler extends AbstractService
     implements EventHandler<JobHistoryEvent> {
 
   private final AppContext context;
   private final int startCount;
 
+  //TODO Does the FS object need to be different ? 
   private FileSystem logDirFS; // log Dir FileSystem
   private FileSystem doneDirFS; // done Dir FileSystem
 
@@ -68,6 +73,7 @@ public class JobHistoryEventHandler exte
   private Path logDirPath = null;
   private Path doneDirPrefixPath = null; // folder for completed jobs
 
+
   private BlockingQueue<JobHistoryEvent> eventQueue =
     new LinkedBlockingQueue<JobHistoryEvent>();
   private Thread eventHandlingThread;
@@ -97,42 +103,84 @@ public class JobHistoryEventHandler exte
     this.conf = conf;
 
     String logDir = JobHistoryUtils.getConfiguredHistoryLogDirPrefix(conf);
+    String userLogDir = JobHistoryUtils.getHistoryLogDirForUser(conf);
     String  doneDirPrefix = JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
+    String userDoneDirPrefix = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
 
+    //Check for the existance of the log dir. Maybe create it. 
+    Path path = null;
     try {
-      doneDirPrefixPath = FileSystem.get(conf).makeQualified(
-          new Path(doneDirPrefix));
-      doneDirFS = FileSystem.get(doneDirPrefixPath.toUri(), conf);
-      if (!doneDirFS.exists(doneDirPrefixPath)) {
+      path = FileSystem.get(conf).makeQualified(new Path(logDir));
+      logDirFS = FileSystem.get(path.toUri(), conf);
+      LOG.info("Maybe creating staging history logDir: [" + path + "]");
+      mkdir(logDirFS, path, new FsPermission(JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
+    } catch (IOException e) {
+      LOG.error("Failed while checking for/ceating  history staging path: [" + path + "]", e);
+      throw new YarnException(e);
+    }
+
+    //Check for the existance of intermediate done dir.
+    Path doneDirPath = null;
         try {
-          doneDirFS.mkdirs(doneDirPrefixPath, new FsPermission(
-              JobHistoryUtils.HISTORY_DIR_PERMISSION));
-        } catch (FileAlreadyExistsException e) {
-          LOG.info("JobHistory Done Directory: [" + doneDirPrefixPath
-              + "] already exists.");
+      doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirPrefix));
+      doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
+      if (!doneDirFS.exists(doneDirPath)) {
+        // This directory will be in a common location, or this may be a cluster meant for a single user.
+        // Creating based on the conf.
+        // Should ideally be created by the JobHistoryServer or as part of deployment.
+        if (JobHistoryUtils.shouldCreateNonUserDirectory(conf)) {
+          LOG.info("Creating intermediate history logDir: [" + doneDirPath + "] + based on conf. Should ideally be created by the JobHistoryServer: " + JHConfig.CREATE_HISTORY_INTERMEDIATE_BASE_DIR_KEY);
+          mkdir(doneDirFS, doneDirPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
+          //TODO Temporary toShort till new FsPermission(FsPermissions) respects sticky
+        } else {
+          LOG.error("Not creating intermediate history logDir: [" + doneDirPath + "] based on conf: " + JHConfig.CREATE_HISTORY_INTERMEDIATE_BASE_DIR_KEY + ". Either set to true or pre-create this directory with appropriate permissions");
+          throw new YarnException("Not creating intermediate history logDir: [" + doneDirPath + "] based on conf: " + JHConfig.CREATE_HISTORY_INTERMEDIATE_BASE_DIR_KEY + ". Either set to true or pre-create this directory with appropriate permissions");
         }
       }
     } catch (IOException e) {
-          LOG.info("error creating done directory on dfs " + e);
+      LOG.error("Failed checking for the existance of history intermediate done directory: [" + doneDirPath + "]");
           throw new YarnException(e);
     }
+        
+    //Check/create staging directory.
     try {
-      logDirPath = FileSystem.get(conf).makeQualified(
-          new Path(logDir));
-      logDirFS = FileSystem.get(logDirPath.toUri(), conf);
-      if (!logDirFS.exists(logDirPath)) {
+      logDirPath = FileSystem.get(conf).makeQualified(new Path(userLogDir));
+      mkdir(logDirFS, logDirPath, new FsPermission(JobHistoryUtils.HISTORY_STAGING_USER_DIR_PERMISSIONS));
+    } catch (IOException e) {
+      LOG.error("Error creating user staging history directory: [" + logDirPath + "]", e);
+      throw new YarnException(e);
+    }
+
+    //Check/create user directory under intermediate done dir.
         try {
-          logDirFS.mkdirs(logDirPath, new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION));
-        } catch (FileAlreadyExistsException e) {
-          LOG.info("JobHistory Log Directory: [" + doneDirPrefixPath
-              + "] already exists.");
+      doneDirPrefixPath = FileSystem.get(conf).makeQualified(
+          new Path(userDoneDirPrefix));
+      mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
+    } catch (IOException e) {
+          LOG.error("Error creating user intermediate history done directory: [ " + doneDirPrefixPath + "]", e);
+          throw new YarnException(e);
+        }
+
+    super.init(conf);
+  }
+
+  private void mkdir(FileSystem fs, Path path, FsPermission fsp)
+      throws IOException {
+    if (!fs.exists(path)) {
+      try {
+        fs.mkdirs(path, fsp);
+        FileStatus fsStatus = fs.getFileStatus(path);
+        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+            + ", Expected: " + fsp.toShort());
+        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
+          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+              + ", " + fsp);
+          fs.setPermission(path, fsp);
         }
+      } catch (FileAlreadyExistsException e) {
+        LOG.info("Directory: [" + path + "] already exists.");
       }
-    } catch (IOException ioe) {
-      LOG.info("Mkdirs failed to create " + logDirPath.toString());
-      throw new YarnException(ioe);
     }
-    super.init(conf);
   }
 
   @Override
@@ -242,7 +290,6 @@ public class JobHistoryEventHandler exte
       }
     }
     
-    //This could be done at the end as well in moveToDone
     Path logDirConfPath = null;
     if (conf != null) {
       logDirConfPath = JobHistoryUtils.getStagingConfFile(logDirPath, jobId, startCount);
@@ -295,7 +342,7 @@ public class JobHistoryEventHandler exte
       try {
         setupEventWriter(event.getJobID());
       } catch (IOException ioe) {
-        LOG.error("Error JobHistoryEventHandler in handle " + ioe);
+        LOG.error("Error JobHistoryEventHandler in handleEvent: " + event, ioe);
         throw new YarnException(ioe);
       }
     }
@@ -305,7 +352,7 @@ public class JobHistoryEventHandler exte
       mi.writeEvent(historyEvent);
       LOG.info("In HistoryEventHandler " + event.getHistoryEvent().getEventType());
     } catch (IOException e) {
-      LOG.error("Error writing History Event " + e);
+      LOG.error("Error writing History Event: " + event.getHistoryEvent(), e);
       throw new YarnException(e);
     }
     // check for done
@@ -323,6 +370,7 @@ public class JobHistoryEventHandler exte
   }
   }
 
+  //TODO Path is intermediate_done/user -> Work with this throughout.
   protected void closeEventWriter(JobId jobId) throws IOException {
     final MetaInfo mi = fileMap.get(jobId);
     
@@ -343,38 +391,32 @@ public class JobHistoryEventHandler exte
       LOG.warn("No file for jobconf with " + jobId + " found in cache!");
       }
       
-    String doneDir = JobHistoryUtils.getCurrentDoneDir(doneDirPrefixPath
-        .toString());
-    Path doneDirPath = doneDirFS.makeQualified(new Path(doneDir));
+    Path qualifiedDoneFile = null;
         try {
-      if (!pathExists(doneDirFS, doneDirPath)) {
-        doneDirFS.mkdirs(doneDirPath, new FsPermission(
-            JobHistoryUtils.HISTORY_DIR_PERMISSION));
-      }
-
       if (mi.getHistoryFile() != null) {
       Path logFile = mi.getHistoryFile();
       Path qualifiedLogFile = logDirFS.makeQualified(logFile);
-        String doneJobHistoryFileName = FileNameIndexUtils.getDoneFileName(mi
-            .getJobIndexInfo());
-      Path qualifiedDoneFile = doneDirFS.makeQualified(new Path(doneDirPath,
-            doneJobHistoryFileName));
+        String doneJobHistoryFileName = getTempFileName(FileNameIndexUtils.getDoneFileName(mi
+            .getJobIndexInfo()));
+        qualifiedDoneFile = doneDirFS.makeQualified(new Path(
+            doneDirPrefixPath, doneJobHistoryFileName));
       moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
       }
       
+      Path qualifiedConfDoneFile = null;
       if (mi.getConfFile() != null) {
       Path confFile = mi.getConfFile();
       Path qualifiedConfFile = logDirFS.makeQualified(confFile);
-        String doneConfFileName = JobHistoryUtils
-            .getIntermediateConfFileName(jobId);
-        Path qualifiedConfDoneFile = doneDirFS.makeQualified(new Path(
-            doneDirPath, doneConfFileName));
+        String doneConfFileName = getTempFileName(JobHistoryUtils
+            .getIntermediateConfFileName(jobId));
+        qualifiedConfDoneFile = doneDirFS.makeQualified(new Path(
+            doneDirPrefixPath, doneConfFileName));
       moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile);
       }
-      String doneFileName = JobHistoryUtils.getIntermediateDoneFileName(jobId);
-      Path doneFilePath = doneDirFS.makeQualified(new Path(doneDirPath,
-          doneFileName));
-      touchFile(doneFilePath);
+      
+      
+      moveTmpToDone(qualifiedConfDoneFile);
+      moveTmpToDone(qualifiedDoneFile);
     } catch (IOException e) {
       LOG.error("Error closing writer for JobID: " + jobId);
       throw e;
@@ -420,42 +462,58 @@ public class JobHistoryEventHandler exte
   }
   }
 
+  private void moveTmpToDone(Path tmpPath) throws IOException {
+    if (tmpPath != null) {
+      String tmpFileName = tmpPath.getName();
+      String fileName = getFileNameFromTmpFN(tmpFileName);
+      Path path = new Path(tmpPath.getParent(), fileName);
+      doneDirFS.rename(tmpPath, path);
+      LOG.info("Moved tmp to done: " + tmpPath + " to " + path);
+    }
+  }
+  
+  // TODO If the FS objects are the same, this should be a rename instead of a
+  // copy.
   private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
-    //check if path exists, in case of retries it may not exist
+    // check if path exists, in case of retries it may not exist
     if (logDirFS.exists(fromPath)) {
-      LOG.info("Moving " + fromPath.toString() + " to " +
-          toPath.toString());
-      //TODO temporarily removing the existing dst
+      LOG.info("Moving " + fromPath.toString() + " to " + toPath.toString());
+      // TODO temporarily removing the existing dst
       if (doneDirFS.exists(toPath)) {
         doneDirFS.delete(toPath, true);
       }
-      boolean copied =
-          FileUtil.copy(logDirFS, fromPath, doneDirFS, toPath, false, conf);
+      boolean copied = FileUtil.copy(logDirFS, fromPath, doneDirFS, toPath,
+          false, conf);
+
       if (copied)
-          LOG.info("Copied to done location: "+ toPath);
+        LOG.info("Copied to done location: " + toPath);
       else 
           LOG.info("copy failed");
-      doneDirFS.setPermission(toPath,
-          new FsPermission(JobHistoryUtils.HISTORY_FILE_PERMISSION));
+      doneDirFS.setPermission(toPath, new FsPermission(
+          JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
       
       logDirFS.delete(fromPath, false);
     }
     }
 
-  private void touchFile(Path path) throws IOException {
-    doneDirFS.createNewFile(path);
-    doneDirFS.setPermission(path, JobHistoryUtils.HISTORY_DIR_PERMISSION);
-  }
-
   boolean pathExists(FileSystem fileSys, Path path) throws IOException {
     return fileSys.exists(path);
   }
 
+  private String getTempFileName(String srcFile) {
+    return srcFile + "_tmp";
+  }
+  
+  private String getFileNameFromTmpFN(String tmpFileName) {
+    //TODO. Some error checking here.
+    return tmpFileName.substring(0, tmpFileName.length()-4);
+  }
+
   private void writeStatus(String statusstoredir, HistoryEvent event) throws IOException {
     try {
       Path statusstorepath = doneDirFS.makeQualified(new Path(statusstoredir));
       doneDirFS.mkdirs(statusstorepath,
-         new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION));
+         new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
       Path toPath = new Path(statusstoredir, "jobstats");
       FSDataOutputStream out = doneDirFS.create(toPath, true);
       EventWriter writer = new EventWriter(out);

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue May 24 21:27:23 2011
@@ -151,13 +151,7 @@ public class RecoveryService extends Com
   private void parse() throws IOException {
     // TODO: parse history file based on startCount
     String jobName = TypeConverter.fromYarn(appID).toString();
-//    String defaultStagingDir = getConfig().get(
-//        YARNApplicationConstants.APPS_STAGING_DIR_KEY)
-//        + "/history/staging";
-    
-//    String jobhistoryDir = getConfig().get(
-//        YarnMRJobConfig.HISTORY_STAGING_DIR_KEY, defaultStagingDir);
-    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryLogDirPrefix(getConfig());
+    String jobhistoryDir = JobHistoryUtils.getHistoryLogDirForUser(getConfig());
     FSDataInputStream in = null;
     Path historyFile = null;
     Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue May 24 21:27:23 2011
@@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
@@ -129,6 +130,7 @@ public class MRApp extends MRAppMaster {
     String user = conf.get(MRJobConfig.USER_NAME, "mapred");
     conf.set(MRJobConfig.USER_NAME, user);
     conf.set(YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY, testAbsPath.toString());
+    conf.setBoolean(JHConfig.CREATE_HISTORY_INTERMEDIATE_BASE_DIR_KEY, true);
 
     init(conf);
     start();

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java Tue May 24 21:27:23 2011
@@ -35,78 +35,6 @@ public class YarnMRJobConfig {
       = "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.lambda";
   public static final String EXPONENTIAL_SMOOTHING_SMOOTH_RATE
       = "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.smoothsrate";
-  public static final String HS_PREFIX = "yarn.server.historyserver.";
-
-  public static final String DEFAULT_HS_BIND_ADDRESS = "0.0.0.0:10020";
-
-  /** host:port address to which to bind to **/
-  public static final String HS_BIND_ADDRESS = HS_PREFIX + "address";
-
-  /** Staging Dir for AppMaster **/
-  public static final String HISTORY_STAGING_DIR_KEY =
-       "yarn.historyfile.stagingDir";
-
-  /** Done Dir for for AppMaster **/
-  public static final String HISTORY_INTERMEDIATE_DONE_DIR_KEY =
-       "yarn.historyfile.intermediateDoneDir";
-  
-  /** Done Dir for for AppMaster **/
-  public static final String HISTORY_DONE_DIR_KEY =
-       "yarn.historyfile.doneDir";
-  
-  /** Done Dir for history server. **/
-  public static final String HISTORY_SERVER_DONE_DIR_KEY = 
-       HS_PREFIX + ".historyfile.doneDir";
-  
-  /**
-   * Size of the job list cache.
-   */
-  public static final String HISTORY_SERVER_JOBLIST_CACHE_SIZE_KEY =
-    HS_PREFIX + ".joblist.cache.size";
-     
-  /**
-   * Size of the loaded job cache.
-   */
-  public static final String HISTORY_SERVER_LOADED_JOB_CACHE_SIZE_KEY = 
-    HS_PREFIX + ".loadedjobs.cache.size";
-  
-  /**
-   * Size of the date string cache. Effects the number of directories
-   * which will be scanned to find a job.
-   */
-  public static final String HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY = 
-    HS_PREFIX + ".datestring.cache.size";
-  
-  /**
-   * The time interval in milliseconds for the history server
-   * to wake up and scan for files to be moved.
-   */
-  public static final String HISTORY_SERVER_MOVE_THREAD_INTERVAL = 
-    HS_PREFIX + ".move.thread.interval";
-  
-  /**
-   * The number of threads used to move files.
-   */
-  public static final String HISTORY_SERVER_NUM_MOVE_THREADS = 
-    HS_PREFIX + ".move.threads.count";
-  
-  // Equivalent to 0.20 mapreduce.jobhistory.debug.mode
-  public static final String HISTORY_DEBUG_MODE_KEY = HS_PREFIX + ".debug.mode";
-  
-  public static final String HISTORY_MAXAGE =
-	  "yarn.historyfile.maxage";
-  
-  /**
-   * Run interval for the History Cleaner thread.
-   */
-  public static final String HISTORY_CLEANER_RUN_INTERVAL = 
-    HS_PREFIX + ".cleaner.run.interval";
-  
-  public static final String HS_WEBAPP_BIND_ADDRESS = HS_PREFIX +
-      "address.webapp";
-  public static final String DEFAULT_HS_WEBAPP_BIND_ADDRESS =
-	  "0.0.0.0:19888";
-
   public static final String RECOVERY_ENABLE
       = "yarn.mapreduce.job.recovery.enable";
 }

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHConfig.java?rev=1127298&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHConfig.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHConfig.java Tue May 24 21:27:23 2011
@@ -0,0 +1,93 @@
+package org.apache.hadoop.mapreduce.v2.jobhistory;
+
+public class JHConfig {
+  public static final String HS_PREFIX = "yarn.server.historyserver.";
+  /** host:port address to which to bind to **/
+  public static final String HS_BIND_ADDRESS = HS_PREFIX + "address";
+
+  public static final String HS_USER_NAME = HS_PREFIX + "kerberos.principal";
+  
+  public static final String HS_KEYTAB_FILE = HS_PREFIX + "jeytab.file";
+  
+  public static final String DEFAULT_HS_BIND_ADDRESS = "0.0.0.0:10020";
+
+  /** Staging Dir for AppMaster **/
+  public static final String HISTORY_STAGING_DIR_KEY =
+       "yarn.historyfile.stagingDir";
+
+  /** Done Dir for for AppMaster **/
+  public static final String HISTORY_INTERMEDIATE_DONE_DIR_KEY =
+       "yarn.historyfile.intermediateDoneDir";
+
+  /** Done Dir for for AppMaster **/
+  public static final String HISTORY_DONE_DIR_KEY =
+       "yarn.historyfile.doneDir";
+
+  /**
+   * Boolean. Create the base dirs in the JobHistoryEventHandler
+   * Set to false for multi-user clusters.
+   */
+  public static final String CREATE_HISTORY_INTERMEDIATE_BASE_DIR_KEY = 
+    "yarn.history.create.intermediate.base.dir";
+  
+  /** Done Dir for history server. **/
+  public static final String HISTORY_SERVER_DONE_DIR_KEY = 
+       HS_PREFIX + "historyfile.doneDir";
+  
+  /**
+   * Size of the job list cache.
+   */
+  public static final String HISTORY_SERVER_JOBLIST_CACHE_SIZE_KEY =
+    HS_PREFIX + "joblist.cache.size";
+     
+  /**
+   * Size of the loaded job cache.
+   */
+  public static final String HISTORY_SERVER_LOADED_JOB_CACHE_SIZE_KEY = 
+    HS_PREFIX + "loadedjobs.cache.size";
+  
+  /**
+   * Size of the date string cache. Effects the number of directories
+   * which will be scanned to find a job.
+   */
+  public static final String HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY = 
+    HS_PREFIX + "datestring.cache.size";
+  
+  /**
+   * The time interval in milliseconds for the history server
+   * to wake up and scan for files to be moved.
+   */
+  public static final String HISTORY_SERVER_MOVE_THREAD_INTERVAL = 
+    HS_PREFIX + "move.thread.interval";
+  
+  /**
+   * The number of threads used to move files.
+   */
+  public static final String HISTORY_SERVER_NUM_MOVE_THREADS = 
+    HS_PREFIX + "move.threads.count";
+  
+  // Equivalent to 0.20 mapreduce.jobhistory.debug.mode
+  public static final String HISTORY_DEBUG_MODE_KEY = HS_PREFIX + "debug.mode";
+  
+  public static final String HISTORY_MAXAGE =
+    "yarn.historyfile.maxage";
+  
+  //TODO Move some of the HistoryServer specific out into a separate configuration class.
+  public static final String HS_KEYTAB_KEY = HS_PREFIX + "keytab";
+  
+  public static final String HS_SERVER_PRINCIPAL_KEY = "yarn.historyserver.principal";
+  
+  public static final String RUN_HISTORY_CLEANER_KEY = 
+    HS_PREFIX + "cleaner.run";
+  
+  /**
+   * Run interval for the History Cleaner thread.
+   */
+  public static final String HISTORY_CLEANER_RUN_INTERVAL = 
+    HS_PREFIX + "cleaner.run.interval";
+  
+  public static final String HS_WEBAPP_BIND_ADDRESS = HS_PREFIX +
+      "address.webapp";
+  public static final String DEFAULT_HS_WEBAPP_BIND_ADDRESS =
+    "0.0.0.0:19888";
+}

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Tue May 24 21:27:23 2011
@@ -36,30 +36,56 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 
 public class JobHistoryUtils {
   
-  private static final Log LOG = LogFactory.getLog(JobHistoryUtils.class);
+  /**
+   * Permissions for the history staging dir while JobInProgress.
+   */
+  public static final FsPermission HISTORY_STAGING_DIR_PERMISSIONS =
+    
+    FsPermission.createImmutable( (short) 0700);
+  
+  /**
+   * Permissions for the user directory under the staging directory.
+   */
+  public static final FsPermission HISTORY_STAGING_USER_DIR_PERMISSIONS = 
+    FsPermission.createImmutable((short) 0700);
   
-  public static final FsPermission HISTORY_DIR_PERMISSION =
-    FsPermission.createImmutable((short) 0750); // rwxr-x---
   
-  public static final FsPermission HISTORY_FILE_PERMISSION =
-    FsPermission.createImmutable((short) 0740); // rwxr-----
   
   /**
-   * Suffix for configuration files.
+   * Permissions for the history done dir and derivatives.
    */
-  public static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
+  public static final FsPermission HISTORY_DONE_DIR_PERMISSION =
+    FsPermission.createImmutable((short) 0770); 
+
+  public static final FsPermission HISTORY_DONE_FILE_PERMISSION =
+    FsPermission.createImmutable((short) 0770); // rwx------
   
   /**
-   * Suffix for done files.
+   * Permissions for the intermediate done directory.
    */
-  public static final String DONE_FILE_NAME_SUFFIX = ".done";
+  public static final FsPermission HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS = 
+    FsPermission.createImmutable((short) 01777);
+  
+  /**
+   * Permissions for the user directory under the intermediate done directory.
+   */
+  public static final FsPermission HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS = 
+    FsPermission.createImmutable((short) 0770);
+  
+  public static final FsPermission HISTORY_INTERMEDIATE_FILE_PERMISSIONS = 
+    FsPermission.createImmutable((short) 0770); // rwx------
+  
+  /**
+   * Suffix for configuration files.
+   */
+  public static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
   
   /**
    * Job History File extension.
@@ -110,12 +136,13 @@ public class JobHistoryUtils {
     return JOB_HISTORY_FILE_FILTER;
   }
   
+  //The version string may need to be removed.
   /**
    * Returns the current done directory.
    * @param doneDirPrefix the prefix for the done directory.
    * @return A string representation of the done directory.
    */
-  public static String getCurrentDoneDir(String doneDirPrefix) {
+  private static String getCurrentDoneDir(String doneDirPrefix) {
     return doneDirPrefix + File.separator + LOG_VERSION_STRING + File.separator;
   }
 
@@ -126,8 +153,9 @@ public class JobHistoryUtils {
    */
   public static String getConfiguredHistoryLogDirPrefix(Configuration conf) {
     String defaultLogDir = conf.get(
-        YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY) + "/history/staging";
-    String logDir = conf.get(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
+        //TODO Change this to staging directory
+        YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY, "/tmp") + "/history/staging";
+    String logDir = conf.get(JHConfig.HISTORY_STAGING_DIR_KEY,
       defaultLogDir);
     return logDir;
   }
@@ -139,9 +167,9 @@ public class JobHistoryUtils {
    */
   public static String getConfiguredHistoryIntermediateDoneDirPrefix(Configuration conf) {
     String defaultDoneDir = conf.get(
-        YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY) + "/history/done_intermediate";
+        YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY, "/tmp") + "/history/done_intermediate";
     String  doneDirPrefix =
-      conf.get(YarnMRJobConfig.HISTORY_INTERMEDIATE_DONE_DIR_KEY,
+      conf.get(JHConfig.HISTORY_INTERMEDIATE_DONE_DIR_KEY,
           defaultDoneDir);
     return doneDirPrefix;
   }
@@ -153,11 +181,37 @@ public class JobHistoryUtils {
    */
   public static String getConfiguredHistoryServerDoneDirPrefix(Configuration conf) {
     String defaultDoneDir = conf.get(
-        YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY) + "/history/done";
+        YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY, "/tmp") + "/history/done";
     String  doneDirPrefix =
-      conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
+      conf.get(JHConfig.HISTORY_DONE_DIR_KEY,
           defaultDoneDir);
-    return doneDirPrefix; 
+    return getCurrentDoneDir(doneDirPrefix);
+  }
+
+  /**
+   * Gets the user directory for In progress history files.
+   * @param conf
+   * @return
+   */
+  public static String getHistoryLogDirForUser(Configuration conf) {
+    return getConfiguredHistoryLogDirPrefix(conf) + File.separator
+        + conf.get(MRJobConfig.USER_NAME);
+  }
+  
+  /**
+   * Gets the user directory for intermediate done history files.
+   * @param conf
+   * @return
+   */
+  public static String getHistoryIntermediateDoneDirForUser(Configuration conf) {
+    return getConfiguredHistoryIntermediateDoneDirPrefix(conf) + File.separator
+        + conf.get(MRJobConfig.USER_NAME);
+  }
+
+  public static boolean shouldCreateNonUserDirectory(Configuration conf) {
+    // Returning true by default to allow non secure single node clusters to work
+    // without any configuration change.
+    return conf.getBoolean(JHConfig.CREATE_HISTORY_INTERMEDIATE_BASE_DIR_KEY, true); 
   }
 
   /**
@@ -184,10 +238,6 @@ public class JobHistoryUtils {
     return TypeConverter.fromYarn(jobId).toString() + CONF_FILE_NAME_SUFFIX;
   }
   
-  public static String getIntermediateDoneFileName(JobId jobId) {
-    return TypeConverter.fromYarn(jobId).toString() + DONE_FILE_NAME_SUFFIX;
-  }
-  
   /**
    * Gets the conf file path for jobs in progress.
    * 
@@ -240,11 +290,12 @@ public class JobHistoryUtils {
    * @return
    */
   public static String historyLogSubdirectory(JobId id, String timestampComponent, String serialNumberFormat) {
-    String result = LOG_VERSION_STRING;
+//    String result = LOG_VERSION_STRING;
+    String result = "";
     String serialNumberDirectory = serialNumberDirectoryComponent(id, serialNumberFormat);
     
     result = result 
-      + File.separator + timestampComponent
+      + timestampComponent
       + File.separator + serialNumberDirectory
       + File.separator;
     
@@ -273,11 +324,8 @@ public class JobHistoryUtils {
   }
   
   public static String doneSubdirsBeforeSerialTail() {
-    // Version Info
-    String result = ("/" + LOG_VERSION_STRING);
-
     // date
-    result = result + "/*/*/*"; // YYYY/MM/DD ;
+    String result = "/*/*/*"; // YYYY/MM/DD ;
     return result;
   }
   

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java?rev=1127298&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java Tue May 24 21:27:23 2011
@@ -0,0 +1,56 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.security.client;
+
+import java.lang.annotation.Annotation;
+
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.TokenInfo;
+
+public class ClientHSSecurityInfo implements SecurityInfo {
+
+  @Override
+  public KerberosInfo getKerborosInfo(Class<?> protocol) {
+    return new KerberosInfo() {
+
+      @Override
+      public Class<? extends Annotation> annotationType() {
+        return null;
+      }
+
+      @Override
+      public String serverPrincipal() {
+        return JHConfig.HS_SERVER_PRINCIPAL_KEY;
+      }
+
+      @Override
+      public String clientPrincipal() {
+        return null;
+      }
+    };
+  }
+
+  @Override
+  public TokenInfo getTokenInfo(Class<?> protocol) {
+    return null;
+  }
+
+}

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobACLsManager.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobACLsManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobACLsManager.java Tue May 24 21:27:23 2011
@@ -100,6 +100,7 @@ public class JobACLsManager {
       return true;
     }
 
+    //TODO Shouldn't this be doing some kind of a check to verify JobACL jobOperation?
     return false;
   }
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Tue May 24 21:27:23 2011
@@ -27,8 +27,6 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapreduce.JobACL;
@@ -45,7 +43,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.YarnException;
@@ -138,16 +135,6 @@ public class CompletedJob implements org
     if (jobInfo != null) {
       return; //data already loaded
     }
-    String user = conf.get(MRJobConfig.USER_NAME);
-    if (user == null) {
-      LOG.error("user null is not allowed");
-    }
-    String jobName = TypeConverter.fromYarn(jobId).toString();
-    
-    String  jobhistoryDir = JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
-      
-    
-    String currentJobHistoryDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
     
     if (historyFileAbsolute != null) {
       try {
@@ -158,26 +145,6 @@ public class CompletedJob implements org
             e);
       }
     }
-    else {
-    FSDataInputStream in = null;
-    Path historyFile = null;
-    try {
-      Path doneDirPath = FileContext.getFileContext(conf).makeQualified(
-          new Path(currentJobHistoryDir));
-      FileContext fc =
-        FileContext.getFileContext(doneDirPath.toUri(),conf);
-      //TODO_JH_There could be multiple instances
-      //TODO_JH_FileName
-      historyFile =
-        fc.makeQualified(new Path(doneDirPath, jobName + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
-      in = fc.open(historyFile);
-      JobHistoryParser parser = new JobHistoryParser(in);
-      jobInfo = parser.parse();
-    } catch (IOException e) {
-      throw new YarnException("Could not load history file " + historyFile,
-          e);
-    }
-    }
     
     if (loadTasks) {
     // populate the tasks

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java Tue May 24 21:27:23 2011
@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.security.AccessControlException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Collection;
 
@@ -28,6 +31,8 @@ import org.apache.avro.ipc.Server;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
@@ -58,9 +63,12 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.HSWebApp;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
+import org.apache.hadoop.mapreduce.v2.security.client.ClientHSSecurityInfo;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -69,7 +77,6 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 
 /**
  * This module is responsible for talking to the 
@@ -93,15 +100,18 @@ public class HistoryClientService extend
   }
 
   public void start() {
+    YarnRPC rpc = YarnRPC.create(getConfig());
     Configuration conf = new Configuration(getConfig());
-    YarnRPC rpc = YarnRPC.create(conf);
-    initializeWebApp(conf);
-    String serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
-        YarnMRJobConfig.DEFAULT_HS_BIND_ADDRESS);
+    conf.setClass(
+        CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        ClientHSSecurityInfo.class, SecurityInfo.class);
+    initializeWebApp(getConfig());
+    String serviceAddr = conf.get(JHConfig.HS_BIND_ADDRESS,
+        JHConfig.DEFAULT_HS_BIND_ADDRESS);
     InetSocketAddress address = NetUtils.createSocketAddr(serviceAddr);
     InetAddress hostNameResolved = null;
     try {
-      hostNameResolved = address.getAddress().getLocalHost();
+      hostNameResolved = InetAddress.getLocalHost(); //address.getAddress().getLocalHost();
     } catch (UnknownHostException e) {
       throw new YarnException(e);
     }
@@ -122,8 +132,8 @@ public class HistoryClientService extend
 
   private void initializeWebApp(Configuration conf) {
     webApp = new HSWebApp(history);
-    String bindAddress = conf.get(YarnMRJobConfig.HS_WEBAPP_BIND_ADDRESS,
-        YarnMRJobConfig.DEFAULT_HS_WEBAPP_BIND_ADDRESS);
+    String bindAddress = conf.get(JHConfig.HS_WEBAPP_BIND_ADDRESS,
+        JHConfig.DEFAULT_HS_WEBAPP_BIND_ADDRESS);
     WebApps.$for("yarn", this).at(bindAddress).start(webApp); 
   }
 
@@ -142,18 +152,37 @@ public class HistoryClientService extend
 
     private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
     
-    private Job getJob(JobId jobId) throws YarnRemoteException {
-      Job job = history.getJob(jobId);
+    private Job verifyAndGetJob(final JobId jobID) throws YarnRemoteException {
+      UserGroupInformation loginUgi = null;
+      Job job = null;
+      try {
+        loginUgi = UserGroupInformation.getLoginUser();
+        job = loginUgi.doAs(new PrivilegedExceptionAction<Job>() {
+
+          @Override
+          public Job run() throws Exception {
+            Job job = history.getJob(jobID);
+            return job;
+          }
+        });
+      } catch (IOException e) {
+        throw RPCUtil.getRemoteException(e);
+      } catch (InterruptedException e) {
+        throw RPCUtil.getRemoteException(e);
+      }
       if (job == null) {
-        throw RPCUtil.getRemoteException("Unknown job " + jobId);
+        throw RPCUtil.getRemoteException("Unknown job " + jobID);
       }
+      JobACL operation = JobACL.VIEW_JOB;
+      //TODO disable check access for now.
+      checkAccess(job, operation);
       return job;
     }
 
     @Override
     public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRemoteException {
       JobId jobId = request.getJobId();
-      Job job = getJob(jobId);
+      Job job = verifyAndGetJob(jobId);
       GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class);
       response.setCounters(job.getCounters());
       return response;
@@ -162,7 +191,7 @@ public class HistoryClientService extend
     @Override
     public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException {
       JobId jobId = request.getJobId();
-      Job job = getJob(jobId);
+      Job job = verifyAndGetJob(jobId);
       GetJobReportResponse response = recordFactory.newRecordInstance(GetJobReportResponse.class);
       response.setJobReport(job.getReport());
       return response;
@@ -171,7 +200,7 @@ public class HistoryClientService extend
     @Override
     public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException {
       TaskAttemptId taskAttemptId = request.getTaskAttemptId();
-      Job job = getJob(taskAttemptId.getTaskId().getJobId());
+      Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId());
       GetTaskAttemptReportResponse response = recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
       response.setTaskAttemptReport(job.getTask(taskAttemptId.getTaskId()).getAttempt(taskAttemptId).getReport());
       return response;
@@ -180,7 +209,7 @@ public class HistoryClientService extend
     @Override
     public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException {
       TaskId taskId = request.getTaskId();
-      Job job = getJob(taskId.getJobId());
+      Job job = verifyAndGetJob(taskId.getJobId());
       GetTaskReportResponse response = recordFactory.newRecordInstance(GetTaskReportResponse.class);
       response.setTaskReport(job.getTask(taskId).getReport());
       return response;
@@ -192,7 +221,7 @@ public class HistoryClientService extend
       int fromEventId = request.getFromEventId();
       int maxEvents = request.getMaxEvents();
       
-      Job job = getJob(jobId);
+      Job job = verifyAndGetJob(jobId);
       GetTaskAttemptCompletionEventsResponse response = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
       response.addAllCompletionEvents(Arrays.asList(job.getTaskAttemptCompletionEvents(fromEventId, maxEvents)));
       return response;
@@ -200,21 +229,16 @@ public class HistoryClientService extend
       
     @Override
     public KillJobResponse killJob(KillJobRequest request) throws YarnRemoteException {
-      JobId jobId = request.getJobId();
       throw RPCUtil.getRemoteException("Invalid operation on completed job");
     }
     
     @Override
     public KillTaskResponse killTask(KillTaskRequest request) throws YarnRemoteException {
-      TaskId taskId = request.getTaskId();
-      getJob(taskId.getJobId());
       throw RPCUtil.getRemoteException("Invalid operation on completed job");
     }
     
     @Override
     public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws YarnRemoteException {
-      TaskAttemptId taskAttemptId = request.getTaskAttemptId();
-      getJob(taskAttemptId.getTaskId().getJobId());
       throw RPCUtil.getRemoteException("Invalid operation on completed job");
     }
 
@@ -222,7 +246,7 @@ public class HistoryClientService extend
     public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request) throws YarnRemoteException {
       TaskAttemptId taskAttemptId = request.getTaskAttemptId();
     
-      Job job = getJob(taskAttemptId.getTaskId().getJobId());
+      Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId());
       
       GetDiagnosticsResponse response = recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
       response.addAllDiagnostics(job.getTask(taskAttemptId.getTaskId()).getAttempt(taskAttemptId).getDiagnostics());
@@ -231,8 +255,6 @@ public class HistoryClientService extend
 
     @Override 
     public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws YarnRemoteException {
-      TaskAttemptId taskAttemptId = request.getTaskAttemptId();
-      getJob(taskAttemptId.getTaskId().getJobId());
       throw RPCUtil.getRemoteException("Invalid operation on completed job");
     }
 
@@ -242,7 +264,7 @@ public class HistoryClientService extend
       TaskType taskType = request.getTaskType();
       
       GetTaskReportsResponse response = recordFactory.newRecordInstance(GetTaskReportsResponse.class);
-      Job job = getJob(jobId);
+      Job job = verifyAndGetJob(jobId);
       Collection<Task> tasks = job.getTasks(taskType).values();
       for (Task task : tasks) {
         response.addTaskReport(task.getReport());
@@ -250,6 +272,22 @@ public class HistoryClientService extend
       return response;
     }
 
+    private void checkAccess(Job job, JobACL jobOperation)
+        throws YarnRemoteException {
+      if (!UserGroupInformation.isSecurityEnabled()) {
+        return;
+      }
+      UserGroupInformation callerUGI;
+      try {
+        callerUGI = UserGroupInformation.getCurrentUser();
+      } catch (IOException e) {
+        throw RPCUtil.getRemoteException(e);
+      }
+      if (!job.checkAccess(callerUGI, jobOperation)) {
+        throw RPCUtil.getRemoteException(new AccessControlException("User "
+            + callerUGI.getShortUserName() + " cannot perform operation "
+            + jobOperation.name() + " on " + job.getID()));
+      }
+    }
   }
-
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Tue May 24 21:27:23 2011
@@ -45,16 +45,17 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.yarn.Clock;
@@ -127,6 +128,11 @@ public class JobHistory extends Abstract
   private final SortedMap<JobId, Job> loadedJobCache = new ConcurrentSkipListMap<JobId, Job>(
       JOB_ID_COMPARATOR);
 
+  /**
+   * Maintains a mapping between intermediate user directories and the last known modification time.
+   */
+  private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
+  
   //The number of jobs to maintain in the job list cache.
   private int jobListCacheSize;
   
@@ -153,17 +159,12 @@ public class JobHistory extends Abstract
   private FileContext doneDirFc; // done Dir FileContext
   
   private Path intermediateDoneDirPath = null; //Intermediate Done Dir Path
-  private FileContext intermediaDoneDirFc; //Intermediate Done Dir FileContext
+  private FileContext intermediateDoneDirFc; //Intermediate Done Dir FileContext
 
   private Thread moveIntermediateToDoneThread = null;
   private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null;
   private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null;
   
-  /*
-   * TODO
-   * Fix completion time in JobFinishedEvent
-   */
-  
   /**
    * Writes out files to the path
    * .....${DONE_DIR}/VERSION_STRING/YYYY/MM/DD/HH/SERIAL_NUM/jh{index_entries}.jhist
@@ -173,7 +174,7 @@ public class JobHistory extends Abstract
   public void init(Configuration conf) throws YarnException {
     LOG.info("JobHistory Init");
     this.conf = conf;
-    debugMode = conf.getBoolean(YarnMRJobConfig.HISTORY_DEBUG_MODE_KEY, false);
+    debugMode = conf.getBoolean(JHConfig.HISTORY_DEBUG_MODE_KEY, false);
     serialNumberLowDigits = debugMode ? 1 : 3;
     serialNumberFormat = ("%0"
         + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) + "d");
@@ -185,68 +186,31 @@ public class JobHistory extends Abstract
       doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
           new Path(doneDirPrefix));
       doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
-      if (!doneDirFc.util().exists(doneDirPrefixPath)) {
-        try {
-          doneDirFc.mkdir(doneDirPrefixPath, new FsPermission(
-              JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
-        } catch (FileAlreadyExistsException e) {
-          LOG.info("JobHistory Done Directory: [" + doneDirPrefixPath
-              + "] already exists.");
-        }
-      }
-    } catch (IOException e) {
-      throw new YarnException("error creating done directory on dfs ", e);
-    }
-
-    String doneDirWithVersion = JobHistoryUtils
-        .getCurrentDoneDir(doneDirPrefix);
-    try {
-      Path doneDirWithVersionPath = FileContext.getFileContext(conf)
-          .makeQualified(new Path(doneDirWithVersion));
-      if (!doneDirFc.util().exists(doneDirWithVersionPath)) {
-        try {
-          doneDirFc.mkdir(doneDirWithVersionPath, new FsPermission(
-              JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
-        } catch (FileAlreadyExistsException e) {
-          LOG.info("JobHistory Done Directory: [" + doneDirPrefixPath
-              + "] already exists.");
-        }
-      }
+      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
     } catch (IOException e) {
-      throw new YarnException("error creating done_version directory on dfs", e);
+      throw new YarnException("Error creating done directory: [" + doneDirPrefixPath + "]", e);
     }
 
     String intermediateDoneDirPrefix = JobHistoryUtils
     .getConfiguredHistoryIntermediateDoneDirPrefix(conf);
-    String intermediateDoneDir = JobHistoryUtils
-    .getCurrentDoneDir(intermediateDoneDirPrefix);
     try {
       intermediateDoneDirPath = FileContext.getFileContext(conf)
-          .makeQualified(new Path(intermediateDoneDir));
-      intermediaDoneDirFc = FileContext.getFileContext(
+          .makeQualified(new Path(intermediateDoneDirPrefix));
+      intermediateDoneDirFc = FileContext.getFileContext(
           intermediateDoneDirPath.toUri(), conf);
-      if (!intermediaDoneDirFc.util().exists(intermediateDoneDirPath)) {
-        try {
-          intermediaDoneDirFc.mkdir(intermediateDoneDirPath,
-              new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
-        } catch (FileAlreadyExistsException e) {
-          LOG.info("Intermediate JobHistory Done Directory: ["
-              + intermediateDoneDirPath + "] already exists.");
-        }
-      }
-
+      mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
     } catch (IOException e) {
       LOG.info("error creating done directory on dfs " + e);
-      throw new YarnException("error creating done directory on dfs ", e);
+      throw new YarnException("Error creating intermediate done directory: [" + intermediateDoneDirPath + "]", e);
     }
     
     
     
-    jobListCacheSize = conf.getInt(YarnMRJobConfig.HISTORY_SERVER_JOBLIST_CACHE_SIZE_KEY, DEFAULT_JOBLIST_CACHE_SIZE);
-    loadedJobCacheSize = conf.getInt(YarnMRJobConfig.HISTORY_SERVER_LOADED_JOB_CACHE_SIZE_KEY, DEFAULT_LOADEDJOB_CACHE_SIZE);
-    dateStringCacheSize = conf.getInt(YarnMRJobConfig.HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY, DEFAULT_DATESTRING_CACHE_SIZE);
-    moveThreadInterval = conf.getLong(YarnMRJobConfig.HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY, DEFAULT_MOVE_THREAD_INTERVAL);
-    numMoveThreads = conf.getInt(YarnMRJobConfig.HISTORY_SERVER_NUM_MOVE_THREADS, DEFAULT_MOVE_THREAD_COUNT);
+    jobListCacheSize = conf.getInt(JHConfig.HISTORY_SERVER_JOBLIST_CACHE_SIZE_KEY, DEFAULT_JOBLIST_CACHE_SIZE);
+    loadedJobCacheSize = conf.getInt(JHConfig.HISTORY_SERVER_LOADED_JOB_CACHE_SIZE_KEY, DEFAULT_LOADEDJOB_CACHE_SIZE);
+    dateStringCacheSize = conf.getInt(JHConfig.HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY, DEFAULT_DATESTRING_CACHE_SIZE);
+    moveThreadInterval = conf.getLong(JHConfig.HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY, DEFAULT_MOVE_THREAD_INTERVAL);
+    numMoveThreads = conf.getInt(JHConfig.HISTORY_SERVER_NUM_MOVE_THREADS, DEFAULT_MOVE_THREAD_COUNT);
     try {
     initExisting();
     } catch (IOException e) {
@@ -255,6 +219,26 @@ public class JobHistory extends Abstract
     super.init(conf);
   }
   
+  private void mkdir(FileContext fc, Path path, FsPermission fsp)
+      throws IOException {
+    if (!fc.util().exists(path)) {
+      try {
+        fc.mkdir(path, fsp, true);
+
+        FileStatus fsStatus = fc.getFileStatus(path);
+        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+            + ", Expected: " + fsp.toShort());
+        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
+          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+              + ", " + fsp);
+          fc.setPermission(path, fsp);
+        }
+      } catch (FileAlreadyExistsException e) {
+        LOG.info("Directory: [" + path + "] already exists.");
+      }
+    }
+  }
+
   @Override
   public void start() {
     //Start moveIntermediatToDoneThread
@@ -264,11 +248,17 @@ public class JobHistory extends Abstract
     moveIntermediateToDoneThread.start();
     
     //Start historyCleaner
-    long maxAgeOfHistoryFiles = conf.getLong(
-        YarnMRJobConfig.HISTORY_MAXAGE, DEFAULT_HISTORY_MAX_AGE);
+    boolean startCleanerService = conf.getBoolean(JHConfig.RUN_HISTORY_CLEANER_KEY, true);
+    if (startCleanerService) {
+      long maxAgeOfHistoryFiles = conf.getLong(JHConfig.HISTORY_MAXAGE,
+          DEFAULT_HISTORY_MAX_AGE);
     cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1);
-    long runInterval = conf.getLong(YarnMRJobConfig.HISTORY_CLEANER_RUN_INTERVAL, DEFAULT_RUN_INTERVAL);
-    cleanerScheduledExecutor.scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles), 30*1000l, runInterval, TimeUnit.MILLISECONDS);
+      long runInterval = conf.getLong(JHConfig.HISTORY_CLEANER_RUN_INTERVAL,
+          DEFAULT_RUN_INTERVAL);
+      cleanerScheduledExecutor
+          .scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
+              30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
+    }
     super.start();
   }
   
@@ -441,40 +431,51 @@ public class JobHistory extends Abstract
     }
   }
   
+  
   /**
-   * Populates files from the intermediate directory into the intermediate cache.
+   * Scans the intermediate directory to find user directories. Scans these
+   * for history files if the modification time for the directory has changed.
    * @throws IOException
    */
   private void scanIntermediateDirectory() throws IOException {
-    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(intermediateDoneDirPath, intermediaDoneDirFc);
-    for (FileStatus fs : fileStatusList) {
-      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
-      String doneFileName = JobHistoryUtils.getIntermediateDoneFileName(jobIndexInfo.getJobId());
-      if (intermediaDoneDirFc.util().exists(intermediaDoneDirFc.makeQualified(new Path(intermediateDoneDirPath, doneFileName)))) {
-        String confFileName = JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
-        MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath().getParent(), confFileName), jobIndexInfo);
-        if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
-          intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
+    List<FileStatus> userDirList = JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
+    
+    for (FileStatus userDir : userDirList) {
+      String name = userDir.getPath().getName();
+      long newModificationTime = userDir.getModificationTime();
+      boolean shouldScan = false;
+      synchronized (userDirModificationTimeMap) {
+        if (!userDirModificationTimeMap.containsKey(name) || newModificationTime > userDirModificationTimeMap.get(name)) {
+            shouldScan = true;
+            userDirModificationTimeMap.put(name, newModificationTime);
+        }  
         }  
+      if (shouldScan) {
+        scanIntermediateDirectory(userDir.getPath());
       }
     }
   }
 
   /**
-   * Checks for the existance of the done file in the intermediate done
-   * directory for the specified jobId.
-   * 
-   * @param jobId the jobId.
-   * @return true if a done file exists for the specified jobId.
+   * Scans the specified path and populates the intermediate cache.
+   * @param absPath
    * @throws IOException
    */
-  private boolean doneFileExists(JobId jobId) throws IOException {
-    String doneFileName = JobHistoryUtils.getIntermediateDoneFileName(jobId);
-    Path qualifiedDoneFilePath = intermediaDoneDirFc.makeQualified(new Path(intermediateDoneDirPath, doneFileName));
-    if (intermediaDoneDirFc.util().exists(qualifiedDoneFilePath)) {
-      return true;
+  private void scanIntermediateDirectory(final Path absPath)
+      throws IOException {
+    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
+        intermediateDoneDirFc);
+    for (FileStatus fs : fileStatusList) {
+      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
+          .getName());
+      String confFileName = JobHistoryUtils
+          .getIntermediateConfFileName(jobIndexInfo.getJobId());
+      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
+          .getParent(), confFileName), jobIndexInfo);
+      if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
+        intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
+      }
     }
-    return false;
   }
   
   /**
@@ -486,15 +487,10 @@ public class JobHistory extends Abstract
    * @return A MetaInfo object for the jobId, null if not found.
    * @throws IOException
    */
-  private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId, boolean checkForDoneFile) throws IOException {
+  private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId) throws IOException {
     for (FileStatus fs : fileStatusList) {
       JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
       if (jobIndexInfo.getJobId().equals(jobId)) {
-        if (checkForDoneFile) {
-          if (!doneFileExists(jobIndexInfo.getJobId())) {
-            return null;
-          }
-        }
         String confFileName = JobHistoryUtils
             .getIntermediateConfFileName(jobIndexInfo.getJobId());
         MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
@@ -524,7 +520,7 @@ public class JobHistory extends Abstract
     for (String timestampPart : dateStringSet) {
       Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
       List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, doneDirFc);
-      MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId, false);
+      MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
       if (metaInfo != null) {
         return metaInfo;
       }
@@ -539,21 +535,8 @@ public class JobHistory extends Abstract
    * @throws IOException
    */
   private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
-    MetaInfo matchedMi = null;
-    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(intermediateDoneDirPath, intermediaDoneDirFc);
-    
-    MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId, true);
-    if (metaInfo == null) {
-      return null;
-    }
-    JobIndexInfo jobIndexInfo = metaInfo.getJobIndexInfo();
-    if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
-      intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
-      matchedMi = metaInfo;
-    } else {
-      matchedMi = intermediateListCache.get(jobId);
-    }
-    return matchedMi;
+    scanIntermediateDirectory();
+    return intermediateListCache.get(jobId);
   }
   
   
@@ -862,7 +845,7 @@ public class JobHistory extends Abstract
         try {
           moveToDoneNow(historyFile, toPath);
         } catch (IOException e) {
-          LOG.info("Failed to move file: " + historyFile + " for jobId: " + jobId);
+          LOG.info("Failed to move file: " + historyFile + " for jobId: " + jobId, e);
           return;
         }
         metaInfo.setHistoryFile(toPath);
@@ -872,29 +855,24 @@ public class JobHistory extends Abstract
         try {
           moveToDoneNow(confFile, toPath);
         } catch (IOException e) {
-          LOG.info("Failed to move file: " + historyFile + " for jobId: " + jobId);
+          LOG.info("Failed to move file: " + historyFile + " for jobId: " + jobId, e);
           return;
         }
         metaInfo.setConfFile(toPath);
       }
     }
-    //TODO Does this need to be synchronized ?
-    Path doneFileToDelete = intermediaDoneDirFc.makeQualified(new Path(intermediateDoneDirPath, JobHistoryUtils.getIntermediateDoneFileName(jobId)));
-    try {
-      intermediaDoneDirFc.delete(doneFileToDelete, false);
-    } catch (IOException e) {
-      LOG.info("Unable to remove done file: " + doneFileToDelete);
-    }
     addToJobListCache(jobId, metaInfo);
     intermediateListCache.remove(jobId);
   }
   
-  private void moveToDoneNow(Path src, Path target) throws IOException {
+  private void moveToDoneNow(final Path src, final Path target)
+      throws IOException {
     LOG.info("Moving " + src.toString() + " to " + target.toString());
-    intermediaDoneDirFc.util().copy(src, target);
-    intermediaDoneDirFc.delete(src, false);
-    doneDirFc.setPermission(target,
-        new FsPermission(JobHistoryUtils.HISTORY_FILE_PERMISSION));
+    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
+    // fc.util().copy(src, target);
+    //fc.delete(src, false);
+    //intermediateDoneDirFc.setPermission(target, new FsPermission(
+    //JobHistoryUtils.HISTORY_DONE_FILE_PERMISSION));
   }
   
   private void maybeMakeSubdirectory(Path path) throws IOException {
@@ -913,7 +891,16 @@ public class JobHistory extends Abstract
       }
     } catch (FileNotFoundException fnfE) {
       try {
-        doneDirFc.mkdir(path, new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
+        FsPermission fsp = new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
+        doneDirFc.mkdir(path, fsp, true);
+        FileStatus fsStatus = doneDirFc.getFileStatus(path);
+        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+            + ", Expected: " + fsp.toShort());
+        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
+          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+              + ", " + fsp);
+          doneDirFc.setPermission(path, fsp);
+        }
         synchronized(existingDoneSubdirs) {
           existingDoneSubdirs.add(path);
         }
@@ -931,6 +918,7 @@ public class JobHistory extends Abstract
     return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
   }  
   
+
   @Override
   public synchronized Job getJob(JobId jobId) {
     Job job = null;
@@ -974,7 +962,6 @@ public class JobHistory extends Abstract
   
   
   
-  
   static class MetaInfo {
     private Path historyFile;
     private Path confFile; 
@@ -1065,28 +1052,19 @@ public class JobHistory extends Abstract
       deleteFile(metaInfo.getConfFile());
       jobListCache.remove(metaInfo.getJobIndexInfo().getJobId());
       loadedJobCache.remove(metaInfo.getJobIndexInfo().getJobId());
-      //TODO Get rid of entries in the cache.
     }
     
-    private void deleteFile(Path path) throws IOException {
-      delete(path, false);
+    private void deleteFile(final Path path) throws IOException {
+      doneDirFc.delete(doneDirFc.makeQualified(path), false);
       filesDeleted++;
     }
     
     private void deleteDir(Path path) throws IOException {
-      delete(path, true);
+      doneDirFc.delete(doneDirFc.makeQualified(path), true);
       dirsDeleted++;
     }
-    
-    private void delete(Path path, boolean recursive) throws IOException {
-      doneDirFc.delete(doneDirFc.makeQualified(path), recursive);
     }
     
-  }
-  
-  
-  
-  
   
   
   

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Tue May 24 21:27:23 2011
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.service.CompositeService;
 
@@ -46,8 +51,14 @@ public class JobHistoryServer extends Co
     super(JobHistoryServer.class.getName());
   }
 
+  @Override
   public synchronized void init(Configuration conf) {
     Configuration config = new YarnConfiguration(conf);
+    try {
+      doSecureLogin(conf);
+    } catch(IOException ie) {
+      throw new YarnException("History Server Failed to login", ie);
+    }
     jobHistoryService = new JobHistory();
     historyContext = (HistoryContext)jobHistoryService;
     clientService = new HistoryClientService(historyContext);
@@ -56,6 +67,11 @@ public class JobHistoryServer extends Co
     super.init(config);
   }
 
+  protected void doSecureLogin(Configuration conf) throws IOException {
+    SecurityUtil.login(conf, JHConfig.HS_KEYTAB_KEY,
+        JHConfig.HS_SERVER_PRINCIPAL_KEY);
+  }
+
   public static void main(String[] args) {
     StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
     JobHistoryServer server = null;
@@ -69,5 +85,4 @@ public class JobHistoryServer extends Co
       System.exit(-1);
     }
   }
-
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Tue May 24 21:27:23 2011
@@ -60,15 +60,14 @@ public class TestJobHistoryParsing {
     //make sure all events are flushed
     app.waitForState(Service.STATE.STOPPED);
     
-    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
+    String jobhistoryDir = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
     JobHistory jobHistory = new JobHistory();
     jobHistory.init(conf);
     
-    String currentJobHistoryDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
     JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId).getJobIndexInfo();
     String jobhistoryFileName = FileNameIndexUtils.getDoneFileName(jobIndexInfo);
     
-    Path historyFilePath = new Path(currentJobHistoryDir, jobhistoryFileName);
+    Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
     FSDataInputStream in = null;
     LOG.info("JobHistoryFile is: " + historyFilePath);
     try {

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Tue May 24 21:27:23 2011
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
@@ -32,7 +33,6 @@ import org.apache.hadoop.mapreduce.JobSt
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.ipc.RPCUti
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
 import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
 
 public class ClientServiceDelegate {
   private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
@@ -116,7 +118,7 @@ public class ClientServiceDelegate {
             UserGroupInformation.getCurrentUser().addToken(clientToken);
         }
         LOG.info("Connecting to " + serviceAddr);
-        instantiateProxy(serviceAddr);
+        instantiateAMProxy(serviceAddr);
         return;
       } catch (Exception e) {
         //possibly
@@ -132,15 +134,16 @@ public class ClientServiceDelegate {
         appMaster = rm.getApplicationMaster(currentAppId);
       }
     }
+    //TODO Should this be additional states ?
     if (ApplicationState.COMPLETED.equals(appMaster.getState())) {
-      serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
-          YarnMRJobConfig.DEFAULT_HS_BIND_ADDRESS);
+      serviceAddr = conf.get(JHConfig.HS_BIND_ADDRESS,
+          JHConfig.DEFAULT_HS_BIND_ADDRESS);
       LOG.info("Application state is completed. " +
             "Redirecting to job history server " + serviceAddr);
       //TODO:
       serviceHttpAddr = "";
       try {
-        instantiateProxy(serviceAddr);
+        instantiateHistoryProxy(serviceAddr);
         return;
       } catch (IOException e) {
         throw new YarnException(e);
@@ -151,8 +154,9 @@ public class ClientServiceDelegate {
         "Cannot connect to Application with state " + appMaster.getState());
   }
 
-  private void instantiateProxy(final String serviceAddr) throws IOException {
+  private void instantiateAMProxy(final String serviceAddr) throws IOException {
     UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
     realProxy = currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
       @Override
       public MRClientProtocol run() {
@@ -165,6 +169,20 @@ public class ClientServiceDelegate {
         NetUtils.createSocketAddr(serviceAddr), myConf);
       }
     });
+    LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
+  }
+
+  private void instantiateHistoryProxy(final String serviceAddr)
+      throws IOException {
+    LOG.trace("Connecting to HistoryServer at: " + serviceAddr);
+    Configuration myConf = new Configuration(conf);
+    //TODO This should ideally be using it's own class (instead of ClientRMSecurityInfo)
+    myConf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        ClientRMSecurityInfo.class, SecurityInfo.class);
+    YarnRPC rpc = YarnRPC.create(myConf);
+    realProxy = (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+        NetUtils.createSocketAddr(serviceAddr), myConf);
+    LOG.trace("Connected to HistoryServer at: " + serviceAddr);
   }
 
   public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Tue May 24 21:27:23 2011
@@ -28,7 +28,6 @@ import org.apache.avro.ipc.Server;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
@@ -57,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.YarnException;
@@ -115,7 +115,7 @@ public class TestClientRedirect {
     
     Configuration conf = new YarnConfiguration();
     conf.set(YarnConfiguration.APPSMANAGER_ADDRESS, RMADDRESS);
-    conf.set(YarnMRJobConfig.HS_BIND_ADDRESS, HSHOSTADDRESS);
+    conf.set(JHConfig.HS_BIND_ADDRESS, HSHOSTADDRESS);
     RMService rmService = new RMService("test");
     rmService.init(conf);
     rmService.start();

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java Tue May 24 21:27:23 2011
@@ -142,6 +142,7 @@ public class ApplicationsManagerImpl ext
   public synchronized ApplicationMaster getApplicationMaster(ApplicationId applicationId) {
     ApplicationMaster appMaster =
       amTracker.get(applicationId).getMaster();
+    //TODO NPE (When the RM is restarted - it doesn't know about previous AMs)
     return appMaster;
   }
   



Mime
View raw message