hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r824757 [1/3] - in /hadoop/mapreduce/branches/branch-0.21: ./ conf/ src/c++/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/h...
Date Tue, 13 Oct 2009 13:44:14 GMT
Author: yhemanth
Date: Tue Oct 13 13:44:13 2009
New Revision: 824757

URL: http://svn.apache.org/viewvc?rev=824757&view=rev
Log:
Merge -r 824749:824750 from trunk to branch-0.21 to fix MAPREDUCE-1030.

Modified:
    hadoop/mapreduce/branches/branch-0.21/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/.gitignore   (props changed)
    hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.21/conf/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/conf/capacity-scheduler.xml.template   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/c++/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/contrib/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/contrib/build-contrib.xml   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/contrib/build.xml   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/data_join/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/contrib/dynamic-scheduler/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/contrib/eclipse-plugin/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/contrib/fairscheduler/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/contrib/index/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/contrib/mrunit/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/contrib/sqoop/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/contrib/streaming/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/contrib/vaidya/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/examples/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/java/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/   (props changed)
    hadoop/mapreduce/branches/branch-0.21/src/webapps/job/   (props changed)

Propchange: hadoop/mapreduce/branches/branch-0.21/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19/mapred:713112
-/hadoop/mapreduce/trunk:818355,818918,818946
+/hadoop/mapreduce/trunk:818355,818918,818946,824750

Propchange: hadoop/mapreduce/branches/branch-0.21/.gitignore
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/.gitignore:713112
 /hadoop/core/trunk/.gitignore:784664-785643
-/hadoop/mapreduce/trunk/.gitignore:818355,818918,818946
+/hadoop/mapreduce/trunk/.gitignore:818355,818918,818946,824750

Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=824757&r1=824756&r2=824757&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Tue Oct 13 13:44:13 2009
@@ -729,4 +729,8 @@
     MAPREDUCE-979. Fixed JobConf APIs related to memory parameters to return
     values of new configuration variables when deprecated variables are
     disabled. (Sreekanth Ramakrishnan via yhemanth)
-    
+   
+    MAPREDUCE-1030. Modified scheduling algorithm to return a map and reduce
+    task per heartbeat in the capacity scheduler.
+    (Rahul Kumar Singh via yhemanth)
+ 

Propchange: hadoop/mapreduce/branches/branch-0.21/conf/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/conf:713112
 /hadoop/core/trunk/conf:784664-785643
-/hadoop/mapreduce/trunk/conf:818355,818918,818946
+/hadoop/mapreduce/trunk/conf:818355,818918,818946,824750

Propchange: hadoop/mapreduce/branches/branch-0.21/conf/capacity-scheduler.xml.template
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/conf/capacity-scheduler.xml.template:713112
 /hadoop/core/trunk/conf/capacity-scheduler.xml.template:776175-785643
-/hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template:818355,818918,818946
+/hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template:818355,818918,818946,824750

Propchange: hadoop/mapreduce/branches/branch-0.21/src/c++/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/c++:713112
 /hadoop/core/trunk/src/c++:776175-784663
-/hadoop/mapreduce/trunk/src/c++:818355,818918,818946
+/hadoop/mapreduce/trunk/src/c++:818355,818918,818946,824750

Propchange: hadoop/mapreduce/branches/branch-0.21/src/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib:713112
 /hadoop/core/trunk/src/contrib:784664-785643
-/hadoop/mapreduce/trunk/src/contrib:818355,818918,818946
+/hadoop/mapreduce/trunk/src/contrib:818355,818918,818946,824750

Propchange: hadoop/mapreduce/branches/branch-0.21/src/contrib/build-contrib.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/build-contrib.xml:713112
 /hadoop/core/trunk/src/contrib/build-contrib.xml:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/build-contrib.xml:818355,818918,818946
+/hadoop/mapreduce/trunk/src/contrib/build-contrib.xml:818355,818918,818946,824750

Propchange: hadoop/mapreduce/branches/branch-0.21/src/contrib/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/build.xml:713112
 /hadoop/core/trunk/src/contrib/build.xml:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/build.xml:818355,818918,818946
+/hadoop/mapreduce/trunk/src/contrib/build.xml:818355,818918,818946,824750

Propchange: hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 13 13:44:13 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/capacity-scheduler:713112
 /hadoop/core/trunk/src/contrib/capacity-scheduler:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler:818355,818918,818946
+/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler:818355,818918,818946,824750

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=824757&r1=824756&r2=824757&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue Oct 13 13:44:13 2009
@@ -906,8 +906,10 @@
 
   /*
    * The grand plan for assigning a task. 
-   * First, decide whether a Map or Reduce task should be given to a TT 
-   * (if the TT can accept either). 
+   * Tries to assign one reduce task and one map task per heartbeat when
+   * the TT has free slots of each type.
+   * If only one type of slot is free, only a task of that type is
+   * considered.
    * Next, pick a queue. We only look at queues that need a slot. Among these,
    * we first look at queues whose (# of running tasks)/capacity is the least.
    * Next, pick a job in a queue. we pick the job at the front of the queue
@@ -921,12 +923,12 @@
     
     TaskLookupResult tlr;
     TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
+    List<Task> result = new ArrayList<Task>();
     
     /* 
-     * If TT has Map and Reduce slot free, we need to figure out whether to
-     * give it a Map or Reduce task.
-     * Number of ways to do this. For now, base decision on how much is needed
-     * versus how much is used (default to Map, if equal).
+     * If the TT has both a map and a reduce slot free, we try to assign one
+     * map and one reduce task. Each slot type is checked independently: a
+     * task of a given type is requested only if that type has a free slot.
      */
     ClusterStatus c = taskTrackerManager.getClusterStatus();
     int mapClusterCapacity = c.getMaxMapTasks();
@@ -953,51 +955,26 @@
     // make sure we get our map or reduce scheduling object to update its 
     // collection of QSC objects too.
 
-    if ((maxReduceSlots - currentReduceSlots) > 
-    (maxMapSlots - currentMapSlots)) {
-      // get a reduce task first
+    if (maxReduceSlots > currentReduceSlots) {
+      // reduce slot available, try to get a
+      // reduce task
       tlr = reduceScheduler.assignTasks(taskTracker);
       if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
         tlr.getLookUpStatus()) {
-        // found a task; return
-        return Collections.singletonList(tlr.getTask());
-      }
-      // if we didn't get any, look at map tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
-                                  == tlr.getLookUpStatus() ||
-                TaskLookupResult.LookUpStatus.NO_TASK_FOUND
-                                  == tlr.getLookUpStatus())
-          && (maxMapSlots > currentMapSlots)) {
-        tlr = mapScheduler.assignTasks(taskTracker);
-        if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
-          tlr.getLookUpStatus()) {
-          return Collections.singletonList(tlr.getTask());
-        }
+        result.add(tlr.getTask());
       }
     }
-    else {
-      // get a map task first
+
+    if(maxMapSlots > currentMapSlots) {
+      // map slot available, try to get a map task
       tlr = mapScheduler.assignTasks(taskTracker);
       if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
         tlr.getLookUpStatus()) {
-        // found a task; return
-        return Collections.singletonList(tlr.getTask());
-      }
-      // if we didn't get any, look at reduce tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
-                                    == tlr.getLookUpStatus()
-                || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
-                                    == tlr.getLookUpStatus())
-          && (maxReduceSlots > currentReduceSlots)) {
-        tlr = reduceScheduler.assignTasks(taskTracker);
-        if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
-          tlr.getLookUpStatus()) {
-          return Collections.singletonList(tlr.getTask());
-        }
+        result.add(tlr.getTask());
       }
     }
-
-    return null;
+    
+    return (result.isEmpty()) ? null : result;
   }
 
   

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java?rev=824757&r1=824756&r2=824757&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java Tue Oct 13 13:44:13 2009
@@ -20,13 +20,13 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -48,6 +48,8 @@
 public class CapacityTestUtils {
   static final Log LOG =
     LogFactory.getLog(org.apache.hadoop.mapred.CapacityTestUtils.class);
+  static final String MAP = "map";
+  static final String REDUCE = "reduce";
 
 
   /**
@@ -160,18 +162,116 @@
     }
   }
 
-
+  /**
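+   * Calls assignTasks once for the given tracker and checks that exactly the
+   * expected attempt (map or reduce, inferred from "_m_"/"_r_") is assigned.
+   * 
+   * @param taskTrackerManager
+   * @param scheduler
+   * @param taskTrackerName
+   * @param expectedTaskString e.g. "attempt_test_0001_m_000001_0 on tt1"
+   * @return the assigned task whose string equals expectedTaskString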
+   * @throws IOException
+   */
   static Task checkAssignment(
     CapacityTestUtils.FakeTaskTrackerManager taskTrackerManager,
     CapacityTaskScheduler scheduler, String taskTrackerName,
     String expectedTaskString) throws IOException {
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    if (expectedTaskString.contains("_m_")) {
+      expectedStrings.put(MAP, expectedTaskString);
+    } else if (expectedTaskString.contains("_r_")) {
+      expectedStrings.put(REDUCE, expectedTaskString);
+    }
+    List<Task> tasks = checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, taskTrackerName, expectedStrings);
+    for (Task task : tasks) {
+      if (task.toString().equals(expectedTaskString)) {
+        return task;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Checks the validity of tasks assigned by scheduler's assignTasks method
+   * Per MAPREDUCE-1030, every assignTasks call in CapacityTaskScheduler
+   * may assign a map task, a reduce task, or both.
+   *
+   * The expected attempt strings are passed as a {@code Map<String,String>}
+   * keyed by MAP and/or REDUCE, so it has at most 2 entries.
+   *
+   * Sample calling code:
+   *
+   *  Map<String, String> expectedStrings = new HashMap<String, String>();
+   * ......
+   * .......
+   * expectedStrings.clear();
+   * expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+   * expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+   * checkMultipleTaskAssignment(
+   *   taskTrackerManager, scheduler, "tt1",
+   *   expectedStrings);
+   * 
+   * @param taskTrackerManager
+   * @param scheduler
+   * @param taskTrackerName
+   * @param expectedTaskStrings
+   * @return the assigned tasks, or null if none were assigned and none were expected
+   * @throws IOException
+   */
+  static List<Task> checkMultipleTaskAssignment(
+    CapacityTestUtils.FakeTaskTrackerManager taskTrackerManager,
+    CapacityTaskScheduler scheduler, String taskTrackerName,
+    Map<String,String> expectedTaskStrings) throws IOException {
+    //Call assign task
     List<Task> tasks = scheduler.assignTasks(
       taskTrackerManager.getTaskTracker(
         taskTrackerName));
-    assertNotNull(expectedTaskString, tasks);
-    assertEquals(expectedTaskString, 1, tasks.size());
-    assertEquals(expectedTaskString, tasks.get(0).toString());
-    return tasks.get(0);
+
+    if (tasks==null) {
+      if (expectedTaskStrings.size() > 0) {
+        fail("Expected some tasks to be assigned, but got none.");  
+      } else {
+        return null;
+      }
+    }
+
+    if (expectedTaskStrings.size() > tasks.size()) {
+      StringBuffer sb = new StringBuffer();
+      sb.append("Expected strings different from actual strings.");
+      sb.append(" Expected string count=").append(expectedTaskStrings.size());
+      sb.append(" Actual string count=").append(tasks.size());
+      sb.append(" Expected strings=");
+      for (String expectedTask : expectedTaskStrings.values()) {
+        sb.append(expectedTask).append(",");
+      }
+      sb.append("Actual strings=");
+      for (Task actualTask : tasks) {
+        sb.append(actualTask.toString()).append(",");
+      }
+      fail(sb.toString());
+    }
+    
+    for (Task task : tasks) {
+      LOG.info("tasks are : " + tasks.toString());
+      if (task.isMapTask()) {
+        //check if expected string is set for map or not.
+        if (expectedTaskStrings.get(MAP) != null) {
+          assertEquals(expectedTaskStrings.get(MAP), task.toString());
+        } else {
+          fail("No map task is expected, but got " + task.toString());
+        }
+      } else {
+        //check if expectedStrings is set for reduce or not.
+        if (expectedTaskStrings.get(REDUCE) != null) {
+          assertEquals(expectedTaskStrings.get(REDUCE), task.toString());
+        } else {
+          fail("No reduce task is expected, but got " + task.toString());
+        }
+      }
+    }
+    return tasks;
   }
 
   static void verifyCapacity(



Mime
View raw message