hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r779893 - in /hadoop/core/trunk: CHANGES.txt src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Date Fri, 29 May 2009 08:28:38 GMT
Author: yhemanth
Date: Fri May 29 08:28:37 2009
New Revision: 779893

URL: http://svn.apache.org/viewvc?rev=779893&view=rev
Log:
HADOOP-5932. Fixes a problem in capacity scheduler in computing available memory on a tasktracker.
Contributed by Vinod Kumar Vavilapalli.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=779893&r1=779892&r2=779893&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri May 29 08:28:37 2009
@@ -812,6 +812,10 @@
     output compression for merged data.
     (Jothi Padmanabhan and Billy Pearson via ddas)
 
+    HADOOP-5932. Fixes a problem in capacity scheduler in computing
+    available memory on a tasktracker.
+    (Vinod Kumar Vavilapalli via yhemanth)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=779893&r1=779892&r2=779893&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Fri May 29 08:28:37 2009
@@ -53,10 +53,9 @@
    * @return amount of memory that is used by the residing tasks,
    *          null if memory cannot be computed for some reason.
    */
-  private synchronized Long getMemReservedForTasks(
+  synchronized Long getMemReservedForTasks(
       TaskTrackerStatus taskTracker, CapacityTaskScheduler.TYPE taskType) {
     long vmem = 0;
-    long myVmem = 0;
 
     for (TaskStatus task : taskTracker.getTaskReports()) {
       // the following task states are one in which the slot is
@@ -90,6 +89,7 @@
         // tasks' memory limits to the nearest multiple of the slot-memory-size
         // set on JT. This essentially translates to tasks of a high memory job
         // using multiple slots.
+        long myVmem = 0;
         if (task.getIsMap() && taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
           myVmem = jConf.getMemoryForMapTask();
           myVmem =

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=779893&r1=779892&r2=779893&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri May 29 08:28:37 2009
@@ -1589,9 +1589,12 @@
     
     // first, a map from j1 will run
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
     // at this point, the scheduler tries to schedule another map from j1. 
     // there isn't enough space. The second job's reduce should be scheduled.
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
   }
 
   /**
@@ -1637,7 +1640,9 @@
 
     // Fill the second tt with this job.
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
 
     LOG.debug("Submit one high memory(2GB maps/reduces) job of "
         + "2 map, 2 reduce tasks.");
@@ -1651,7 +1656,9 @@
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
 
     LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
         + "1 map, 0 reduce tasks.");
@@ -1669,6 +1676,8 @@
     assertNull(scheduler.assignTasks(tracker("tt2")));
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
   }
 
   /**
@@ -1717,6 +1726,7 @@
 
     // 1st cycle - 1 map gets assigned.
     Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1",  512L, 0L);
     
     // kill this job !
     taskTrackerManager.killJob(job1.getJobID());
@@ -1738,15 +1748,21 @@
     // 2nd cycle - nothing should get assigned. Memory matching code
     // will see the job is missing and fail memory requirements.
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    checkMemReservedForTasksOnTT("tt1", null, null);
+
     // calling again should not make a difference, as the task is still running
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    checkMemReservedForTasksOnTT("tt1", null, null);
     
     // finish the task on the tracker.
     taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
     // now a new task can be assigned.
     t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 512L, 0L);
+
     // reduce can be assigned.
     t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 512L, 512L);
   }
   
   protected TaskTrackerStatus tracker(String taskTrackerName) {
@@ -1761,7 +1777,35 @@
     assertEquals(expectedTaskString, tasks.get(0).toString());
     return tasks.get(0);
   }
-  
+
+  /**
+   * Get the amount of memory that is reserved for tasks on the taskTracker and
+   * verify that it matches what is expected.
+   * 
+   * @param taskTracker
+   * @param expectedMemForMapsOnTT
+   * @param expectedMemForReducesOnTT
+   */
+  private void checkMemReservedForTasksOnTT(String taskTracker,
+      Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
+    Long observedMemForMapsOnTT =
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+            CapacityTaskScheduler.TYPE.MAP);
+    Long observedMemForReducesOnTT =
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+            CapacityTaskScheduler.TYPE.REDUCE);
+    if (expectedMemForMapsOnTT == null) {
+      assertTrue(observedMemForMapsOnTT == null);
+    } else {
+      assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
+    }
+    if (expectedMemForReducesOnTT == null) {
+      assertTrue(observedMemForReducesOnTT == null);
+    } else {
+      assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
+    }
+  }
+
   /*
    * Test cases for Job Initialization poller.
    */
@@ -2170,17 +2214,23 @@
     //scheduled. This task would be scheduled. Till the tasks from job1 gets
     //complete none of the tasks from other jobs would be scheduled.
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
     assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
     //make same tracker get back, check if you are blocking. Your job
     //has speculative map task so tracker should be blocked even tho' it
     //can run job2's map.
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
     //TT2 now gets speculative map of the job1
     checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
+    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
 
     // Now since the first job has no more speculative maps, it can schedule
     // the second job.
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 3 * 1024L, 0L);
 
     //finish everything
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", 
@@ -2223,7 +2273,9 @@
 
     // Finish up the map scheduler
     checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
     checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
+    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
 
     // first, a reduce from j3 will run
     // at this point, there is a speculative task for the same job to be
@@ -2231,16 +2283,20 @@
     //complete none of the tasks from other jobs would be scheduled.
     checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
     assertEquals("pending reduces greater than zero " , job3.pendingMaps(), 0);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2*1024L);
+
     //make same tracker get back, check if you are blocking. Your job
     //has speculative reduce task so tracker should be blocked even tho' it
     //can run job4's reduce.
     assertNull(scheduler.assignTasks(tracker("tt1")));
     //TT2 now gets speculative map of the job1
     checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
+    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L);
 
     // Now since j3 has no more speculative reduces, it can schedule
     // the j4.
     checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 3 * 1024L);
   }
 
   private void checkRunningJobMovementAndCompletion() throws IOException {



Mime
View raw message