hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rkan...@apache.org
Subject hadoop git commit: MAPREDUCE-6652. Add configuration property to prevent JHS from loading jobs with a task count greater than X (haibochen via rkanter)
Date Fri, 15 Jul 2016 20:37:02 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 617149837 -> 233ffaf3f


MAPREDUCE-6652. Add configuration property to prevent JHS from loading jobs with a task count
greater than X (haibochen via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/233ffaf3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/233ffaf3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/233ffaf3

Branch: refs/heads/branch-2
Commit: 233ffaf3f7b6dae2e26e6ba3220bd3eba9caed84
Parents: 6171498
Author: Robert Kanter <rkanter@apache.org>
Authored: Fri Jul 15 13:37:04 2016 -0700
Committer: Robert Kanter <rkanter@apache.org>
Committed: Fri Jul 15 13:37:04 2016 -0700

----------------------------------------------------------------------
 .../mapreduce/v2/jobhistory/JHAdminConfig.java  |   7 +
 .../src/main/resources/mapred-default.xml       |   9 +
 .../mapreduce/v2/hs/HistoryFileManager.java     |  35 ++-
 .../hadoop/mapreduce/v2/hs/UnparsedJob.java     | 211 +++++++++++++++
 .../mapreduce/v2/hs/webapp/HsJobBlock.java      |  16 +-
 .../mapreduce/v2/hs/TestHistoryFileManager.java |  98 ++++++-
 .../mapreduce/v2/hs/webapp/TestHsJobBlock.java  | 268 +++++++++++++++++++
 7 files changed, 634 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/233ffaf3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
index dd225e2..644727f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
@@ -253,4 +253,11 @@ public class JHAdminConfig {
       MR_HISTORY_PREFIX + "jhist.format";
   public static final String DEFAULT_MR_HS_JHIST_FORMAT =
       "json";
+
+  /**
+   * The maximum number of tasks for a job to be loaded in Job History Server.
+   */
+  public static final String MR_HS_LOADED_JOBS_TASKS_MAX =
+      MR_HISTORY_PREFIX + "loadedjob.tasks.max";
+  public static final int DEFAULT_MR_HS_LOADED_JOBS_TASKS_MAX = -1;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/233ffaf3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index fab5b25..a37d312 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1889,4 +1889,13 @@
   <value>SAMEORIGIN</value>
 </property>
 
+<property>
+  <description>
+    The maximum number of tasks that a job can have so that the Job History
+    Server will fully parse its associated job history file and load it into
+    memory. A value of -1 (default) will allow all jobs to be loaded.
+  </description>
+  <name>mapreduce.jobhistory.loadedjob.tasks.max</name>
+  <value>-1</value>
+</property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/233ffaf3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index 7bfb11c..814afe4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -458,15 +458,23 @@ public class HistoryFileManager extends AbstractService {
 
     /**
      * Parse a job from the JobHistoryFile, if the underlying file is not going
-     * to be deleted.
+     * to be deleted and the number of tasks associated with the job is not
+     * greater than maxTasksForLoadedJob.
      * 
-     * @return the Job or null if the underlying file was deleted.
+     * @return null if the underlying job history file was deleted, or
+     *         an {@link UnparsedJob} object representing a partially parsed job
+     *           if the job's task count exceeds the configured maximum, or
+     *         a {@link CompletedJob} representing a fully parsed job.
      * @throws IOException
-     *           if there is an error trying to read the file.
+     *           if there is an error reading the file when it is parsed.
      */
     public synchronized Job loadJob() throws IOException {
-      return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
-          false, jobIndexInfo.getUser(), this, aclsMgr);
+      if(isOversized()) {
+        return new UnparsedJob(maxTasksForLoadedJob, jobIndexInfo, this);
+      } else {
+        return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
+            false, jobIndexInfo.getUser(), this, aclsMgr);
+      }
     }
 
     /**
@@ -504,6 +512,12 @@ public class HistoryFileManager extends AbstractService {
       jobConf.addResource(fc.open(confFile), confFile.toString());
       return jobConf;
     }
+
+    private boolean isOversized() {
+      final int totalTasks = jobIndexInfo.getNumReduces() +
+          jobIndexInfo.getNumMaps();
+      return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob);
+    }
   }
 
   private SerialNumberIndex serialNumberIndex = null;
@@ -536,7 +550,12 @@ public class HistoryFileManager extends AbstractService {
   @VisibleForTesting
   protected ThreadPoolExecutor moveToDoneExecutor = null;
   private long maxHistoryAge = 0;
-  
+
+  /**
+   * The maximum number of tasks allowed for a job to be loaded.
+   */
+  private int maxTasksForLoadedJob = -1;
+
   public HistoryFileManager() {
     super(HistoryFileManager.class.getName());
   }
@@ -555,6 +574,10 @@ public class HistoryFileManager extends AbstractService {
         JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
     createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);
 
+    maxTasksForLoadedJob = conf.getInt(
+        JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
+        JHAdminConfig.DEFAULT_MR_HS_LOADED_JOBS_TASKS_MAX);
+
     this.aclsMgr = new JobACLsManager(conf);
 
     maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/233ffaf3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/UnparsedJob.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/UnparsedJob.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/UnparsedJob.java
new file mode 100644
index 0000000..cea336c
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/UnparsedJob.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.records.*;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A job that has too many tasks associated with it. Its job history file is
+ * not parsed, to prevent the Job History Server from hanging on parsing the
+ * file. It is meant to be used only by JHS to indicate whether the history
+ * file of a job is fully parsed or not.
+ */
+public class UnparsedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
+  private final JobIndexInfo jobIndexInfo;
+  private final int maxTasksAllowed;
+  private JobReport jobReport;
+  private final HistoryFileManager.HistoryFileInfo jhfInfo;
+
+  public UnparsedJob(int maxTasksAllowed, JobIndexInfo jobIndexInfo,
+      HistoryFileManager.HistoryFileInfo jhfInfo) throws IOException {
+    this.jobIndexInfo = jobIndexInfo;
+    this.jhfInfo = jhfInfo;
+    this.maxTasksAllowed = maxTasksAllowed;
+  }
+
+  public int getMaxTasksAllowed() {
+    return maxTasksAllowed;
+  }
+
+  @Override
+  public JobId getID() {
+    return jobIndexInfo.getJobId();
+  }
+
+  @Override
+  public String getName() {
+    return jobIndexInfo.getJobName();
+  }
+
+  @Override
+  public JobState getState() {
+    return JobState.valueOf(jobIndexInfo.getJobStatus());
+  }
+
+  @Override
+  public synchronized JobReport getReport() {
+    if(jobReport == null) {
+      jobReport = constructJobReport();
+    }
+    return jobReport;
+  }
+
+  public JobReport constructJobReport() {
+    JobReport report = Records.newRecord(JobReport.class);
+    report.setJobId(getID());
+    report.setJobState(getState());
+    report.setSubmitTime(jobIndexInfo.getSubmitTime());
+    report.setStartTime(jobIndexInfo.getJobStartTime());
+    report.setFinishTime(jobIndexInfo.getFinishTime());
+    report.setJobName(jobIndexInfo.getJobName());
+    report.setUser(jobIndexInfo.getUser());
+    report.setJobFile(getConfFile().toString());
+    report.setHistoryFile(jhfInfo.getHistoryFile().toString());
+    return report;
+  }
+
+  @Override
+  public Counters getAllCounters() {
+    return new Counters();
+  }
+
+  @Override
+  public Map<TaskId, Task> getTasks() {
+    return new HashMap<>();
+  }
+
+  @Override
+  public Map<TaskId, Task> getTasks(TaskType taskType) {
+    return new HashMap<>();
+  }
+
+  @Override
+  public Task getTask(TaskId taskID) {
+    return null;
+  }
+
+  @Override
+  public List<String> getDiagnostics() {
+    return new ArrayList<>();
+  }
+
+  @Override
+  public int getTotalMaps() {
+    return jobIndexInfo.getNumMaps();
+  }
+
+  @Override
+  public int getTotalReduces() {
+    return jobIndexInfo.getNumReduces();
+  }
+
+  @Override
+  public int getCompletedMaps() {
+    return -1;
+  }
+
+  @Override
+  public int getCompletedReduces() {
+    return -1;
+  }
+
+  @Override
+  public float getProgress() {
+    return 1.0f;
+  }
+
+  @Override
+  public boolean isUber() {
+    return false;
+  }
+
+  @Override
+  public String getUserName() {
+    return jobIndexInfo.getUser();
+  }
+
+  @Override
+  public String getQueueName() {
+    return jobIndexInfo.getQueueName();
+  }
+
+  @Override
+  public Path getConfFile() {
+    return jhfInfo.getConfFile();
+  }
+
+  @Override
+  public Configuration loadConfFile() throws IOException {
+    return jhfInfo.loadConfFile();
+  }
+
+  @Override
+  public Map<JobACL, AccessControlList> getJobACLs() {
+    return new HashMap<>();
+  }
+
+  @Override
+  public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
+      int fromEventId, int maxEvents) {
+    return new TaskAttemptCompletionEvent[0];
+  }
+
+  @Override
+  public TaskCompletionEvent[] getMapAttemptCompletionEvents(
+      int startIndex, int maxEvents) {
+    return new TaskCompletionEvent[0];
+  }
+
+  @Override
+  public List<AMInfo> getAMInfos() {
+    return new ArrayList<>();
+  }
+
+  @Override
+  public boolean checkAccess(UserGroupInformation callerUGI,
+      JobACL jobOperation) {
+    return true;
+  }
+
+  @Override
+  public void setQueueName(String queueName) {
+    throw new UnsupportedOperationException("Can't set job's " +
+        "queue name in history");
+  }
+
+  @Override
+  public void setJobPriority(Priority priority) {
+    throw new UnsupportedOperationException(
+        "Can't set job's priority in history");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/233ffaf3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
index 13774a8..0d5b03a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
@@ -33,8 +33,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfEntryInfo;
+import org.apache.hadoop.mapreduce.v2.hs.UnparsedJob;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
 import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
@@ -73,8 +75,18 @@ public class HsJobBlock extends HtmlBlock {
     JobId jobID = MRApps.toJobID(jid);
     Job j = appContext.getJob(jobID);
     if (j == null) {
-      html.
-        p()._("Sorry, ", jid, " not found.")._();
+      html.p()._("Sorry, ", jid, " not found.")._();
+      return;
+    }
+    if(j instanceof UnparsedJob) {
+      final int taskCount = j.getTotalMaps() + j.getTotalReduces();
+      UnparsedJob oversizedJob = (UnparsedJob) j;
+      html.p()._("The job has a total of " + taskCount + " tasks. ")
+          ._("Any job larger than " + oversizedJob.getMaxTasksAllowed() +
+              " will not be loaded.")._();
+      html.p()._("You can either use the CLI tool: 'mapred job -history'"
+          + " to view large jobs or adjust the property " +
+          JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX + ".")._();
       return;
     }
     List<AMInfo> amInfos = j.getAMInfos();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/233ffaf3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
index aa2e979..b7a3672 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
@@ -25,6 +25,7 @@ import java.io.FileNotFoundException;
 import java.util.UUID;
 import java.util.List;
 
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -37,11 +38,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
-import org.apache.hadoop.test.CoreTestDriver;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.Clock;
@@ -86,6 +85,10 @@ public class TestHistoryFileManager {
   @After
   public void cleanTest() throws Exception {
     new File(coreSitePath).delete();
+    dfsCluster.getFileSystem().setSafeMode(
+        HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+    dfsCluster2.getFileSystem().setSafeMode(
+        HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
   }
 
   private String getDoneDirNameForTest() {
@@ -247,6 +250,97 @@ public class TestHistoryFileManager {
     Assert.assertFalse(info.didMoveFail());
   }
 
+  @Test
+  public void testHistoryFileInfoLoadOversizedJobShouldReturnUnParsedJob()
+      throws Exception {
+    HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
+
+    int allowedMaximumTasks = 5;
+    Configuration conf = dfsCluster.getConfiguration(0);
+    conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, allowedMaximumTasks);
+
+    hmTest.init(conf);
+
+    // set up a job whose task count is greater than the maximum allowed
+    String jobId = "job_1410889000000_123456";
+    JobIndexInfo jobIndexInfo = new JobIndexInfo();
+    jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
+    jobIndexInfo.setNumMaps(allowedMaximumTasks);
+    jobIndexInfo.setNumReduces(allowedMaximumTasks);
+
+
+    HistoryFileInfo info = hmTest.getHistoryFileInfo(null, null, null,
+        jobIndexInfo, false);
+
+    Job job = info.loadJob();
+    Assert.assertTrue("Should return an instance of UnparsedJob to indicate" +
+        " the job history file is not parsed", job instanceof UnparsedJob);
+  }
+
+  @Test
+  public void testHistoryFileInfoLoadNormalSizedJobShouldReturnCompletedJob()
+      throws Exception {
+    HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
+
+    final int numOfTasks = 100;
+    Configuration conf = dfsCluster.getConfiguration(0);
+    conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
+        numOfTasks + numOfTasks + 1);
+
+    hmTest.init(conf);
+
+    // set up a job whose task count is smaller than the maximum allowed,
+    // and which therefore will be fully loaded.
+    final String jobId = "job_1416424547277_0002";
+    JobIndexInfo jobIndexInfo = new JobIndexInfo();
+    jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
+    jobIndexInfo.setNumMaps(numOfTasks);
+    jobIndexInfo.setNumReduces(numOfTasks);
+
+
+    final String historyFile = getClass().getClassLoader().getResource(
+        "job_2.0.3-alpha-FAILED.jhist").getFile();
+    final Path historyFilePath = FileSystem.getLocal(conf).makeQualified(
+        new Path(historyFile));
+    HistoryFileInfo info = hmTest.getHistoryFileInfo(historyFilePath, null,
+        null, jobIndexInfo, false);
+
+    Job job = info.loadJob();
+    Assert.assertTrue("Should return an instance of CompletedJob as " +
+        "a result of parsing the job history file of the job",
+        job instanceof CompletedJob);
+  }
+
+  @Test
+  public void testHistoryFileInfoShouldReturnCompletedJobIfMaxNotConfiged()
+      throws Exception {
+    HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
+
+    Configuration conf = dfsCluster.getConfiguration(0);
+    conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, -1);
+
+    hmTest.init(conf);
+
+    final String jobId = "job_1416424547277_0002";
+    JobIndexInfo jobIndexInfo = new JobIndexInfo();
+    jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
+    jobIndexInfo.setNumMaps(100);
+    jobIndexInfo.setNumReduces(100);
+
+    final String historyFile = getClass().getClassLoader().getResource(
+        "job_2.0.3-alpha-FAILED.jhist").getFile();
+    final Path historyFilePath = FileSystem.getLocal(conf).makeQualified(
+        new Path(historyFile));
+    HistoryFileInfo info = hmTest.getHistoryFileInfo(historyFilePath, null,
+        null, jobIndexInfo, false);
+
+    Job job = info.loadJob();
+    Assert.assertTrue("Should return an instance of CompletedJob as " +
+            "a result of parsing the job history file of the job",
+        job instanceof CompletedJob);
+
+  }
+
   static class HistoryFileManagerTest extends HistoryFileManager {
     public HistoryFileManagerTest() {
       super();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/233ffaf3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsJobBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsJobBlock.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsJobBlock.java
new file mode 100644
index 0000000..7fa238e
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsJobBlock.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs.webapp;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.*;
+import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.webapp.AMParams;
+import org.apache.hadoop.mapreduce.v2.hs.CompletedJob;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
+import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
+import org.apache.hadoop.mapreduce.v2.hs.UnparsedJob;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.StringHelper;
+import org.apache.hadoop.yarn.webapp.ResponseInfo;
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.view.BlockForTest;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlockForTest;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test the HsJobBlock generated for oversized jobs in JHS.
+ */
+public class TestHsJobBlock {
+
+  @Test
+  public void testHsJobBlockForOversizeJobShouldDisplayWarningMessage() {
+    int maxAllowedTaskNum = 100;
+
+    Configuration config = new Configuration();
+    config.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, maxAllowedTaskNum);
+
+    JobHistory jobHistory =
+        new JobHistoryStubWithAllOversizeJobs(maxAllowedTaskNum);
+    jobHistory.init(config);
+
+    HsJobBlock jobBlock = new HsJobBlock(jobHistory) {
+      // override this so that job block can fetch a job id.
+      @Override
+      public Map<String, String> moreParams() {
+        Map<String, String> map = new HashMap<>();
+        map.put(AMParams.JOB_ID, "job_0000_0001");
+        return map;
+      }
+    };
+
+    // set up the test block to render HsJobBlock to
+    OutputStream outputStream = new ByteArrayOutputStream();
+    HtmlBlock.Block block = createBlockToCreateTo(outputStream);
+
+    jobBlock.render(block);
+
+    block.getWriter().flush();
+    String out = outputStream.toString();
+    Assert.assertTrue("Should display warning message for jobs that have too " +
+        "many tasks", out.contains("Any job larger than " + maxAllowedTaskNum +
+            " will not be loaded"));
+  }
+
+  @Test
+  public void testHsJobBlockForNormalSizeJobShouldNotDisplayWarningMessage() {
+
+    Configuration config = new Configuration();
+    config.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, -1);
+
+    JobHistory jobHistory = new JobHitoryStubWithAllNormalSizeJobs();
+    jobHistory.init(config);
+
+    HsJobBlock jobBlock = new HsJobBlock(jobHistory) {
+      // override this so that the job block can fetch a job id.
+      @Override
+      public Map<String, String> moreParams() {
+        Map<String, String> map = new HashMap<>();
+        map.put(AMParams.JOB_ID, "job_0000_0001");
+        return map;
+      }
+
+      // override this to avoid view context lookup in render()
+      @Override
+      public ResponseInfo info(String about) {
+        return new ResponseInfo().about(about);
+      }
+
+      // override this to avoid view context lookup in render()
+      @Override
+      public String url(String... parts) {
+        return StringHelper.ujoin("", parts);
+      }
+    };
+
+    // set up the test block to render HsJobBlock to
+    OutputStream outputStream = new ByteArrayOutputStream();
+    HtmlBlock.Block block = createBlockToCreateTo(outputStream);
+
+    jobBlock.render(block);
+
+    block.getWriter().flush();
+    String out = outputStream.toString();
+
+    Assert.assertTrue("Should display job overview for the job.",
+        out.contains("ApplicationMaster"));
+  }
+
+  private static HtmlBlock.Block createBlockToCreateTo(
+      OutputStream outputStream) {
+    PrintWriter printWriter = new PrintWriter(outputStream);
+    HtmlBlock html = new HtmlBlockForTest();
+    return new BlockForTest(html, printWriter, 10, false) {
+      @Override
+      protected void subView(Class<? extends SubView> cls) {
+      }
+    };
+  };
+
+  /**
+   * A JobHistory stub that treats all jobs as oversized and therefore will
+   * not parse their job history files but return an UnparsedJob instance.
+   */
+  static class JobHistoryStubWithAllOversizeJobs extends JobHistory {
+    private final int maxAllowedTaskNum;
+
+    public JobHistoryStubWithAllOversizeJobs(int maxAllowedTaskNum) {
+      this.maxAllowedTaskNum = maxAllowedTaskNum;
+    }
+
+    @Override
+    protected HistoryFileManager createHistoryFileManager() {
+      HistoryFileManager historyFileManager;
+      try {
+        HistoryFileInfo historyFileInfo =
+            createUnparsedJobHistoryFileInfo(maxAllowedTaskNum);
+
+        historyFileManager = mock(HistoryFileManager.class);
+        when(historyFileManager.getFileInfo(any(JobId.class))).thenReturn(
+            historyFileInfo);
+      } catch (IOException ex) {
+        // this should never happen
+        historyFileManager = super.createHistoryFileManager();
+      }
+      return historyFileManager;
+    }
+
+    private static HistoryFileInfo createUnparsedJobHistoryFileInfo(
+        int maxAllowedTaskNum) throws IOException {
+      HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+
+      // create an instance of UnparsedJob for a large job
+      UnparsedJob unparsedJob = mock(UnparsedJob.class);
+      when(unparsedJob.getMaxTasksAllowed()).thenReturn(maxAllowedTaskNum);
+      when(unparsedJob.getTotalMaps()).thenReturn(maxAllowedTaskNum);
+      when(unparsedJob.getTotalReduces()).thenReturn(maxAllowedTaskNum);
+
+      when(fileInfo.loadJob()).thenReturn(unparsedJob);
+
+      return fileInfo;
+    }
+  }
+
+  /**
+   * A JobHistory stub that treats all jobs as normal size and therefore will
+   * return a CompletedJob on HistoryFileInfo.loadJob().
+   */
+  static class JobHitoryStubWithAllNormalSizeJobs extends  JobHistory {
+    @Override
+    public HistoryFileManager createHistoryFileManager() {
+      HistoryFileManager historyFileManager;
+      try {
+        HistoryFileInfo historyFileInfo = createParsedJobHistoryFileInfo();
+
+        historyFileManager = mock(HistoryFileManager.class);
+        when(historyFileManager.getFileInfo(any(JobId.class))).thenReturn(
+            historyFileInfo);
+      } catch (IOException ex) {
+        // this should never happen
+        historyFileManager = super.createHistoryFileManager();
+      }
+      return historyFileManager;
+
+    }
+
+    private static HistoryFileInfo createParsedJobHistoryFileInfo()
+        throws IOException {
+      HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+      CompletedJob job = createFakeCompletedJob();
+      when(fileInfo.loadJob()).thenReturn(job);
+      return fileInfo;
+    }
+
+
+    private static CompletedJob createFakeCompletedJob() {
+      CompletedJob job = mock(CompletedJob.class);
+
+      when(job.getTotalMaps()).thenReturn(0);
+      when(job.getCompletedMaps()).thenReturn(0);
+      when(job.getTotalReduces()).thenReturn(0);
+      when(job.getCompletedReduces()).thenReturn(0);
+
+      JobId jobId = createFakeJobId();
+      when(job.getID()).thenReturn(jobId);
+
+      JobReport jobReport = mock(JobReport.class);
+      when(jobReport.getSubmitTime()).thenReturn(-1L);
+      when(jobReport.getStartTime()).thenReturn(-1L);
+      when(jobReport.getFinishTime()).thenReturn(-1L);
+      when(job.getReport()).thenReturn(jobReport);
+
+      when(job.getAMInfos()).thenReturn(new ArrayList<AMInfo>());
+      when(job.getDiagnostics()).thenReturn(new ArrayList<String>());
+      when(job.getName()).thenReturn("fake completed job");
+      when(job.getQueueName()).thenReturn("default");
+      when(job.getUserName()).thenReturn("junit");
+      when(job.getState()).thenReturn(JobState.ERROR);
+      when(job.getAllCounters()).thenReturn(new Counters());
+      when(job.getTasks()).thenReturn(new HashMap<TaskId, Task>());
+
+      return job;
+    }
+
+    private static JobId createFakeJobId() {
+      JobId jobId = new JobIdPBImpl();
+      jobId.setId(0);
+
+      ApplicationId appId = mock(ApplicationId.class);
+      when(appId.getClusterTimestamp()).thenReturn(0L);
+      when(appId.getId()).thenReturn(0);
+
+      jobId.setAppId(appId);
+
+      return jobId;
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message