hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From z..@apache.org
Subject hadoop git commit: MAPREDUCE-6622. Add capability to set JHS job cache to a task-based limit (rchiang via rkanter)
Date Mon, 07 Mar 2016 02:35:21 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 71e89e82e -> 66e1e92ff


MAPREDUCE-6622. Add capability to set JHS job cache to a task-based limit (rchiang via rkanter)

(cherry picked from commit 0f72da7e281376f4fcbfbf3fb33f5d7fedcdb1aa)

Conflicts:
	hadoop-mapreduce-project/CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/66e1e92f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/66e1e92f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/66e1e92f

Branch: refs/heads/branch-2.7
Commit: 66e1e92ffd7d4ebba47d5e47bd05e0dcc2493c89
Parents: 71e89e8
Author: Robert Kanter <rkanter@apache.org>
Authored: Fri Feb 26 17:57:17 2016 -0800
Committer: Zhihai Xu <zxu@apache.org>
Committed: Sun Mar 6 18:34:46 2016 -0800

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../mapreduce/v2/jobhistory/JHAdminConfig.java  |   8 +-
 .../src/main/resources/mapred-default.xml       |  27 +-
 .../mapreduce/v2/hs/CachedHistoryStorage.java   | 146 ++++++++---
 .../hadoop/mapreduce/v2/hs/TestJobHistory.java  | 255 ++++++++++++++++++-
 5 files changed, 381 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/66e1e92f/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ae855ca..ab9f1fb 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -13,6 +13,9 @@ Release 2.7.3 - UNRELEASED
     MAPREDUCE-6637. Testcase Failure : TestFileInputFormat.testSplitLocationInfo.
     (Brahma Reddy Battula via wang)
 
