hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ha...@apache.org
Subject svn commit: r1242979 - in /hadoop/common/branches/branch-1: CHANGES.txt src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Date Fri, 10 Feb 2012 23:31:35 GMT
Author: harsh
Date: Fri Feb 10 23:31:34 2012
New Revision: 1242979

URL: http://svn.apache.org/viewvc?rev=1242979&view=rev
Log:
MAPREDUCE-3789. CapacityTaskScheduler shouldn't perform unnecessary reservations, when used
in heterogenous environments. (harsh)


Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1242979&r1=1242978&r2=1242979&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Fri Feb 10 23:31:34 2012
@@ -92,6 +92,8 @@ Release 1.1.0 - unreleased
     HDFS-2877. If locking of a storage dir fails, it will remove the other
     NN's lock file on exit. (todd)
 
+    MAPREDUCE-3789. CapacityTaskScheduler shouldn't perform unnecessary reservations, when used in heterogenous environments. (harsh)
+
   IMPROVEMENTS
 
     MAPREDUCE-3597. [Rumen] Provide a way to access other info of history file

Modified: hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1242979&r1=1242978&r2=1242979&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Feb 10 23:31:34 2012
@@ -342,7 +342,10 @@ class CapacityTaskScheduler extends Task
           // tasktrackers to cover all pending tasks. If so we reserve the
           // current tasktracker for this job so that high memory jobs are not
           // starved
-          if ((getPendingTasks(j) != 0 && !hasSufficientReservedTaskTrackers(j))) {
+          if ((getPendingTasks(j) != 0 &&
+              !hasSufficientReservedTaskTrackers(j)) &&
+                (taskTracker.getAvailableSlots(type) !=
+                 getTTMaxSlotsForType(taskTrackerStatus, type))) {
             // Reserve all available slots on this tasktracker
             LOG.info(j.getJobID() + ": Reserving "
                 + taskTracker.getTrackerName()
@@ -1225,5 +1228,8 @@ class CapacityTaskScheduler extends Task
     return queue.toString();
   }
 
+  private static int getTTMaxSlotsForType(TaskTrackerStatus status, TaskType type) {
+      return (type == TaskType.MAP) ? status.getMaxMapSlots() : status.getMaxReduceSlots();
+  }
 }
 

Modified: hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1242979&r1=1242978&r2=1242979&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Feb 10 23:31:34 2012
@@ -487,7 +487,18 @@ public class TestCapacityScheduler exten
                                          maxReduceTasksPerTracker));
       trackers.put(ttName, tt);
     }
-    
+
+    public void addTaskTracker(String ttName,
+        int maxMapTasksPerTracker,
+        int maxReduceTasksPerTracker) {
+      TaskTracker tt = new TaskTracker(ttName);
+      tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", 1,
+                                         new ArrayList<TaskStatus>(), 0, 0,
+                                         maxMapTasksPerTracker,
+                                         maxReduceTasksPerTracker));
+      trackers.put(ttName, tt);
+    }
+
     public ClusterStatus getClusterStatus() {
       int numTrackers = trackers.size();
       return new ClusterStatus(numTrackers, maps, reduces,
@@ -3039,7 +3050,82 @@ public class TestCapacityScheduler exten
 
     // No more tasks there in job3 also
     assertEquals(0, scheduler.assignTasks(tracker("tt3")).size());
-}
+  }
+
+  /**
+   * Test to verify that TTs are not reserved in case the required memory
+   * exceeds the total availability of memory on TT.
+   * @throws IOException
+   */
+  public void testTTReservingInHeterogenousEnvironment()
+      throws IOException {
+    // 2 taskTrackers, 4 map slots on one and 3 map slot on another.
+    taskTrackerManager = new FakeTaskTrackerManager(1, 4, 0);
+    taskTrackerManager.addTaskTracker("tt2", 3, 0);
+
+    taskTrackerManager.addQueues(new String[] { "default" });
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
+    resConf.setFakeQueues(queues);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enabled memory-based scheduling
+    // Normal job in the cluster would be 2GB maps/reduces
+    // Max allowed map memory would be 8GB.
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 8 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 8 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    LOG.debug("Submit a  memory(7GB vmem maps/reduces) job of "
+        + "2 map & 0 red tasks");
+    JobConf jConf = new JobConf(conf);
+
+    jConf = new JobConf(conf);
+    // We require 7GB maps, so thats worth 4 slots on the cluster.
+    jConf.setMemoryForMapTask(7 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    // Hence, 4 + 4 slots are required totally, for two tasks.
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job = submitJobAndInit(JobStatus.PREP, jConf);
+    // Heartbeating the trackers
+    scheduler.assignTasks(tracker("tt1"));
+    scheduler.assignTasks(tracker("tt2"));
+    scheduler.updateQueueUsageForTests();
+    LOG.info(job.getSchedulingInfo());
+    // tt2 can at most run 3 slots while each map task of this job requires
+    // at least 4 minimum slots to run.
+    // tt2 should not at all be reserved, hence. Since it would be a waste of
+    // slots for other jobs.
+    assertEquals("Tracker tt2 got reserved unnecessarily.",
+        0, scheduler.getMapScheduler().getNumReservedTaskTrackers(job));
+    assertEquals(
+        // Should be running only one map task worth four slots,
+        // and no reservations.
+        CapacityTaskScheduler.getJobQueueSchedInfo(1, 4, 0, 0, 0, 0),
+        (String) job.getSchedulingInfo());
+    jConf = new JobConf(conf);
+    // Try submitting a 3-slot worthy job, targeting tt2
+    // 5 GB should be worth 3 slots (2GB/map)
+    jConf.setMemoryForMapTask(5 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    // Just one task, targetting an unreserved tt2
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    submitJobAndInit(JobStatus.PREP, jConf);
+    // TT2 should get assigned.
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+  }
 
   /**
    * Test to verify that queue ordering is based on the number of slots occupied



Mime
View raw message