hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r824750 [2/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Date Tue, 13 Oct 2009 13:28:30 GMT
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java?rev=824750&r1=824749&r2=824750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java
Tue Oct 13 13:28:29 2009
@@ -23,7 +23,6 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.io.IOException;
-
 import static org.apache.hadoop.mapred.CapacityTestUtils.*;
 
 public class TestContainerQueue extends TestCase {
@@ -120,8 +119,8 @@
 
   }
 
-  public void testMaxCapacity() throws IOException{
-    this.setUp(4,1,1);
+  public void testMaxCapacity() throws IOException {
+    this.setUp(4, 1, 1);
     taskTrackerManager.addJobInProgressListener(scheduler.jobQueuesManager);
 
     AbstractQueue rt = QueueHierarchyBuilder.createRootAbstractQueue();
@@ -131,16 +130,16 @@
     QueueSchedulingContext a2 = new QueueSchedulingContext(
       "R.b", 25, 30, -1, -1, -1);
     QueueSchedulingContext a3 = new QueueSchedulingContext(
-          "R.c", 50, -1, -1, -1, -1);
+      "R.c", 50, -1, -1, -1, -1);
 
 
     //Test for max capacity
     AbstractQueue q = new JobQueue(rt, a1);
     AbstractQueue q1 = new JobQueue(rt, a2);
     AbstractQueue q2 = new JobQueue(rt, a3);
-    scheduler.jobQueuesManager.addQueue((JobQueue)q);
-    scheduler.jobQueuesManager.addQueue((JobQueue)q1);
-    scheduler.jobQueuesManager.addQueue((JobQueue)q2);
+    scheduler.jobQueuesManager.addQueue((JobQueue) q);
+    scheduler.jobQueuesManager.addQueue((JobQueue) q1);
+    scheduler.jobQueuesManager.addQueue((JobQueue) q2);
 
     scheduler.setRoot(rt);
     rt.update(4, 4);
@@ -148,21 +147,37 @@
     scheduler.updateContextInfoForTests();
 
     // submit a job to the second queue
-    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0, "R.a", "u1");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 20, "R.a", "u1");
 
-    //Queue R.a should not more than 2 slots 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
+    //Queue R.a should not get more than 2 slots
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP,
       "attempt_test_0001_m_000001_0 on tt1");
-
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0001_m_000002_0 on tt2");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE,
+      "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
 
     //Now the queue has already reached its max limit no further tasks should
     // be given.
-    List<Task> l = scheduler.assignTasks(taskTrackerManager.getTaskTracker("tt3"));
-    assertNull(l);
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP,
+      "attempt_test_0001_m_000002_0 on tt2");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE,
+      "attempt_test_0001_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
+
+    assertNull(scheduler.assignTasks(
+      taskTrackerManager.getTaskTracker(
+        "tt3")));
 
   }
 
@@ -269,86 +284,160 @@
     scheduler.updateContextInfoForTests();
 
     // verify initial capacity distribution
-    TaskSchedulingContext mapTsc 
-        = map.get("rt.gta").getQueueSchedulingContext().getMapTSC();
+    TaskSchedulingContext mapTsc
+      = map.get("rt.gta").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTsc.getCapacity(), 3);
-    
+
     mapTsc = map.get("rt.sch").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTsc.getCapacity(), 5);
-    
+
     mapTsc = map.get("rt.sch.prod").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTsc.getCapacity(), 4);
 
     mapTsc = map.get("rt.sch.misc").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTsc.getCapacity(), 1);
 
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 0, 0, 0, 0 });
-    
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{0, 0, 0, 0});
+
     //Only Allow job submission to leaf queue
-    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0, "rt.sch.prod",
-        "u1");
+    taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, 4, 4, "rt.sch.prod",
+      "u1");
 
     // submit a job to the second queue
-    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0, "rt.sch.misc",
-        "u1");
+    taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, 4, 4, "rt.sch.misc",
+      "u1");
 
     //submit a job in gta level queue
-    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0, "rt.gta", "u1");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 4, "rt.gta", "u1");
 
     int counter = 0;
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
 
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-                      "attempt_test_0001_m_000001_0 on tt1");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 0, 1, 1, 0 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0003_m_000001_0 on tt2");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 1, 1, 1, 0 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt3",
-      "attempt_test_0002_m_000001_0 on tt3");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 1, 2, 1, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt4",
-      "attempt_test_0003_m_000002_0 on tt4");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 2, 2, 1, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt5",
-      "attempt_test_0001_m_000002_0 on tt5");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 2, 3, 2, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt6",
-      "attempt_test_0001_m_000003_0 on tt6");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 2, 4, 3, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt7",
-      "attempt_test_0003_m_000003_0 on tt7");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 3, 4, 3, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt8",
-      "attempt_test_0001_m_000004_0 on tt8");
-    assertUsedCapacity(map, 
-        new String[] {"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"}, 
-        new int[] { 3, 5, 4, 1 });
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{0, 1, 1, 0});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt2");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt2");
 