+    MAPREDUCE-6622. Add capability to set JHS job cache to a
+    task-based limit (rchiang via rkanter)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/66e1e92f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
index f7cba9f..7deb077 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
@@ -98,7 +98,7 @@ public class JHAdminConfig {
   public static final String MR_HISTORY_JOBLIST_CACHE_SIZE =
     MR_HISTORY_PREFIX + "joblist.cache.size";
   public static final int DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE = 20000;
-  
+
   /** The location of the Kerberos keytab file.*/
   public static final String MR_HISTORY_KEYTAB = MR_HISTORY_PREFIX + "keytab";
   
@@ -106,7 +106,11 @@ public class JHAdminConfig {
   public static final String MR_HISTORY_LOADED_JOB_CACHE_SIZE = 
     MR_HISTORY_PREFIX + "loadedjobs.cache.size";
   public static final int DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE = 5;
-  
+
+  /** Size of the loaded job cache (in tasks).*/
+  public static final String MR_HISTORY_LOADED_TASKS_CACHE_SIZE =
+      MR_HISTORY_PREFIX + "loadedtasks.cache.size";
+
   /**
    * The maximum age of a job history file before it is deleted from the history
    * server.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/66e1e92f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 9effd76..dc6155a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -2034,7 +2034,32 @@
 <property>
   <name>mapreduce.jobhistory.loadedjobs.cache.size</name>
   <value>5</value>
-  <description>Size of the loaded job cache</description>
+  <description>Size of the loaded job cache.  This property is ignored if
+  the property mapreduce.jobhistory.loadedtasks.cache.size is set to a
+  positive value.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.loadedtasks.cache.size</name>
+  <value></value>
+  <description>Change the job history cache limit to be set in terms
+  of total task count.  If the total number of tasks loaded exceeds
+  this value, then the job cache will be shrunk down until it is
+  under this limit (minimum 1 job in cache).  If this value is empty
+  or nonpositive then the cache reverts to using the property
+  mapreduce.jobhistory.loadedjobs.cache.size as a job cache size.
+
+  Two recommendations for the mapreduce.jobhistory.loadedtasks.cache.size
+  property:
+  1) For every 100k of cache size, set the heap size of the Job History
+     Server to 1.2GB. For example,
+     mapreduce.jobhistory.loadedtasks.cache.size=500000, heap size=6GB.
+  2) Make sure that the cache size is larger than the number of tasks
+     required for the largest job run on the cluster. It might be a good
+     idea to set the value slightly higher (say, 20%) in order to allow
+     for job size growth.
+  </description>
 </property>
 
 <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/66e1e92f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
index 8c9abc3..c59d17f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
@@ -20,12 +20,16 @@ package org.apache.hadoop.mapreduce.v2.hs;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -49,9 +53,10 @@ public class CachedHistoryStorage extends AbstractService implements
     HistoryStorage {
   private static final Log LOG = LogFactory.getLog(CachedHistoryStorage.class);
 
-  private Map<JobId, Job> loadedJobCache = null;
-  // The number of loaded jobs.
+  private LoadingCache<JobId, Job> loadedJobCache = null;
   private int loadedJobCacheSize;
+  private int loadedTasksCacheSize;
+  private boolean useLoadedTasksCache;
 
   private HistoryFileManager hsManager;
 
@@ -70,17 +75,70 @@ public class CachedHistoryStorage extends AbstractService implements
 
   @SuppressWarnings("serial")
   private void createLoadedJobCache(Configuration conf) {
+    // Set property for old "loaded jobs" cache
     loadedJobCacheSize = conf.getInt(
         JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
         JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE);
 
-    loadedJobCache = Collections.synchronizedMap(new LinkedHashMap<JobId, Job>(
-        loadedJobCacheSize + 1, 0.75f, true) {
+    // Check property for new "loaded tasks" cache perform sanity checking
+    useLoadedTasksCache = false;
+    try {
+      String taskSizeString = conf
+          .get(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE);
+      if (taskSizeString != null) {
+        loadedTasksCacheSize = Math.max(Integer.parseInt(taskSizeString), 1);
+        useLoadedTasksCache = true;
+      }
+    } catch (NumberFormatException nfe) {
+      LOG.error("The property " +
+          JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE +
+          " is not an integer value.  Please set it to a positive" +
+          " integer value.");
+    }
+
+    CacheLoader<JobId, Job> loader;
+    loader = new CacheLoader<JobId, Job>() {
       @Override
-      public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
-        return super.size() > loadedJobCacheSize;
+      public Job load(JobId key) throws Exception {
+        return loadJob(key);
       }
-    });
+    };
+
+    if (!useLoadedTasksCache) {
+      loadedJobCache = CacheBuilder.newBuilder()
+          .maximumSize(loadedJobCacheSize)
+          .initialCapacity(loadedJobCacheSize)
+          .concurrencyLevel(1)
+          .build(loader);
+    } else {
+      Weigher<JobId, Job> weightByTasks;
+      weightByTasks = new Weigher<JobId, Job>() {
+        /**
+         * Method for calculating Job weight by total task count.  If
+         * the total task count is greater than the size of the tasks
+         * cache, then cap it at the cache size.  This allows the cache
+         * to always hold one large job.
+         * @param key JobId object
+         * @param value Job object
+         * @return Weight of the job as calculated by total task count
+         */
+        @Override
+        public int weigh(JobId key, Job value) {
+          int taskCount = Math.min(loadedTasksCacheSize,
+              value.getTotalMaps() + value.getTotalReduces());
+          return taskCount;
+        }
+      };
+      // Keep concurrencyLevel at 1.  Otherwise, two problems:
+      // 1) The largest job that can be initially loaded is
+      //    cache size / 4.
+      // 2) Unit tests are not deterministic.
+      loadedJobCache = CacheBuilder.newBuilder()
+          .maximumWeight(loadedTasksCacheSize)
+          .weigher(weightByTasks)
+          .concurrencyLevel(1)
+          .build(loader);
+    }
   }
   
   public void refreshLoadedJobCache() {
@@ -100,52 +158,48 @@ public class CachedHistoryStorage extends AbstractService implements
   public CachedHistoryStorage() {
     super(CachedHistoryStorage.class.getName());
   }
+
+  private static class HSFileRuntimeException extends RuntimeException {
+    public HSFileRuntimeException(String message) {
+      super(message);
+    }
+  }
   
-  private Job loadJob(HistoryFileInfo fileInfo) {
-    try {
-      Job job = fileInfo.loadJob();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adding " + job.getID() + " to loaded job cache");
-      }
-      // We can clobber results here, but that should be OK, because it only
-      // means that we may have two identical copies of the same job floating
-      // around for a while.
-      loadedJobCache.put(job.getID(), job);
-      return job;
-    } catch (IOException e) {
-      throw new YarnRuntimeException(
-          "Could not find/load job: " + fileInfo.getJobId(), e);
+  private Job loadJob(JobId jobId) throws RuntimeException, IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Looking for Job " + jobId);
+    }
+    HistoryFileInfo fileInfo;
+
+    fileInfo = hsManager.getFileInfo(jobId);
+    if (fileInfo == null) {
+      throw new HSFileRuntimeException("Unable to find job " + jobId);
+    } else if (fileInfo.isDeleted()) {
+      throw new HSFileRuntimeException("Cannot load deleted job " + jobId);
+    } else {
+      return fileInfo.loadJob();
     }
   }
 
   @VisibleForTesting
