hadoop-mapreduce-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1399950 [5/11] - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduc...
Date: Fri, 19 Oct 2012 02:28:42 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Fri Oct 19 02:25:55 2012
@@ -23,14 +23,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -62,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.jo
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.service.AbstractService;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -77,7 +78,7 @@ public class HistoryFileManager extends 
   private static enum HistoryInfoState {
     IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
   };
-
+  
   private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
       .doneSubdirsBeforeSerialTail();
 
@@ -130,7 +131,7 @@ public class HistoryFileManager extends 
     }
   }
 
-  private static class JobListCache {
+  static class JobListCache {
     private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
     private int maxSize;
     private long maxAge;
@@ -199,6 +200,29 @@ public class HistoryFileManager extends 
     }
   }
 
+  /**
+   * This class represents a user dir in the intermediate done directory.  This
+   * is mostly for locking purposes. 
+   */
+  private class UserLogDir {
+    long modTime = 0;
+    
+    public synchronized void scanIfNeeded(FileStatus fs) {
+      long newModTime = fs.getModificationTime();
+      if (modTime != newModTime) {
+        Path p = fs.getPath();
+        try {
+          scanIntermediateDirectory(p);
+          //If scanning fails, we will scan again.  We assume the failure is
+          // temporary.
+          modTime = newModTime;
+        } catch (IOException e) {
+          LOG.error("Error while trying to scan the directory " + p, e);
+        }
+      }
+    }
+  }
+  
   public class HistoryFileInfo {
     private Path historyFile;
     private Path confFile;
@@ -216,12 +240,14 @@ public class HistoryFileManager extends 
           : HistoryInfoState.IN_INTERMEDIATE;
     }
 
-    private synchronized boolean isMovePending() {
+    @VisibleForTesting
+    synchronized boolean isMovePending() {
       return state == HistoryInfoState.IN_INTERMEDIATE
           || state == HistoryInfoState.MOVE_FAILED;
     }
 
-    private synchronized boolean didMoveFail() {
+    @VisibleForTesting
+    synchronized boolean didMoveFail() {
       return state == HistoryInfoState.MOVE_FAILED;
     }
     
@@ -336,13 +362,13 @@ public class HistoryFileManager extends 
     public synchronized Configuration loadConfFile() throws IOException {
       FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
       Configuration jobConf = new Configuration(false);
-      jobConf.addResource(fc.open(confFile));
+      jobConf.addResource(fc.open(confFile), confFile.toString());
       return jobConf;
     }
   }
 
   private SerialNumberIndex serialNumberIndex = null;
-  private JobListCache jobListCache = null;
+  protected JobListCache jobListCache = null;
 
   // Maintains a list of known done subdirectories.
   private final Set<Path> existingDoneSubdirs = Collections
@@ -352,13 +378,13 @@ public class HistoryFileManager extends 
    * Maintains a mapping between intermediate user directories and the last
    * known modification time.
    */
-  private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
+  private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 
+    new ConcurrentHashMap<String, UserLogDir>();
 
   private JobACLsManager aclsMgr;
 
   private Configuration conf;
 
-  private boolean debugMode;
   private String serialNumberFormat;
 
   private Path doneDirPrefixPath = null; // folder for completed jobs
@@ -379,8 +405,7 @@ public class HistoryFileManager extends 
   public void init(Configuration conf) {
     this.conf = conf;
 
-    debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
-    int serialNumberLowDigits = debugMode ? 1 : 3;
+    int serialNumberLowDigits = 3;
     serialNumberFormat = ("%0"
         + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
         + "d");
@@ -392,6 +417,7 @@ public class HistoryFileManager extends 
       doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
           new Path(doneDirPrefix));
       doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
+      doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
       mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
           JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
     } catch (IOException e) {
@@ -585,23 +611,15 @@ public class HistoryFileManager extends 
 
     for (FileStatus userDir : userDirList) {
       String name = userDir.getPath().getName();
-      long newModificationTime = userDir.getModificationTime();
-      boolean shouldScan = false;
-      synchronized (userDirModificationTimeMap) {
-        if (!userDirModificationTimeMap.containsKey(name)
-            || newModificationTime > userDirModificationTimeMap.get(name)) {
-          shouldScan = true;
-          userDirModificationTimeMap.put(name, newModificationTime);
-        }
-      }
-      if (shouldScan) {
-        try {
-          scanIntermediateDirectory(userDir.getPath());
-        } catch (IOException e) {
-          LOG.error("Error while trying to scan the directory " 
-              + userDir.getPath(), e);
+      UserLogDir dir = userDirModificationTimeMap.get(name);
+      if(dir == null) {
+        dir = new UserLogDir();
+        UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
+        if(old != null) {
+          dir = old;
         }
       }
+      dir.scanIfNeeded(userDir);
     }
   }
 
@@ -692,8 +710,8 @@ public class HistoryFileManager extends 
    * @throws IOException
    */
   private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
-    int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
-    String boxedSerialNumber = String.valueOf(jobSerialNumber);
+    String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
+        jobId, serialNumberFormat);
     Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
     if (dateStringSet == null) {
       return null;
@@ -779,8 +797,8 @@ public class HistoryFileManager extends 
   }
 
   private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
-    String timestampComponent = JobHistoryUtils.timestampDirectoryComponent(
-        millisecondTime, debugMode);
+    String timestampComponent = JobHistoryUtils
+        .timestampDirectoryComponent(millisecondTime);
     return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
         id, timestampComponent, serialNumberFormat));
   }
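
The net effect of the HistoryFileManager changes above: the synchronized HashMap of user-directory modification times becomes a ConcurrentMap of per-directory UserLogDir objects, so scans of different users' directories no longer contend on one global lock, while the synchronized scanIfNeeded still serializes scans of the same directory. The key idiom is get/putIfAbsent, which guarantees every thread converges on a single UserLogDir instance per user. A minimal standalone sketch of that idiom (the Registry class is illustrative, not part of Hadoop):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // One lock object per key, created at most once, without a global lock.
    public class Registry {
      private final ConcurrentMap<String, Object> locks =
          new ConcurrentHashMap<String, Object>();

      public Object lockFor(String key) {
        Object lock = locks.get(key);
        if (lock == null) {
          Object fresh = new Object();
          // putIfAbsent returns the previously mapped value, or null if
          // our candidate won the race; either way all callers end up
          // holding the same instance.
          Object old = locks.putIfAbsent(key, fresh);
          lock = (old != null) ? old : fresh;
        }
        return lock;
      }
    }

Note also that UserLogDir.scanIfNeeded only advances modTime after a successful scan, so a directory whose scan failed is retried on the next pass.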

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Fri Oct 19 02:25:55 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.YarnExcept
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
 import org.apache.hadoop.yarn.service.CompositeService;
 
 /******************************************************************
@@ -53,6 +54,7 @@ public class JobHistoryServer extends Co
   private HistoryClientService clientService;
   private JobHistory jobHistoryService;
   private JHSDelegationTokenSecretManager jhsDTSecretManager;
+  private AggregatedLogDeletionService aggLogDelService;
 
   public JobHistoryServer() {
     super(JobHistoryServer.class.getName());
@@ -74,8 +76,10 @@ public class JobHistoryServer extends Co
     this.jhsDTSecretManager = createJHSSecretManager(conf);
     clientService = new HistoryClientService(historyContext, 
         this.jhsDTSecretManager);
+    aggLogDelService = new AggregatedLogDeletionService();
     addService(jobHistoryService);
     addService(clientService);
+    addService(aggLogDelService);
     super.init(config);
   }
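
The one-line service registration works because JobHistoryServer is a YARN CompositeService: every child added with addService() before super.init() is initialized, started, and stopped together with the parent, which is all the wiring AggregatedLogDeletionService needs. A schematic sketch of that composition pattern (plain Java stand-ins, not the actual YARN classes):

    import java.util.ArrayList;
    import java.util.List;

    interface Service {
      void init();
      void start();
      void stop();
    }

    // Children share the parent's lifecycle; stop runs in reverse order.
    class CompositeServiceSketch implements Service {
      private final List<Service> children = new ArrayList<Service>();

      protected void addService(Service s) { children.add(s); }

      public void init()  { for (Service s : children) s.init(); }
      public void start() { for (Service s : children) s.start(); }
      public void stop() {
        for (int i = children.size() - 1; i >= 0; i--) {
          children.get(i).stop();
        }
      }
    }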
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java Fri Oct 19 02:25:55 2012
@@ -21,28 +21,18 @@ package org.apache.hadoop.mapreduce.v2.h
 import com.google.inject.Inject;
 import java.util.Date;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfEntryInfo;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
-import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.webapp.ResponseInfo;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
@@ -142,7 +132,8 @@ public class HsJobBlock extends HtmlBloc
             table.tr((odd = !odd) ? _ODD : _EVEN).
               td(String.valueOf(attempt.getAttemptId())).
               td(new Date(attempt.getStartTime()).toString()).
-              td().a(".nodelink", url("http://", attempt.getNodeHttpAddress()), 
+              td().a(".nodelink", url(HttpConfig.getSchemePrefix(),
+                  attempt.getNodeHttpAddress()),
                   attempt.getNodeHttpAddress())._().
               td().a(".logslink", url(attempt.getShortLogsLink()), 
                       "logs")._().

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java Fri Oct 19 02:25:55 2012
@@ -29,6 +29,7 @@ import static org.apache.hadoop.yarn.web
 
 import java.util.Collection;
 
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -143,7 +144,7 @@ public class HsTaskPage extends HsView {
 
         td.br().$title(String.valueOf(sortId))._(). // sorting
             _(taid)._().td(ta.getState().toString()).td().a(".nodelink",
-                "http://"+ nodeHttpAddr,
+                HttpConfig.getSchemePrefix()+ nodeHttpAddr,
                 nodeRackName + "/" + nodeHttpAddr);
         td._();
         row.td().

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java Fri Oct 19 02:25:55 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.h
 import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
 
@@ -60,10 +61,10 @@ public class HsWebApp extends WebApp imp
     route(pajoin("/singletaskcounter",TASK_ID, COUNTER_GROUP, COUNTER_NAME),
         HsController.class, "singleTaskCounter");
     route("/about", HsController.class, "about");
-    route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER),
-        HsController.class, "logs");
-    route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER),
-        HsController.class, "nmlogs");
+    route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER,
+        CONTAINER_LOG_TYPE), HsController.class, "logs");
+    route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER,
+        CONTAINER_LOG_TYPE), HsController.class, "nmlogs");
   }
 }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Fri Oct 19 02:25:55 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -59,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -84,6 +86,13 @@ public class TestJobHistoryParsing {
   }
 
   @Test
+  public void testJobInfo() throws Exception {
+    JobInfo info = new JobInfo();
+    Assert.assertEquals("NORMAL", info.getPriority());
+    info.printAll();
+  }
+
+  @Test
   public void testHistoryParsing() throws Exception {
     LOG.info("STARTING testHistoryParsing()");
     try {
@@ -395,6 +404,108 @@ public class TestJobHistoryParsing {
     }
   }
   
+  @Test
+  public void testCountersForFailedTask() throws Exception {
+    LOG.info("STARTING testCountersForFailedTask");
+    try {
+    Configuration conf = new Configuration();
+    conf
+        .setClass(
+            CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            MyResolver.class, DNSToSwitchMapping.class);
+    RackResolver.init(conf);
+    MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true,
+        this.getClass().getName(), true);
+    app.submit(conf);
+    Job job = app.getContext().getAllJobs().values().iterator().next();
+    JobId jobId = job.getID();
+    app.waitForState(job, JobState.FAILED);
+
+    // make sure all events are flushed
+    app.waitForState(Service.STATE.STOPPED);
+
+    String jobhistoryDir = JobHistoryUtils
+        .getHistoryIntermediateDoneDirForUser(conf);
+    JobHistory jobHistory = new JobHistory();
+    jobHistory.init(conf);
+
+    JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
+        .getJobIndexInfo();
+    String jobhistoryFileName = FileNameIndexUtils
+        .getDoneFileName(jobIndexInfo);
+
+    Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
+    FSDataInputStream in = null;
+    FileContext fc = null;
+    try {
+      fc = FileContext.getFileContext(conf);
+      in = fc.open(fc.makeQualified(historyFilePath));
+    } catch (IOException ioe) {
+      LOG.info("Can not open history file: " + historyFilePath, ioe);
+      throw (new Exception("Can not open History File"));
+    }
+
+    JobHistoryParser parser = new JobHistoryParser(in);
+    JobInfo jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    Assert.assertNull("Caught an expected exception " + parseException,
+        parseException);
+    for (Map.Entry<TaskID,TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
+      TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
+      CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
+      Assert.assertNotNull("completed task report has null counters",
+          ct.getReport().getCounters());
+    }
+    } finally {
+      LOG.info("FINISHED testCountersForFailedTask");
+    }
+  }
+
+  @Test
+  public void testScanningOldDirs() throws Exception {
+    LOG.info("STARTING testScanningOldDirs");
+    try {
+    Configuration conf = new Configuration();
+    conf
+        .setClass(
+            CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            MyResolver.class, DNSToSwitchMapping.class);
+    RackResolver.init(conf);
+    MRApp app =
+        new MRAppWithHistory(1, 1, true,
+            this.getClass().getName(), true);
+    app.submit(conf);
+    Job job = app.getContext().getAllJobs().values().iterator().next();
+    JobId jobId = job.getID();
+    LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
+    app.waitForState(job, JobState.SUCCEEDED);
+
+    // make sure all events are flushed
+    app.waitForState(Service.STATE.STOPPED);
+
+    HistoryFileManagerForTest hfm = new HistoryFileManagerForTest();
+    hfm.init(conf);
+    HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
+    Assert.assertNotNull("Unable to locate job history", fileInfo);
+
+    // force the manager to "forget" the job
+    hfm.deleteJobFromJobListCache(fileInfo);
+    final int msecPerSleep = 10;
+    int msecToSleep = 10 * 1000;
+    while (fileInfo.isMovePending() && msecToSleep > 0) {
+      Assert.assertTrue(!fileInfo.didMoveFail());
+      msecToSleep -= msecPerSleep;
+      Thread.sleep(msecPerSleep);
+    }
+    Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);
+
+    fileInfo = hfm.getFileInfo(jobId);
+    Assert.assertNotNull("Unable to locate old job history", fileInfo);
+   } finally {
+      LOG.info("FINISHED testScanningOldDirs");
+    }
+  }
+
   static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
 
     public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete,
@@ -415,6 +526,32 @@ public class TestJobHistoryParsing {
     }
   }
 
+  static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory {
+
+    public MRAppWithHistoryWithFailedTask(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      if (attemptID.getTaskId().getId() == 0) {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
+  static class HistoryFileManagerForTest extends HistoryFileManager {
+    void deleteJobFromJobListCache(HistoryFileInfo fileInfo) {
+      jobListCache.delete(fileInfo);
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestJobHistoryParsing t = new TestJobHistoryParsing();
     t.testHistoryParsing();
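
testScanningOldDirs above waits for the asynchronous history-file move with a bounded polling loop instead of a fixed sleep: it keeps asserting that the move has not failed, and fails the test if the move is still pending after ten seconds. The same loop, factored into a generic helper (illustrative only; the test inlines it):

    // Poll until the condition holds or the timeout elapses.
    interface Condition {
      boolean isMet();
    }

    final class WaitUtil {
      private WaitUtil() {}

      // Returns true if the condition was met before the timeout.
      static boolean waitFor(Condition c, long timeoutMs, long pollMs)
          throws InterruptedException {
        long remaining = timeoutMs;
        while (!c.isMet() && remaining > 0) {
          Thread.sleep(pollMs);
          remaining -= pollMs;
        }
        return c.isMet();
      }
    }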

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java Fri Oct 19 02:25:55 2012
@@ -24,9 +24,11 @@ import static org.apache.hadoop.mapreduc
 import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -35,6 +37,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MockJobs;
@@ -44,13 +47,14 @@ import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.webapp.log.AggregatedLogsPage;
 import org.apache.hadoop.yarn.webapp.test.WebAppTests;
 import org.junit.Test;
 
-import static org.mockito.Mockito.verify;
+import com.google.inject.AbstractModule;
 import com.google.inject.Injector;
 
 public class TestHSWebApp {
@@ -251,6 +255,64 @@ public class TestHSWebApp {
         "Aggregation is not enabled. Try the nodemanager at "
             + MockJobs.NM_HOST + ":" + MockJobs.NM_PORT);
   }
+
+  @Test
+  public void testLogsViewSingle() throws IOException {
+    LOG.info("HsLogsPage with params for single log and data limits");
+    TestAppContext ctx = new TestAppContext();
+    Map<String, String> params = new HashMap<String, String>();
+
+    final Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+
+    params.put("start", "-2048");
+    params.put("end", "-1024");
+    params.put(CONTAINER_LOG_TYPE, "syslog");
+    params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
+        .toString());
+    params.put(NM_NODENAME,
+        BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
+    params.put(ENTITY_STRING, "container_10_0001_01_000001");
+    params.put(APP_OWNER, "owner");
+
+    Injector injector =
+        WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class, ctx,
+            params, new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(Configuration.class).toInstance(conf);
+          }
+        });
+    PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
+    verify(spyPw).write(
+        "Logs not available for container_10_0001_01_000001."
+            + " Aggregation may not be complete, "
+            + "Check back later or try the nodemanager at "
+            + MockJobs.NM_HOST + ":" + MockJobs.NM_PORT);
+  }
+
+  @Test
+  public void testLogsViewBadStartEnd() throws IOException {
+    LOG.info("HsLogsPage with bad start/end params");
+    TestAppContext ctx = new TestAppContext();
+    Map<String, String> params = new HashMap<String, String>();
+
+    params.put("start", "foo");
+    params.put("end", "bar");
+    params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
+        .toString());
+    params.put(NM_NODENAME,
+        BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
+    params.put(ENTITY_STRING, "container_10_0001_01_000001");
+    params.put(APP_OWNER, "owner");
+
+    Injector injector =
+        WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class, ctx,
+            params);
+    PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
+    verify(spyPw).write("Invalid log start value: foo");
+    verify(spyPw).write("Invalid log end value: bar");
+  }
 }
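
testLogsViewSingle above controls the Configuration the page under test sees by binding a specific instance in an anonymous Guice AbstractModule. A minimal standalone illustration of instance binding (real Guice API; the Settings class is made up for the example):

    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Injector;

    public class InstanceBindingSketch {
      static class Settings {
        final boolean logAggregationEnabled;
        Settings(boolean enabled) { this.logAggregationEnabled = enabled; }
      }

      public static void main(String[] args) {
        final Settings testSettings = new Settings(true);
        Injector injector = Guice.createInjector(new AbstractModule() {
          @Override
          protected void configure() {
            // Anything that injects Settings now receives this instance.
            bind(Settings.class).toInstance(testSettings);
          }
        });
        System.out.println(
            injector.getInstance(Settings.class).logAggregationEnabled);
      }
    }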
   
  

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java Fri Oct 19 02:25:55 2012
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
@@ -217,9 +218,23 @@ public class TestHsWebServicesJobsQuery 
   @Test
   public void testJobsQueryStateNone() throws JSONException, Exception {
     WebResource r = resource();
+
+     ArrayList<JobState> JOB_STATES = 
+         new ArrayList<JobState>(Arrays.asList(JobState.values()));
+
+      // find a state that isn't in use
+      Map<JobId, Job> jobsMap = appContext.getAllJobs();
+      for (Map.Entry<JobId, Job> entry : jobsMap.entrySet()) {
+        JOB_STATES.remove(entry.getValue().getState());
+      }
+
+    assertTrue("No unused job states", JOB_STATES.size() > 0);
+    JobState notInUse = JOB_STATES.get(0);
+
     ClientResponse response = r.path("ws").path("v1").path("history")
-        .path("mapreduce").path("jobs").queryParam("state", JobState.KILL_WAIT.toString())
+        .path("mapreduce").path("jobs").queryParam("state", notInUse.toString())
         .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+
     assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
     JSONObject json = response.getEntity(JSONObject.class);
     assertEquals("incorrect number of elements", 1, json.length());
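
The rewritten test no longer hardcodes JobState.KILL_WAIT, which could be removed from the enum or appear in a future mock; instead it computes a state that no job in the context actually uses. The technique generalizes to any enum:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.EnumSet;

    final class UnusedEnumValue {
      private UnusedEnumValue() {}

      // Some value of the enum absent from 'inUse', or null if all are taken.
      static <E extends Enum<E>> E findUnused(Class<E> type,
          Collection<E> inUse) {
        EnumSet<E> candidates = EnumSet.allOf(type);
        candidates.removeAll(inUse);
        return candidates.isEmpty() ? null : candidates.iterator().next();
      }

      enum Color { RED, GREEN, BLUE }

      public static void main(String[] args) {
        // Prints GREEN: the first value, in declaration order, not in use.
        System.out.println(findUnused(Color.class, Arrays.asList(Color.RED)));
      }
    }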

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml Fri Oct 19 02:25:55 2012
@@ -102,7 +102,7 @@
 <property><!--Loaded from job.xml--><name>dfs.permissions.enabled</name><value>true</value></property>
 <property><!--Loaded from job.xml--><name>mapreduce.tasktracker.taskcontroller</name><value>org.apache.hadoop.mapred.DefaultTaskController</value></property>
 <property><!--Loaded from job.xml--><name>mapreduce.reduce.shuffle.parallelcopies</name><value>5</value></property>
-<property><!--Loaded from job.xml--><name>yarn.nodemanager.env-whitelist</name><value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,YARN_HOME</value></property>
+<property><!--Loaded from job.xml--><name>yarn.nodemanager.env-whitelist</name><value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,HADOOP_YARN_HOME</value></property>
 <property><!--Loaded from job.xml--><name>mapreduce.jobtracker.heartbeats.in.second</name><value>100</value></property>
 <property><!--Loaded from job.xml--><name>mapreduce.job.maxtaskfailures.per.tracker</name><value>4</value></property>
 <property><!--Loaded from job.xml--><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
@@ -317,8 +317,8 @@
         $HADOOP_COMMON_HOME/share/hadoop/common/lib/*,
         $HADOOP_HDFS_HOME/share/hadoop/hdfs/*,
         $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,
-        $YARN_HOME/share/hadoop/mapreduce/*,
-        $YARN_HOME/share/hadoop/mapreduce/lib/*
+        $HADOOP_YARN_HOME/share/hadoop/mapreduce/*,
+        $HADOOP_YARN_HOME/share/hadoop/mapreduce/lib/*
      </value></property>
 <property><!--Loaded from job.xml--><name>yarn.nodemanager.log-aggregation.compression-type</name><value>gz</value></property>
 <property><!--Loaded from job.xml--><name>dfs.image.compress</name><value>false</value></property>

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml Fri Oct 19 02:25:55 2012
@@ -94,11 +94,27 @@
     <dependency>
       <groupId>org.hsqldb</groupId>
       <artifactId>hsqldb</artifactId>
-      <version>2.0.0</version>
       <scope>test</scope>
     </dependency>
   </dependencies>
 
+ <profiles>
+  <profile>
+    <id>clover</id>
+    <activation>
+      <activeByDefault>false</activeByDefault>
+      <property>
+        <name>clover</name>
+      </property>
+    </activation>
+    <dependencies>
+      <dependency>
+        <groupId>com.cenqua.clover</groupId>
+        <artifactId>clover</artifactId>
+      </dependency>
+    </dependencies>
+  </profile>
+</profiles>
   <build>
     <plugins>
       <plugin>
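
The new clover profile is off by default and activates when the build defines the clover property (e.g. mvn test -Dclover) or selects the profile by id (-Pclover). Neither the clover dependency nor the now-trimmed hsqldb dependency carries a version here; presumably both are pinned by dependencyManagement in a parent POM.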

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Fri Oct 19 02:25:55 2012
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -232,8 +233,9 @@ public class ClientServiceDelegate {
     if (user == null) {
       throw RPCUtil.getRemoteException("User is not set in the application report");
     }
-    if (application.getYarnApplicationState() == YarnApplicationState.NEW ||
-        application.getYarnApplicationState() == YarnApplicationState.SUBMITTED) {
+    if (application.getYarnApplicationState() == YarnApplicationState.NEW
+        || application.getYarnApplicationState() == YarnApplicationState.SUBMITTED
+        || application.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
       realProxy = null;
       return getNotRunningJob(application, JobState.NEW);
     }
@@ -392,7 +394,7 @@ public class ClientServiceDelegate {
       String url = StringUtils.isNotEmpty(historyTrackingUrl)
           ? historyTrackingUrl : trackingUrl;
       if (!UNAVAILABLE.equals(url)) {
-        url = "http://" + url;
+        url = HttpConfig.getSchemePrefix() + url;
       }
       jobStatus = TypeConverter.fromYarn(report, url);
     }
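
The extra ACCEPTED clause matters because YARN reports ACCEPTED after the scheduler has taken an application but before its ApplicationMaster is running; without it the client would try to build a proxy to an AM that does not exist yet. A compact restatement of the check, using a stand-in enum rather than the real YarnApplicationState:

    import java.util.EnumSet;

    final class PreRunningCheck {
      private PreRunningCheck() {}

      // Stand-in for YarnApplicationState.
      enum AppState { NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED }

      // In all three of these states there is no AM to talk to yet.
      static final EnumSet<AppState> PRE_RUNNING =
          EnumSet.of(AppState.NEW, AppState.SUBMITTED, AppState.ACCEPTED);

      static boolean isPreRunning(AppState s) {
        return PRE_RUNNING.contains(s);
      }
    }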

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java Fri Oct 19 02:25:55 2012
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -75,13 +76,17 @@ public class NotRunningJob implements MR
 
 
   private ApplicationReport getUnknownApplicationReport() {
-    ApplicationId unknownAppId = recordFactory.newRecordInstance(ApplicationId.class);
+    ApplicationId unknownAppId = recordFactory
+        .newRecordInstance(ApplicationId.class);
+    ApplicationAttemptId unknownAttemptId = recordFactory
+        .newRecordInstance(ApplicationAttemptId.class);
 
-    // Setting AppState to NEW and finalStatus to UNDEFINED as they are never used 
+    // Setting AppState to NEW and finalStatus to UNDEFINED as they are never
+    // used
     // for a non running job
-    return BuilderUtils.newApplicationReport(unknownAppId, "N/A", "N/A", "N/A", "N/A", 0, "", 
-        YarnApplicationState.NEW, "N/A", "N/A", 0, 0, 
-        FinalApplicationStatus.UNDEFINED, null, "N/A");    
+    return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId,
+        "N/A", "N/A", "N/A", "N/A", 0, "", YarnApplicationState.NEW, "N/A",
+        "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A");
   }
 
   NotRunningJob(ApplicationReport applicationReport, JobState jobState) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Fri Oct 19 02:25:55 2012
@@ -19,9 +19,6 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,75 +38,29 @@ import org.apache.hadoop.mapreduce.v2.ut
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
 
-
-// TODO: This should be part of something like yarn-client.
-public class ResourceMgrDelegate {
+public class ResourceMgrDelegate extends YarnClientImpl {
   private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
       
-  private final InetSocketAddress rmAddress;
   private YarnConfiguration conf;
-  ClientRMProtocol applicationsManager;
+  private GetNewApplicationResponse application;
   private ApplicationId applicationId;
-  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
   /**
    * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
    * @param conf the configuration object.
    */
   public ResourceMgrDelegate(YarnConfiguration conf) {
+    super();
     this.conf = conf;
-    YarnRPC rpc = YarnRPC.create(this.conf);
-    this.rmAddress = getRmAddress(conf);
-    LOG.debug("Connecting to ResourceManager at " + rmAddress);
-    applicationsManager =
-        (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
-            rmAddress, this.conf);
-    LOG.debug("Connected to ResourceManager at " + rmAddress);
-  }
-  
-  /**
-   * Used for injecting applicationsManager, mostly for testing.
-   * @param conf the configuration object
-   * @param applicationsManager the handle to talk the resource managers 
-   *                            {@link ClientRMProtocol}.
-   */
-  public ResourceMgrDelegate(YarnConfiguration conf, 
-      ClientRMProtocol applicationsManager) {
-    this.conf = conf;
-    this.applicationsManager = applicationsManager;
-    this.rmAddress = getRmAddress(conf);
-  }
-  
-  private static InetSocketAddress getRmAddress(YarnConfiguration conf) {
-    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
-                              YarnConfiguration.DEFAULT_RM_ADDRESS,
-                              YarnConfiguration.DEFAULT_RM_PORT);
+    init(conf);
+    start();
   }
   
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
@@ -117,26 +68,15 @@ public class ResourceMgrDelegate {
     return;
   }
 
-
   public TaskTrackerInfo[] getActiveTrackers() throws IOException,
       InterruptedException {
-    GetClusterNodesRequest request = 
-      recordFactory.newRecordInstance(GetClusterNodesRequest.class);
-    GetClusterNodesResponse response = 
-      applicationsManager.getClusterNodes(request);
-    return TypeConverter.fromYarnNodes(response.getNodeReports());
+    return TypeConverter.fromYarnNodes(super.getNodeReports());
   }
 
-
   public JobStatus[] getAllJobs() throws IOException, InterruptedException {
-    GetAllApplicationsRequest request =
-      recordFactory.newRecordInstance(GetAllApplicationsRequest.class);
-    GetAllApplicationsResponse response = 
-      applicationsManager.getAllApplications(request);
-    return TypeConverter.fromYarnApps(response.getApplicationList(), this.conf);
+    return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf);
   }
 
-
   public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
       InterruptedException {
     // TODO: Implement getBlacklistedTrackers
@@ -144,128 +84,56 @@ public class ResourceMgrDelegate {
     return new TaskTrackerInfo[0];
   }
 
-
   public ClusterMetrics getClusterMetrics() throws IOException,
       InterruptedException {
-    GetClusterMetricsRequest request = recordFactory.newRecordInstance(GetClusterMetricsRequest.class);
-    GetClusterMetricsResponse response = applicationsManager.getClusterMetrics(request);
-    YarnClusterMetrics metrics = response.getClusterMetrics();
+    YarnClusterMetrics metrics = super.getYarnClusterMetrics();
     ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, 
         metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
         metrics.getNumNodeManagers(), 0, 0);
     return oldMetrics;
   }
 
-
   @SuppressWarnings("rawtypes")
-  public Token getDelegationToken(Text renewer)
-      throws IOException, InterruptedException {
-    /* get the token from RM */
-    org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest 
-    rmDTRequest = recordFactory.newRecordInstance(
-        org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest.class);
-    rmDTRequest.setRenewer(renewer.toString());
-    org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse 
-      response = applicationsManager.getDelegationToken(rmDTRequest);
-    DelegationToken yarnToken = response.getRMDelegationToken();
-    return ProtoUtils.convertFromProtoFormat(yarnToken, rmAddress);
+  public Token getDelegationToken(Text renewer) throws IOException,
+      InterruptedException {
+    return ProtoUtils.convertFromProtoFormat(
+      super.getRMDelegationToken(renewer), rmAddress);
   }
 
-
   public String getFilesystemName() throws IOException, InterruptedException {
     return FileSystem.get(conf).getUri().toString();
   }
 
   public JobID getNewJobID() throws IOException, InterruptedException {
-    GetNewApplicationRequest request = recordFactory.newRecordInstance(GetNewApplicationRequest.class);
-    applicationId = applicationsManager.getNewApplication(request).getApplicationId();
+    this.application = super.getNewApplication();
+    this.applicationId = this.application.getApplicationId();
     return TypeConverter.fromYarn(applicationId);
   }
 
-  private static final String ROOT = "root";
-
-  private GetQueueInfoRequest getQueueInfoRequest(String queueName, 
-      boolean includeApplications, boolean includeChildQueues, boolean recursive) {
-    GetQueueInfoRequest request = 
-      recordFactory.newRecordInstance(GetQueueInfoRequest.class);
-    request.setQueueName(queueName);
-    request.setIncludeApplications(includeApplications);
-    request.setIncludeChildQueues(includeChildQueues);
-    request.setRecursive(recursive);
-    return request;
-    
-  }
-  
   public QueueInfo getQueue(String queueName) throws IOException,
   InterruptedException {
-    GetQueueInfoRequest request = 
-      getQueueInfoRequest(queueName, true, false, false); 
-      recordFactory.newRecordInstance(GetQueueInfoRequest.class);
     return TypeConverter.fromYarn(
-        applicationsManager.getQueueInfo(request).getQueueInfo(), this.conf);
+        super.getQueueInfo(queueName), this.conf);
   }
-  
-  private void getChildQueues(org.apache.hadoop.yarn.api.records.QueueInfo parent, 
-      List<org.apache.hadoop.yarn.api.records.QueueInfo> queues,
-      boolean recursive) {
-    List<org.apache.hadoop.yarn.api.records.QueueInfo> childQueues = 
-      parent.getChildQueues();
-
-    for (org.apache.hadoop.yarn.api.records.QueueInfo child : childQueues) {
-      queues.add(child);
-      if(recursive) {
-        getChildQueues(child, queues, recursive);
-      }
-    }
-  }
-
 
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
       InterruptedException {
-    GetQueueUserAclsInfoRequest request = 
-      recordFactory.newRecordInstance(GetQueueUserAclsInfoRequest.class);
-    List<QueueUserACLInfo> userAcls = 
-      applicationsManager.getQueueUserAcls(request).getUserAclsInfoList();
-    return TypeConverter.fromYarnQueueUserAclsInfo(userAcls);
+    return TypeConverter.fromYarnQueueUserAclsInfo(super
+      .getQueueAclsInfo());
   }
 
-
   public QueueInfo[] getQueues() throws IOException, InterruptedException {
-    List<org.apache.hadoop.yarn.api.records.QueueInfo> queues = 
-      new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
-
-    org.apache.hadoop.yarn.api.records.QueueInfo rootQueue = 
-      applicationsManager.getQueueInfo(
-          getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
-    getChildQueues(rootQueue, queues, true);
-
-    return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+    return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf);
   }
 
-
   public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
-    List<org.apache.hadoop.yarn.api.records.QueueInfo> queues = 
-      new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
-
-    org.apache.hadoop.yarn.api.records.QueueInfo rootQueue = 
-      applicationsManager.getQueueInfo(
-          getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
-    getChildQueues(rootQueue, queues, false);
-
-    return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+    return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(), this.conf);
   }
 
   public QueueInfo[] getChildQueues(String parent) throws IOException,
       InterruptedException {
-      List<org.apache.hadoop.yarn.api.records.QueueInfo> queues = 
-          new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
-        
-        org.apache.hadoop.yarn.api.records.QueueInfo parentQueue = 
-          applicationsManager.getQueueInfo(
-              getQueueInfoRequest(parent, false, true, false)).getQueueInfo();
-        getChildQueues(parentQueue, queues, true);
-        
-        return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+    return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent),
+      this.conf);
   }
 
   public String getStagingAreaDir() throws IOException, InterruptedException {
@@ -307,40 +175,6 @@ public class ResourceMgrDelegate {
     return 0;
   }
   
-  
-  public ApplicationId submitApplication(
-      ApplicationSubmissionContext appContext) 
-  throws IOException {
-    appContext.setApplicationId(applicationId);
-    SubmitApplicationRequest request = 
-        recordFactory.newRecordInstance(SubmitApplicationRequest.class);
-    request.setApplicationSubmissionContext(appContext);
-    applicationsManager.submitApplication(request);
-    LOG.info("Submitted application " + applicationId + " to ResourceManager" +
-    		" at " + rmAddress);
-    return applicationId;
-  }
-  
-  public void killApplication(ApplicationId applicationId) throws IOException {
-    KillApplicationRequest request = 
-        recordFactory.newRecordInstance(KillApplicationRequest.class);
-    request.setApplicationId(applicationId);
-    applicationsManager.forceKillApplication(request);
-    LOG.info("Killing application " + applicationId);
-  }
-
-
-  public ApplicationReport getApplicationReport(ApplicationId appId)
-      throws YarnRemoteException {
-    GetApplicationReportRequest request = recordFactory
-        .newRecordInstance(GetApplicationReportRequest.class);
-    request.setApplicationId(appId);
-    GetApplicationReportResponse response = applicationsManager
-        .getApplicationReport(request);
-    ApplicationReport applicationReport = response.getApplicationReport();
-    return applicationReport;
-  }
-
   public ApplicationId getApplicationId() {
     return applicationId;
   }
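
This is the largest change in the commit: ResourceMgrDelegate stops hand-building ClientRMProtocol requests through a RecordFactory and instead extends YarnClientImpl, so each query collapses to an inherited call (getNodeReports(), getApplicationList(), getQueueInfo(), and so on) followed by a TypeConverter translation into the legacy mapred types. Schematically, with BaseClient standing in for YarnClientImpl:

    import java.util.Arrays;
    import java.util.List;

    // Stand-in for the generic YARN client the delegate now extends.
    class BaseClient {
      List<String> getNodeReports() {
        return Arrays.asList("node01:8042", "node02:8042");
      }
    }

    // The delegate shrinks to an adapter: fetch via the inherited
    // client method, then translate into the legacy API's types.
    class Delegate extends BaseClient {
      String[] getActiveTrackers() {
        List<String> reports = super.getNodeReports();
        return reports.toArray(new String[reports.size()]);
      }
    }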

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Fri Oct 19 02:25:55 2012
@@ -89,7 +89,7 @@ import org.apache.hadoop.yarn.util.Proto
 /**
  * This class enables the current JobClient (0.22 hadoop) to run on YARN.
  */
-@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class YARNRunner implements ClientProtocol {
 
   private static final Log LOG = LogFactory.getLog(YARNRunner.class);
@@ -304,7 +304,7 @@ public class YARNRunner implements Clien
     return clientCache.getClient(jobId).getJobStatus(jobId);
   }
 
-  private LocalResource createApplicationResource(FileContext fs, Path p)
+  private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
       throws IOException {
     LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
     FileStatus rsrcStat = fs.getFileStatus(p);
@@ -312,7 +312,7 @@ public class YARNRunner implements Clien
         .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
     rsrc.setSize(rsrcStat.getLen());
     rsrc.setTimestamp(rsrcStat.getModificationTime());
-    rsrc.setType(LocalResourceType.FILE);
+    rsrc.setType(type);
     rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
     return rsrc;
   }
@@ -343,11 +343,16 @@ public class YARNRunner implements Clien
 
     localResources.put(MRJobConfig.JOB_CONF_FILE,
         createApplicationResource(defaultFileContext,
-            jobConfPath));
+            jobConfPath, LocalResourceType.FILE));
     if (jobConf.get(MRJobConfig.JAR) != null) {
-      localResources.put(MRJobConfig.JOB_JAR,
-          createApplicationResource(defaultFileContext,
-              new Path(jobSubmitDir, MRJobConfig.JOB_JAR)));
+      Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
+      LocalResource rc = createApplicationResource(defaultFileContext,
+          jobJarPath, 
+          LocalResourceType.PATTERN);
+      String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, 
+          JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
+      rc.setPattern(pattern);
+      localResources.put(MRJobConfig.JOB_JAR, rc);
     } else {
       // Job jar may be null, e.g. for pipes, where the job jar is the hadoop
       // mapreduce jar itself, which is already on the classpath.
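
A behavioural detail in this hunk that is easy to miss: the job jar is now localized as LocalResourceType.PATTERN instead of FILE, and the regex read from JobContext.JAR_UNPACK_PATTERN (falling back to JobConf.UNPACK_JAR_PATTERN_DEFAULT) tells the NodeManager which jar entries to unpack. A hedged sketch of setting that pattern from the client side; the property and accessors are the ones used above, and the regex shown simply restates the default:

    import java.util.regex.Pattern;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobContext;

    public class UnpackPatternExample {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Unpack only classes/ and lib/ from the job jar on the node; any
        // valid regex works here.
        job.setPattern(JobContext.JAR_UNPACK_PATTERN,
            Pattern.compile("(?:classes/|lib/).*"));
        // YARNRunner reads it back exactly as in the hunk above:
        String pattern = job.getPattern(JobContext.JAR_UNPACK_PATTERN,
            JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
        System.out.println("Unpack pattern shipped with the job jar: " + pattern);
      }
    }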
@@ -363,16 +368,13 @@ public class YARNRunner implements Clien
       localResources.put(
           MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
           createApplicationResource(defaultFileContext,
-              new Path(jobSubmitDir, s)));
+              new Path(jobSubmitDir, s), LocalResourceType.FILE));
     }
 
     // Setup security tokens
-    ByteBuffer securityTokens = null;
-    if (UserGroupInformation.isSecurityEnabled()) {
-      DataOutputBuffer dob = new DataOutputBuffer();
-      ts.writeTokenStorageToStream(dob);
-      securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-    }
+    DataOutputBuffer dob = new DataOutputBuffer();
+    ts.writeTokenStorageToStream(dob);
+    ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
 
     // Setup the command to run the AM
     List<String> vargs = new ArrayList<String>(8);
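
The same hunk also changes when delegation tokens are shipped: the buffer was previously built only when UserGroupInformation.isSecurityEnabled() returned true, whereas the job's Credentials are now serialized into the AM launch context unconditionally. The serialization step in isolation, as a minimal sketch; the empty Credentials here is purely illustrative, since in YARNRunner ts holds the job's collected tokens:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.security.Credentials;

    public class TokenBufferSketch {
      public static void main(String[] args) throws Exception {
        Credentials ts = new Credentials();   // normally the job's tokens
        DataOutputBuffer dob = new DataOutputBuffer();
        ts.writeTokenStorageToStream(dob);    // same call as in the hunk above
        // Wrap only the valid prefix: dob.getData() is a growable backing
        // array, so its length can exceed dob.getLength().
        ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        System.out.println("Serialized token bytes: " + securityTokens.remaining());
      }
    }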

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java Fri Oct 19 02:25:55 2012
@@ -21,6 +21,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 
 public class TestJobConf {
@@ -185,4 +186,19 @@ public class TestJobConf {
     
     
   }
+
+  /**
+   * Ensure that by default JobContext.MAX_TASK_FAILURES_PER_TRACKER is less
+   * than JobContext.MAP_MAX_ATTEMPTS and JobContext.REDUCE_MAX_ATTEMPTS, so
+   * that failed tasks will be retried on other nodes.
+   */
+  @Test
+  public void testMaxTaskFailuresPerTracker() {
+    JobConf jobConf = new JobConf(true);
+    Assert.assertTrue("By default JobContext.MAX_TASK_FAILURES_PER_TRACKER was "
+        + "not less than JobContext.MAP_MAX_ATTEMPTS and REDUCE_MAX_ATTEMPTS",
+        jobConf.getMaxTaskFailuresPerTracker() < jobConf.getMaxMapAttempts() &&
+        jobConf.getMaxTaskFailuresPerTracker() < jobConf.getMaxReduceAttempts());
+  }
 }
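
Spelling out the intent of the new test: if one machine could accumulate as many failures for a job as a task has attempts, a single bad node could exhaust every retry of a task before being blacklisted, so the per-tracker failure threshold has to stay strictly below both attempt limits. A quick way to inspect the shipped defaults; the 3-versus-4 relationship is what the test expects, and the exact numbers come from the bundled defaults, not from this sketch:

    import org.apache.hadoop.mapred.JobConf;

    public class DefaultsCheck {
      public static void main(String[] args) {
        JobConf jobConf = new JobConf(true);  // load defaults, as the test does
        System.out.println("failures/tracker = "
            + jobConf.getMaxTaskFailuresPerTracker()              // expected: 3
            + ", map attempts = " + jobConf.getMaxMapAttempts()   // expected: 4
            + ", reduce attempts = " + jobConf.getMaxReduceAttempts());  // expected: 4
      }
    }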

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java Fri Oct 19 02:25:55 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.fs;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 import org.apache.hadoop.conf.Configured;
@@ -33,7 +34,6 @@ import org.apache.hadoop.mapred.*;
  * statistics data to be collected by subsequent reducers.
  * 
  */
-@SuppressWarnings("deprecation")
 public abstract class IOMapperBase<T> extends Configured
     implements Mapper<Text, LongWritable, Text, Text> {
   
@@ -41,6 +41,7 @@ public abstract class IOMapperBase<T> ex
   protected int bufferSize;
   protected FileSystem fs;
   protected String hostName;
+  protected Closeable stream;
 
   public IOMapperBase() { 
   }
@@ -79,6 +80,18 @@ public abstract class IOMapperBase<T> ex
                        long value) throws IOException;
 
   /**
+   * Create an input or output stream based on the specified file.
+   * Subclasses should override this method to provide an actual stream.
+   * 
+   * @param name file name
+   * @return the stream
+   * @throws IOException if the stream cannot be created
+   */
+  public Closeable getIOStream(String name) throws IOException {
+    return null;
+  }
+
+  /**
    * Collect stat data to be combined by a subsequent reducer.
    * 
    * @param output
@@ -113,9 +126,15 @@ public abstract class IOMapperBase<T> ex
     long longValue = value.get();
     
     reporter.setStatus("starting " + name + " ::host = " + hostName);
-    
+
+    this.stream = getIOStream(name);
+    T statValue = null;
     long tStart = System.currentTimeMillis();
-    T statValue = doIO(reporter, name, longValue);
+    try {
+      statValue = doIO(reporter, name, longValue);
+    } finally {
+      if (stream != null) stream.close();
+    }
     long tEnd = System.currentTimeMillis();
     long execTime = tEnd - tStart;
     collectStats(output, name, execTime, statValue);
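
Taken together, the two IOMapperBase hunks establish a small contract: map() calls getIOStream(name), stores the result in this.stream, and closes it in a finally block, so a subclass that opens its stream in the hook gets cleanup even when doIO throws. A hedged sketch of such a subclass; the class name, path, and stats key are illustrative, and only the three overridden methods mirror the base class:

    import java.io.Closeable;
    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    // Sketch only; not part of this commit.
    class ReadMapperSketch extends IOMapperBase<Long> {
      @Override
      public Closeable getIOStream(String name) throws IOException {
        // Opened here; map() stores it in this.stream and closes it in its
        // finally block, so doIO() needs no cleanup of its own.
        return fs.open(new Path("/benchmarks/io_data", name));  // path is illustrative
      }

      @Override
      Long doIO(Reporter reporter, String name, long totalSize) throws IOException {
        InputStream in = (InputStream) this.stream;
        byte[] buffer = new byte[bufferSize];
        long actualSize = 0;
        while (actualSize < totalSize) {
          int read = in.read(buffer, 0,
              (int) Math.min(buffer.length, totalSize - actualSize));
          if (read < 0) break;
          actualSize += read;
        }
        return actualSize;
      }

      @Override
      void collectStats(OutputCollector<Text, Text> output, String name,
          long execTime, Long objSize) throws IOException {
        output.collect(new Text("l:size"), new Text(String.valueOf(objSize)));
      }
    }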