-    checkAssignment(taskTrackerManager, scheduler, "tt9",
-      "attempt_test_0002_m_000002_0 on tt9");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{1, 1, 1, 0});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt3");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt3");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt3",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{1, 2, 1, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0003_m_000002_0 on tt4");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0003_r_000002_0 on tt4");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{2, 2, 1, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt5");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt5");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt5",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{2, 3, 2, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0001_m_000003_0 on tt6");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0001_r_000003_0 on tt6");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt6",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{2, 4, 3, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0003_m_000003_0 on tt7");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0003_r_000003_0 on tt7");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt7",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{3, 4, 3, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0001_m_000004_0 on tt8");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0001_r_000004_0 on tt8");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt8",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{3, 5, 4, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt9");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt9");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt9",
+      expectedStrings);
   }
   
   /**
@@ -373,75 +462,124 @@
     scheduler.updateContextInfoForTests();
 
     // verify capacities as per the setup.
-    TaskSchedulingContext mapTSC 
+    TaskSchedulingContext mapTSC
       = map.get("rt.gta").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTSC.getCapacity(), 2);
-    
+
     mapTSC = map.get("rt.sch").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTSC.getCapacity(), 5);
-    
+
     mapTSC = map.get("rt.sch.prod").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTSC.getCapacity(), 4);
-    
+
     mapTSC = map.get("rt.sch.misc").getQueueSchedulingContext().getMapTSC();
     assertEquals(mapTSC.getCapacity(), 1);
-    
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 0, 0, 0, 0 });
+
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{0, 0, 0, 0});
 
     // submit a job to the second queue
-    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0, "rt.sch.misc",
-        "u1");
+    taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, 20, 20, "rt.sch.misc",
+      "u1");
 
     //submit a job in gta level queue
-    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0, "rt.gta", "u1");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 20, "rt.gta", "u1");
     int counter = 0;
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
 
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-                      "attempt_test_0001_m_000001_0 on tt1");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 0, 1, 0, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0002_m_000001_0 on tt2");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 1, 1, 0, 1 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt3",
-      "attempt_test_0001_m_000002_0 on tt3");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 1, 2, 0, 2 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt4",
-      "attempt_test_0001_m_000003_0 on tt4");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 1, 3, 0, 3 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt5",
-      "attempt_test_0002_m_000002_0 on tt5");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 2, 3, 0, 3 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt6",
-      "attempt_test_0001_m_000004_0 on tt6");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 2, 4, 0, 4 });
-
-    checkAssignment(taskTrackerManager, scheduler, "tt7",
-      "attempt_test_0001_m_000005_0 on tt7");
-    assertUsedCapacity(map, 
-        new String[] { "rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc" },
-        new int[] { 2, 5, 0, 5 });
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{0, 1, 0, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0002_m_000001_0 on tt2");
+    expectedStrings.put(REDUCE, "attempt_test_0002_r_000001_0 on tt2");
 
-    checkAssignment(taskTrackerManager, scheduler, "tt8",
-      "attempt_test_0001_m_000006_0 on tt8");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{1, 1, 0, 1});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt3");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt3");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt3",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{1, 2, 0, 2});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000003_0 on tt4");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000003_0 on tt4");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{1, 3, 0, 3});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0002_m_000002_0 on tt5");
+    expectedStrings.put(REDUCE, "attempt_test_0002_r_000002_0 on tt5");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt5",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{2, 3, 0, 3});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000004_0 on tt6");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000004_0 on tt6");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt6",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{2, 4, 0, 4});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000005_0 on tt7");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000005_0 on tt7");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt7",
+      expectedStrings);
+    assertUsedCapacity(
+      map,
+      new String[]{"rt.gta", "rt.sch", "rt.sch.prod", "rt.sch.misc"},
+      new int[]{2, 5, 0, 5});
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000006_0 on tt8");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000006_0 on tt8");
+
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt8",
+      expectedStrings);
   }
 
   // verify that the number of slots used for each queue

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java?rev=824750&r1=824749&r2=824750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java
Tue Oct 13 13:28:29 2009
@@ -37,7 +37,7 @@
 import org.apache.hadoop.mapred.CapacityTestUtils.ControlledInitializationPoller;
 import org.apache.hadoop.mapred.CapacityTestUtils.FakeJobInProgress;
 import org.apache.hadoop.mapred.CapacityTestUtils.FakeTaskTrackerManager;
-import static org.apache.hadoop.mapred.CapacityTestUtils.checkAssignment;
+import static org.apache.hadoop.mapred.CapacityTestUtils.*;
 import org.junit.After;
 import org.junit.Test;
 
@@ -201,65 +201,111 @@
     JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();
 
     queues[0].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
     queues[1].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
     queues[2].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
 
     // write the configuration file
     QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+      queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
 
     setupAndStartSchedulerFramework(2, 2, 2);
 
     FakeJobInProgress job1 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[1].getQueueName(), "user");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[1].getQueueName(), "user");
     FakeJobInProgress job2 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[2].getQueueName(), "user");
-
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment(taskTrackerManager, scheduler, "tt2",
-        "attempt_test_0002_m_000002_0 on tt2");
-    checkAssignment(taskTrackerManager, scheduler, "tt2",
-        "attempt_test_0001_m_000002_0 on tt2");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[2].getQueueName(), "user");
+
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+//===========================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0002_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0002_m_000002_0 on tt2");
+    expectedStrings.put(REDUCE, "attempt_test_0002_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
+//============================================
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt2");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
 
     taskTrackerManager.killJob(job1.getJobID());
     taskTrackerManager.killJob(job2.getJobID());
 
     // change configuration
     queues[1].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(25));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(25));
     queues[2].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(75));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(75));
 
     // Re-write the configuration file
     QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+      queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
 
-    taskTrackerManager.getQueueManager().refreshQueues(null,
-        scheduler.getQueueRefresher());
+    taskTrackerManager.getQueueManager().refreshQueues(
+      null,
+      scheduler.getQueueRefresher());
 
     job1 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[1].getQueueName(), "user");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[1].getQueueName(), "user");
     job2 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 0,
-            queues[2].getQueueName(), "user");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 4, 4,
+        queues[2].getQueueName(), "user");
+
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0003_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+
+
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0004_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0004_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+
+
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0004_m_000002_0 on tt2");
+    expectedStrings.put(REDUCE, "attempt_test_0004_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
+
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0004_m_000003_0 on tt2");
+    expectedStrings.put(REDUCE, "attempt_test_0004_r_000003_0 on tt2");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
 
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0003_m_000001_0 on tt1");
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0004_m_000001_0 on tt1");
-    checkAssignment(taskTrackerManager, scheduler, "tt2",
-        "attempt_test_0004_m_000002_0 on tt2");
-    checkAssignment(taskTrackerManager, scheduler, "tt2",
-        "attempt_test_0004_m_000003_0 on tt2");
   }
 
   /**
@@ -336,60 +382,84 @@
     JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();
 
     queues[0].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
     queues[1].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
     queues[2].getProperties().setProperty(
-        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
 
     queues[2].getProperties().setProperty(
-        CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
-        String.valueOf(100));
+      CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
+      String.valueOf(100));
 
     // write the configuration file
     QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+      queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
 
     setupAndStartSchedulerFramework(1, 2, 2);
 
     FakeJobInProgress job1 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[2].getQueueName(), "user1");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[2].getQueueName(), "user1");
     FakeJobInProgress job2 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[2].getQueueName(), "user2");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[2].getQueueName(), "user2");
+
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+    
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
 
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000002_0 on tt1");
     assertNull(scheduler.assignTasks(taskTrackerManager.getTaskTracker("tt1")));
     taskTrackerManager.killJob(job1.getJobID());
     taskTrackerManager.killJob(job2.getJobID());
 
     // change configuration
     queues[2].getProperties().setProperty(
-        CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
-        String.valueOf(50));
+      CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
+      String.valueOf(50));
 
     // Re-write the configuration file
     QueueManagerTestUtils.writeQueueConfigurationFile(
-        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+      queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});
 
-    taskTrackerManager.getQueueManager().refreshQueues(null,
-        scheduler.getQueueRefresher());
+    taskTrackerManager.getQueueManager().refreshQueues(
+      null,
+      scheduler.getQueueRefresher());
 
     job1 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[1].getQueueName(), "user1");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[1].getQueueName(), "user1");
     job2 =
-        taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 0,
-            queues[2].getQueueName(), "user2");
-
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0003_m_000001_0 on tt1");
-    checkAssignment(taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0004_m_000001_0 on tt1");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 2, 2,
+        queues[2].getQueueName(), "user2");
+
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0003_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+
+    expectedStrings.clear();
+    expectedStrings.put(MAP, "attempt_test_0004_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0004_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
   }
 
   /**



Mime
View raw message