-  Map<JobId, Job> getLoadedJobCache() {
+  Cache<JobId, Job> getLoadedJobCache() {
     return loadedJobCache;
   }
   
   @Override
   public Job getFullJob(JobId jobId) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Looking for Job " + jobId);
-    }
+    Job retVal = null;
     try {
-      HistoryFileInfo fileInfo = hsManager.getFileInfo(jobId);
-      Job result = null;
-      if (fileInfo != null) {
-        result = loadedJobCache.get(jobId);
-        if (result == null) {
-          result = loadJob(fileInfo);
-        } else if(fileInfo.isDeleted()) {
-          loadedJobCache.remove(jobId);
-          result = null;
-        }
+      retVal = loadedJobCache.getUnchecked(jobId);
+    } catch (UncheckedExecutionException e) {
+      if (e.getCause() instanceof HSFileRuntimeException) {
+        LOG.error(e.getCause().getMessage());
+        return null;
       } else {
-        loadedJobCache.remove(jobId);
+        throw new YarnRuntimeException(e.getCause());
       }
-      return result;
-    } catch (IOException e) {
-      throw new YarnRuntimeException(e);
     }
+    return retVal;
   }
 
   @Override
@@ -243,4 +297,14 @@ public class CachedHistoryStorage extends AbstractService implements
     }
     return allJobs;
   }
+
+  @VisibleForTesting
+  public boolean getUseLoadedTasksCache() {
+    return useLoadedTasksCache;
+  }
+
+  @VisibleForTesting
+  public int getLoadedTasksCacheSize() {
+    return loadedTasksCacheSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/66e1e92f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
index de0de7d..936c772 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
+import com.google.common.cache.Cache;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
@@ -35,16 +36,27 @@ import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.junit.After;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-
+import static junit.framework.TestCase.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
 
 public class TestJobHistory {
 
@@ -58,13 +70,15 @@ public class TestJobHistory {
 
     Configuration conf = new Configuration();
     // Set the cache size to 2
-    conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "2");
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, 2);
     jobHistory.init(conf);
     jobHistory.start();
 
     CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
         .getHistoryStorage());
 
+    assertFalse(storage.getUseLoadedTasksCache());
+
     Job[] jobs = new Job[3];
     JobId[] jobIds = new JobId[3];
 
@@ -84,14 +98,13 @@ public class TestJobHistory {
       storage.getFullJob(jobs[i].getID());
     }
 
