hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r830230 [2/9] - in /hadoop/mapreduce/branches/HDFS-641: ./ .eclipse.templates/ conf/ ivy/ lib/ src/c++/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-sche...
Date Tue, 27 Oct 2009 15:44:06 GMT
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
(original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
Tue Oct 27 15:43:58 2009
@@ -20,13 +20,13 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -48,6 +48,8 @@
 public class CapacityTestUtils {
   static final Log LOG =
     LogFactory.getLog(org.apache.hadoop.mapred.CapacityTestUtils.class);
+  static final String MAP = "map";
+  static final String REDUCE = "reduce";
 
 
   /**
@@ -160,18 +162,116 @@
     }
   }
 
-
+  /**
+   * The method accepts an attempt string and checks the validity of
+   * the assignTasks result w.r.t. that attempt string.
+   * 
+   * @param taskTrackerManager
+   * @param scheduler
+   * @param taskTrackerName
+   * @param expectedTaskString
+   * @return
+   * @throws IOException
+   */
   static Task checkAssignment(
     CapacityTestUtils.FakeTaskTrackerManager taskTrackerManager,
     CapacityTaskScheduler scheduler, String taskTrackerName,
     String expectedTaskString) throws IOException {
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    if (expectedTaskString.contains("_m_")) {
+      expectedStrings.put(MAP, expectedTaskString);
+    } else if (expectedTaskString.contains("_r_")) {
+      expectedStrings.put(REDUCE, expectedTaskString);
+    }
+    List<Task> tasks = checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, taskTrackerName, expectedStrings);
+    for (Task task : tasks) {
+      if (task.toString().equals(expectedTaskString)) {
+        return task;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Checks the validity of tasks assigned by scheduler's assignTasks method.
+   * According to JIRA:1030 every assignTasks call in CapacityScheduler
+   * would result in either a MAP task, a REDUCE task, or BOTH.
+   *
+   * This method accepts a Map<String,String>.
+   * The map should always have <= 2 entries, keyed by MAP and/or REDUCE.
+   *
+   * Sample calling code:
+   *
+   *  Map<String, String> expectedStrings = new HashMap<String, String>();
+   * ......
+   * .......
+   * expectedStrings.clear();
+   * expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+   * expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+   * checkMultipleTaskAssignment(
+   *   taskTrackerManager, scheduler, "tt1",
+   *   expectedStrings);
+   * 
+   * @param taskTrackerManager
+   * @param scheduler
+   * @param taskTrackerName
+   * @param expectedTaskStrings
+   * @return
+   * @throws IOException
+   */
+  static List<Task> checkMultipleTaskAssignment(
+    CapacityTestUtils.FakeTaskTrackerManager taskTrackerManager,
+    CapacityTaskScheduler scheduler, String taskTrackerName,
+    Map<String,String> expectedTaskStrings) throws IOException {
+    //Call assign task
     List<Task> tasks = scheduler.assignTasks(
       taskTrackerManager.getTaskTracker(
         taskTrackerName));
-    assertNotNull(expectedTaskString, tasks);
-    assertEquals(expectedTaskString, 1, tasks.size());
-    assertEquals(expectedTaskString, tasks.get(0).toString());
-    return tasks.get(0);
+
+    if (tasks==null || tasks.isEmpty()) {
+      if (expectedTaskStrings.size() > 0) {
+        fail("Expected some tasks to be assigned, but got none.");  
+      } else {
+        return null;
+      }
+    }
+
+    if (expectedTaskStrings.size() > tasks.size()) {
+      StringBuffer sb = new StringBuffer();
+      sb.append("Expected strings different from actual strings.");
+      sb.append(" Expected string count=").append(expectedTaskStrings.size());
+      sb.append(" Actual string count=").append(tasks.size());
+      sb.append(" Expected strings=");
+      for (String expectedTask : expectedTaskStrings.values()) {
+        sb.append(expectedTask).append(",");
+      }
+      sb.append("Actual strings=");
+      for (Task actualTask : tasks) {
+        sb.append(actualTask.toString()).append(",");
+      }
+      fail(sb.toString());
+    }
+    
+    for (Task task : tasks) {
+      LOG.info("tasks are : " + tasks.toString());
+      if (task.isMapTask()) {
+        //check if expected string is set for map or not.
+        if (expectedTaskStrings.get(MAP) != null) {
+          assertEquals(expectedTaskStrings.get(MAP), task.toString());
+        } else {
+          fail("No map task is expected, but got " + task.toString());
+        }
+      } else {
+        //check if expectedStrings is set for reduce or not.
+        if (expectedTaskStrings.get(REDUCE) != null) {
+          assertEquals(expectedTaskStrings.get(REDUCE), task.toString());
+        } else {
+          fail("No reduce task is expected, but got " + task.toString());
+        }
+      }
+    }
+    return tasks;
   }
 
   static void verifyCapacity(
@@ -207,8 +307,9 @@
 
     public FakeJobInProgress(
       JobID jId, JobConf jobConf,
-      FakeTaskTrackerManager taskTrackerManager, String user) {
-      super(jId, jobConf, null);
+      FakeTaskTrackerManager taskTrackerManager, String user, 
+      JobTracker jt) throws IOException {
+      super(jId, jobConf, jt);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(
@@ -372,8 +473,9 @@
 
     public FakeFailingJobInProgress(
       JobID id, JobConf jobConf,
-      FakeTaskTrackerManager taskTrackerManager, String user) {
-      super(id, jobConf, taskTrackerManager, user);
+      FakeTaskTrackerManager taskTrackerManager, String user, 
+      JobTracker jt) throws IOException {
+      super(id, jobConf, taskTrackerManager, user, jt);
     }
 
     @Override
@@ -686,7 +788,7 @@
       FakeJobInProgress job =
           new FakeJobInProgress(new JobID("test", ++jobCounter),
               (jobConf == null ? new JobConf(defaultJobConf) : jobConf), this,
-              jobConf.getUser());
+              jobConf.getUser(), UtilsForTests.getJobTracker());
       job.getStatus().setRunState(state);
       this.submitJob(job);
       return job;
@@ -855,6 +957,8 @@
         Properties p = new Properties();
         p.setProperty(CapacitySchedulerConf.CAPACITY_PROPERTY,
             String.valueOf(q.capacity));
+        p.setProperty(CapacitySchedulerConf.MAX_CAPACITY_PROPERTY,
+            String.valueOf(q.maxCapacity));
         p.setProperty(CapacitySchedulerConf.SUPPORTS_PRIORITY_PROPERTY,
             String.valueOf(q.supportsPrio));
         p.setProperty(
@@ -886,6 +990,7 @@
   static class FakeQueueInfo {
     String queueName;
     float capacity;
+    float maxCapacity = -1.0f;
     boolean supportsPrio;
     int ulMin;
 



Mime
View raw message