-    Map<JobId, Job> jobCache = storage.getLoadedJobCache();
-    // job0 should have been purged since cache size is 2
-    assertFalse(jobCache.containsKey(jobs[0].getID()));
-    assertTrue(jobCache.containsKey(jobs[1].getID())
-        && jobCache.containsKey(jobs[2].getID()));
+    Cache<JobId, Job> jobCache = storage.getLoadedJobCache();
+    // Verify some jobs are stored in the cache.  Hard to predict eviction
+    // in Guava version.
+    assertTrue(jobCache.size() > 0);
 
     // Setting cache size to 3
-    conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "3");
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, 3);
     doReturn(conf).when(storage).createConf();
 
     when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])
@@ -105,9 +118,223 @@ public class TestJobHistory {
 
     jobCache = storage.getLoadedJobCache();
 
-    // All three jobs should be in cache since its size is now 3
-    for (int i = 0; i < 3; i++) {
-      assertTrue(jobCache.containsKey(jobs[i].getID()));
+    // Verify some jobs are stored in the cache.  Hard to predict eviction
+    // in Guava version.
+    assertTrue(jobCache.size() > 0);
+  }
+
+  @Test
+  public void testTasksCacheLimit() throws Exception {
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+    Configuration conf = new Configuration();
+    // Set the cache threshold to 50 tasks
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 50);
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    assertEquals(storage.getLoadedTasksCacheSize(), 50);
+
+    // Create a bunch of smaller jobs (<< 50 tasks)
+    Job[] jobs = new Job[10];
+    JobId[] jobIds = new JobId[10];
+    for (int i = 0; i < jobs.length; i++) {
+      jobs[i] = mock(Job.class);
+      jobIds[i] = mock(JobId.class);
+      when(jobs[i].getID()).thenReturn(jobIds[i]);
+      when(jobs[i].getTotalMaps()).thenReturn(10);
+      when(jobs[i].getTotalReduces()).thenReturn(2);
+    }
+
+    // Create some large jobs that forces task-based cache flushing
+    Job[] lgJobs = new Job[3];
+    JobId[] lgJobIds = new JobId[3];
+    for (int i = 0; i < lgJobs.length; i++) {
+      lgJobs[i] = mock(Job.class);
+      lgJobIds[i] = mock(JobId.class);
+      when(lgJobs[i].getID()).thenReturn(lgJobIds[i]);
+      when(lgJobs[i].getTotalMaps()).thenReturn(2000);
+      when(lgJobs[i].getTotalReduces()).thenReturn(10);
+    }
+
+    HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+    when(historyManager.getFileInfo(any(JobId.class))).thenReturn(fileInfo);
+    when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])
+        .thenReturn(jobs[2]).thenReturn(jobs[3]).thenReturn(jobs[4])
+        .thenReturn(jobs[5]).thenReturn(jobs[6]).thenReturn(jobs[7])
+        .thenReturn(jobs[8]).thenReturn(jobs[9]).thenReturn(lgJobs[0])
+        .thenReturn(lgJobs[1]).thenReturn(lgJobs[2]);
+
+    // getFullJob will put the job in the cache if it isn't there
+    Cache<JobId, Job> jobCache = storage.getLoadedJobCache();
+    for (int i = 0; i < jobs.length; i++) {
+      storage.getFullJob(jobs[i].getID());
+    }
+    long prevSize = jobCache.size();
+
+    // Fill the cache with some larger jobs and verify the cache
+    // gets reduced in size.
+    for (int i = 0; i < lgJobs.length; i++) {
+      storage.getFullJob(lgJobs[i].getID());
+    }
+    assertTrue(jobCache.size() < prevSize);
+  }
+
+  @Test
+  public void testJobCacheLimitLargerThanMax() throws Exception {
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    JobHistory jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+    Configuration conf = new Configuration();
+    // Set the cache threshold to 50 tasks
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 500);
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    assertEquals(storage.getLoadedTasksCacheSize(), 500);
+
+    // Create a bunch of large jobs (>> 50 tasks)
+    Job[] lgJobs = new Job[10];
+    JobId[] lgJobIds = new JobId[10];
+    for (int i = 0; i < lgJobs.length; i++) {
+      lgJobs[i] = mock(Job.class);
+      lgJobIds[i] = mock(JobId.class);
+      when(lgJobs[i].getID()).thenReturn(lgJobIds[i]);
+      when(lgJobs[i].getTotalMaps()).thenReturn(700);
+      when(lgJobs[i].getTotalReduces()).thenReturn(50);
+    }
+
+    HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+    when(historyManager.getFileInfo(any(JobId.class))).thenReturn(fileInfo);
+    when(fileInfo.loadJob()).thenReturn(lgJobs[0]).thenReturn(lgJobs[1])
+        .thenReturn(lgJobs[2]).thenReturn(lgJobs[3]).thenReturn(lgJobs[4])
+        .thenReturn(lgJobs[5]).thenReturn(lgJobs[6]).thenReturn(lgJobs[7])
+        .thenReturn(lgJobs[8]).thenReturn(lgJobs[9]);
+
+    // getFullJob will put the job in the cache if it isn't there
+    Cache<JobId, Job> jobCache = storage.getLoadedJobCache();
+    long[] cacheSize = new long[10];
+    for (int i = 0; i < lgJobs.length; i++) {
+      storage.getFullJob(lgJobs[i].getID());
+      assertTrue(jobCache.size() > 0);
+    }
+  }
+
+  @Test
+  public void testLoadedTasksEmptyConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, "");
+
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    JobHistory jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertFalse(storage.getUseLoadedTasksCache());
+  }
+
+  @Test
+  public void testLoadedTasksZeroConfiguration() {
+    Configuration conf = new Configuration();
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 0);
+
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    JobHistory jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    assertEquals(storage.getLoadedTasksCacheSize(), 1);
+  }
+
+  @Test
+  public void testLoadedTasksNegativeConfiguration() {
+    Configuration conf = new Configuration();
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, -1);
+
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    JobHistory jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    assertEquals(storage.getLoadedTasksCacheSize(), 1);
+  }
+
+  @Test
+  public void testLoadJobErrorCases() throws IOException {
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+    Configuration conf = new Configuration();
+    // Set the cache threshold to 50 tasks
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 50);
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    assertEquals(storage.getLoadedTasksCacheSize(), 50);
+
+    // Create jobs for bad fileInfo results
+    Job[] jobs = new Job[4];
+    JobId[] jobIds = new JobId[4];
+    for (int i = 0; i < jobs.length; i++) {
+      jobs[i] = mock(Job.class);
+      jobIds[i] = mock(JobId.class);
+      when(jobs[i].getID()).thenReturn(jobIds[i]);
+      when(jobs[i].getTotalMaps()).thenReturn(10);
+      when(jobs[i].getTotalReduces()).thenReturn(2);
+    }
+
+    HistoryFileInfo loadJobException = mock(HistoryFileInfo.class);
+    when(loadJobException.loadJob()).thenThrow(new IOException("History file not found"));
+    when(historyManager.getFileInfo(jobIds[0])).thenThrow(new IOException(""));
+    when(historyManager.getFileInfo(jobIds[1])).thenReturn(null);
+    when(historyManager.getFileInfo(jobIds[2])).thenReturn(loadJobException);
+
+    try {
+      storage.getFullJob(jobIds[0]);
+      fail("Did not get expected YarnRuntimeException for getFileInfo() throwing IOException");
+    } catch (YarnRuntimeException e) {
+      // Expected
+    }
+
+    // fileInfo==null should return null
+    Job job = storage.getFullJob(jobIds[1]);
+    assertNull(job);
+
+    try {
+      storage.getFullJob(jobIds[2]);
+      fail("Did not get expected YarnRuntimeException for fileInfo.loadJob() throwing IOException");
+    } catch (YarnRuntimeException e) {
+      // Expected
     }
   }
 


Mime
View raw message