hadoop-mapreduce-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r885145 [8/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/src...
Date: Sat, 28 Nov 2009 20:26:22 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Sat Nov 28 20:26:01 2009
@@ -38,10 +38,12 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.net.Node;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.net.Node;
 
 public class TestFairScheduler extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -50,6 +52,7 @@
       "test-pools").getAbsolutePath();
   
   private static final String POOL_PROPERTY = "pool";
+  private static final String EXPLICIT_POOL_PROPERTY = "mapred.fairscheduler.pool";
   
   private static int jobCounter;
   
@@ -62,8 +65,8 @@
     
     public FakeJobInProgress(JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, 
-        String[][] mapInputLocations) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf, null);
+        String[][] mapInputLocations, JobTracker jt) throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf, jt);
       this.taskTrackerManager = taskTrackerManager;
       this.mapInputLocations = mapInputLocations;
       this.startTime = System.currentTimeMillis();
@@ -74,6 +77,7 @@
       this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
       this.nonRunningReduces = new LinkedList<TaskInProgress>();   
       this.runningReduces = new LinkedHashSet<TaskInProgress>();
+      this.jobHistory = new FakeJobHistory();
       initTasks();
     }
     
@@ -228,7 +232,7 @@
     // Constructor for map
     FakeTaskInProgress(JobID jId, int id, JobConf jobConf,
         FakeJobInProgress job, String[] inputLocations) {
-      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, id, 1);
+      super(jId, "", new Job.RawSplit(), null, jobConf, job, id, 1);
       this.isMap = true;
       this.fakeJob = job;
       this.inputLocations = inputLocations;
@@ -306,7 +310,7 @@
     void setQueues(Set<String> queues) {
       this.queues = queues;
     }
-    public synchronized Set<String> getQueues() {
+    public synchronized Set<String> getLeafQueueNames() {
       return queues;
     }
   }
@@ -533,7 +537,7 @@
     if (pool != null)
       jobConf.set(POOL_PROPERTY, pool);
     JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
-        mapInputLocations);
+        mapInputLocations, UtilsForTests.getJobTracker());
     job.getStatus().setRunState(state);
     taskTrackerManager.submitJob(job);
     job.startTime = clock.time;
@@ -679,8 +683,8 @@
     
     // Assign tasks and check that jobs alternate in filling slots
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
@@ -738,8 +742,8 @@
     
     // Assign tasks and check that jobs alternate in filling slots
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1",
-                           "attempt_test_0002_m_000000_0 on tt1",
                            "attempt_test_0001_r_000000_0 on tt1",
+                           "attempt_test_0002_m_000000_0 on tt1",
                            "attempt_test_0002_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2",
                            "attempt_test_0002_r_000001_0 on tt2");
@@ -799,12 +803,12 @@
     
     // Check that tasks are filled alternately by the jobs
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
     
     // Check that no new tasks can be launched once the tasktrackers are full
@@ -842,12 +846,12 @@
 
     // Check that tasks are filled alternately by the jobs
     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
     
     // Check scheduler variables; the demands should now be 8 because 2 tasks
@@ -907,12 +911,12 @@
     
     // Check that tasks are filled alternately by the jobs
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1",
-                           "attempt_test_0002_m_000000_0 on tt1",
                            "attempt_test_0001_r_000000_0 on tt1",
+                           "attempt_test_0002_m_000000_0 on tt1",
                            "attempt_test_0002_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2",
-                           "attempt_test_0002_m_000001_0 on tt2",
                            "attempt_test_0001_r_000001_0 on tt2",
+                           "attempt_test_0002_m_000001_0 on tt2",
                            "attempt_test_0002_r_000001_0 on tt2");
     
     // Check that no new tasks can be launched once the tasktrackers are full
@@ -950,12 +954,12 @@
 
     // Check that tasks are filled alternately by the jobs
     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1",
-                           "attempt_test_0002_m_000002_0 on tt1",
                            "attempt_test_0001_r_000002_0 on tt1",
+                           "attempt_test_0002_m_000002_0 on tt1",
                            "attempt_test_0002_r_000002_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2",
-                           "attempt_test_0002_m_000003_0 on tt2",
                            "attempt_test_0001_r_000003_0 on tt2",
+                           "attempt_test_0002_m_000003_0 on tt2",
                            "attempt_test_0002_r_000003_0 on tt2");
     
     // Check scheduler variables; the demands should now be 8 because 2 tasks
@@ -1012,16 +1016,16 @@
     // type should be handed out alternately to 1, 2, 2, 1, 2, 2, etc.
     System.out.println("HEREEEE");
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
     checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
-    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
     checkAssignment("tt3", "attempt_test_0002_r_000002_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
     checkAssignment("tt3", "attempt_test_0002_r_000003_0 on tt3");
   }
   
@@ -1097,12 +1101,12 @@
     
     // Assign tasks and check that slots are first given to needy jobs
     checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
   }
 
@@ -1177,12 +1181,12 @@
     // Assign tasks and check that slots are first given to needy jobs, but
     // that job 1 gets two tasks after due to having a larger share.
     checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
   }
   
@@ -1253,12 +1257,12 @@
     // Assign tasks and check that slots are first given to needy jobs, but
     // that job 1 gets two tasks after due to having a larger share.
     checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
-                           "attempt_test_0003_m_000000_0 on tt1",
                            "attempt_test_0002_r_000000_0 on tt1",
+                           "attempt_test_0003_m_000000_0 on tt1",
                            "attempt_test_0003_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2",
-                           "attempt_test_0001_m_000001_0 on tt2",
                            "attempt_test_0001_r_000000_0 on tt2",
+                           "attempt_test_0001_m_000001_0 on tt2",
                            "attempt_test_0001_r_000001_0 on tt2");
   }
   
@@ -1305,12 +1309,12 @@
     
     // Assign tasks and check that slots are first given to needy jobs
     checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
   }
   
@@ -1357,13 +1361,13 @@
     
     // Assign tasks and check that only jobs 1 and 2 get them
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
   }
 
@@ -1387,19 +1391,19 @@
     // Submit jobs, advancing time in-between to make sure that they are
     // all submitted at distinct times.
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
-    job1.getJobConf().set("user.name", "user1");
+    job1.getJobConf().set(JobContext.USER_NAME, "user1");
     JobInfo info1 = scheduler.infos.get(job1);
     advanceTime(10);
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
-    job2.getJobConf().set("user.name", "user1");
+    job2.getJobConf().set(JobContext.USER_NAME, "user1");
     JobInfo info2 = scheduler.infos.get(job2);
     advanceTime(10);
     JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
-    job3.getJobConf().set("user.name", "user2");
+    job3.getJobConf().set(JobContext.USER_NAME, "user2");
     JobInfo info3 = scheduler.infos.get(job3);
     advanceTime(10);
     JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
-    job4.getJobConf().set("user.name", "user2");
+    job4.getJobConf().set(JobContext.USER_NAME, "user2");
     JobInfo info4 = scheduler.infos.get(job4);
     
     // Check scheduler variables
@@ -1414,13 +1418,13 @@
     
     // Assign tasks and check that slots are given only to jobs 1, 3 and 4
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0004_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
   }
   
@@ -1453,49 +1457,49 @@
     
     // Two jobs for user1; only one should get to run
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
-    job1.getJobConf().set("user.name", "user1");
+    job1.getJobConf().set(JobContext.USER_NAME, "user1");
     JobInfo info1 = scheduler.infos.get(job1);
     advanceTime(10);
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
-    job2.getJobConf().set("user.name", "user1");
+    job2.getJobConf().set(JobContext.USER_NAME, "user1");
     JobInfo info2 = scheduler.infos.get(job2);
     advanceTime(10);
     
     // Three jobs for user2; all should get to run
     JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
-    job3.getJobConf().set("user.name", "user2");
+    job3.getJobConf().set(JobContext.USER_NAME, "user2");
     JobInfo info3 = scheduler.infos.get(job3);
     advanceTime(10);
     JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
-    job4.getJobConf().set("user.name", "user2");
+    job4.getJobConf().set(JobContext.USER_NAME, "user2");
     JobInfo info4 = scheduler.infos.get(job4);
     advanceTime(10);
     JobInProgress job5 = submitJob(JobStatus.RUNNING, 10, 10);
-    job5.getJobConf().set("user.name", "user2");
+    job5.getJobConf().set(JobContext.USER_NAME, "user2");
     JobInfo info5 = scheduler.infos.get(job5);
     advanceTime(10);
     
     // Three jobs for user3; only two should get to run
     JobInProgress job6 = submitJob(JobStatus.RUNNING, 10, 10);
-    job6.getJobConf().set("user.name", "user3");
+    job6.getJobConf().set(JobContext.USER_NAME, "user3");
     JobInfo info6 = scheduler.infos.get(job6);
     advanceTime(10);
     JobInProgress job7 = submitJob(JobStatus.RUNNING, 10, 10);
-    job7.getJobConf().set("user.name", "user3");
+    job7.getJobConf().set(JobContext.USER_NAME, "user3");
     JobInfo info7 = scheduler.infos.get(job7);
     advanceTime(10);
     JobInProgress job8 = submitJob(JobStatus.RUNNING, 10, 10);
-    job8.getJobConf().set("user.name", "user3");
+    job8.getJobConf().set(JobContext.USER_NAME, "user3");
     JobInfo info8 = scheduler.infos.get(job8);
     advanceTime(10);
     
     // Two jobs for user4, in poolA; only one should get to run
     JobInProgress job9 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
-    job9.getJobConf().set("user.name", "user4");
+    job9.getJobConf().set(JobContext.USER_NAME, "user4");
     JobInfo info9 = scheduler.infos.get(job9);
     advanceTime(10);
     JobInProgress job10 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
-    job10.getJobConf().set("user.name", "user4");
+    job10.getJobConf().set(JobContext.USER_NAME, "user4");
     JobInfo info10 = scheduler.infos.get(job10);
     advanceTime(10);
     
@@ -1683,13 +1687,13 @@
     // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
     
     // Ten seconds later, submit job 2.
@@ -1761,13 +1765,13 @@
     // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
     
     // Ten seconds later, submit job 2.
@@ -1794,8 +1798,8 @@
     scheduler.update();
     assertEquals(3, job1.runningMaps());
     assertEquals(2, job1.runningReduces());
-    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
@@ -1840,18 +1844,18 @@
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 6, 6, "pool1");
     advanceTime(100);
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
     advanceTime(100);
     checkAssignment("tt3", "attempt_test_0001_m_000004_0 on tt3");
-    checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
     checkAssignment("tt3", "attempt_test_0001_r_000004_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
     checkAssignment("tt3", "attempt_test_0001_r_000005_0 on tt3");
     advanceTime(100);
     
@@ -1859,8 +1863,8 @@
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "pool2");
     advanceTime(100);
     checkAssignment("tt4", "attempt_test_0002_m_000000_0 on tt4");
-    checkAssignment("tt4", "attempt_test_0002_m_000001_0 on tt4");
     checkAssignment("tt4", "attempt_test_0002_r_000000_0 on tt4");
+    checkAssignment("tt4", "attempt_test_0002_m_000001_0 on tt4");
     checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
     
     // Submit job 3.
@@ -1896,8 +1900,8 @@
     assertEquals(4, job1.runningMaps());
     assertEquals(4, job1.runningReduces());
     checkAssignment("tt3", "attempt_test_0003_m_000000_0 on tt3");
-    checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
     checkAssignment("tt3", "attempt_test_0003_r_000000_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
     checkAssignment("tt3", "attempt_test_0003_r_000001_0 on tt3");
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
@@ -1937,13 +1941,13 @@
     // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
     
     // Ten seconds later, submit job 2.
@@ -1979,8 +1983,8 @@
     assertEquals(2, job1.runningMaps());
     assertEquals(2, job1.runningReduces());
     checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
@@ -2011,13 +2015,13 @@
     // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
     
     // Ten seconds later, submit job 2.
@@ -2065,13 +2069,13 @@
     // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
     
     // Ten seconds later, submit job 2.
@@ -2381,12 +2385,12 @@
     // Assign tasks and check that they're given first to job3 (because it is
     // high priority), then to job1, then to job2.
     checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
   }
   
@@ -2419,12 +2423,12 @@
     // Assign tasks and check that they alternate between jobs 1 and 3, the
     // head-of-line jobs in their respective pools.
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
   }
   
@@ -2460,14 +2464,95 @@
     // Assign tasks and check that only job 1 gets tasks in pool A, but
     // jobs 3 and 4 both get tasks in pool B.
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0004_r_000000_0 on tt2");
   }
+
+  /**
+   * This test uses the mapred.fairscheduler.pool property to assign jobs to pools.
+   */
+  public void testPoolAssignment() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<pool name=\"default\">");
+    out.println("<schedulingMode>fair</schedulingMode>");
+    out.println("</pool>");
+    out.println("<pool name=\"poolA\">");
+    out.println("<schedulingMode>fair</schedulingMode>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    Pool defaultPool = scheduler.getPoolManager().getPool("default");
+    Pool poolA = scheduler.getPoolManager().getPool("poolA");
+ 
+    // Submit a job to the default pool.  All specifications take default values.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 3);
+
+    assertEquals(1,    defaultPool.getMapSchedulable().getDemand());
+    assertEquals(3,    defaultPool.getReduceSchedulable().getDemand());
+    assertEquals(0,    poolA.getMapSchedulable().getDemand());
+    assertEquals(0,    poolA.getReduceSchedulable().getDemand());
+
+    // Submit a job to the default pool and move it to poolA using setPool.
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 5, 7);
+
+    assertEquals(6,    defaultPool.getMapSchedulable().getDemand());
+    assertEquals(10,   defaultPool.getReduceSchedulable().getDemand());
+    assertEquals(0,    poolA.getMapSchedulable().getDemand());
+    assertEquals(0,    poolA.getReduceSchedulable().getDemand());
+
+    scheduler.getPoolManager().setPool(job2, "poolA");
+    assertEquals("poolA", scheduler.getPoolManager().getPoolName(job2));
+
+    defaultPool.getMapSchedulable().updateDemand();
+    defaultPool.getReduceSchedulable().updateDemand();
+    poolA.getMapSchedulable().updateDemand();
+    poolA.getReduceSchedulable().updateDemand();
+
+    assertEquals(1,    defaultPool.getMapSchedulable().getDemand());
+    assertEquals(3,    defaultPool.getReduceSchedulable().getDemand());
+    assertEquals(5,    poolA.getMapSchedulable().getDemand());
+    assertEquals(7,    poolA.getReduceSchedulable().getDemand());
+
+    // Submit a job to poolA by specifying mapred.fairscheduler.pool
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setNumMapTasks(11);
+    jobConf.setNumReduceTasks(13);
+    jobConf.set(POOL_PROPERTY, "nonsense"); // test that this is overridden
+    jobConf.set(EXPLICIT_POOL_PROPERTY, "poolA");
+    JobInProgress job3 = new FakeJobInProgress(jobConf, taskTrackerManager,
+        null, UtilsForTests.getJobTracker());
+    job3.getStatus().setRunState(JobStatus.RUNNING);
+    taskTrackerManager.submitJob(job3);
+
+    assertEquals(1,    defaultPool.getMapSchedulable().getDemand());
+    assertEquals(3,    defaultPool.getReduceSchedulable().getDemand());
+    assertEquals(16,   poolA.getMapSchedulable().getDemand());
+    assertEquals(20,   poolA.getReduceSchedulable().getDemand());
+
+    // Submit a job to poolA by specifying pool and not mapred.fairscheduler.pool
+    JobConf jobConf2 = new JobConf(conf);
+    jobConf2.setNumMapTasks(17);
+    jobConf2.setNumReduceTasks(19);
+    jobConf2.set(POOL_PROPERTY, "poolA");
+    JobInProgress job4 = new FakeJobInProgress(jobConf2, taskTrackerManager,
+        null, UtilsForTests.getJobTracker());
+    job4.getStatus().setRunState(JobStatus.RUNNING);
+    taskTrackerManager.submitJob(job4);
+
+    assertEquals(1,    defaultPool.getMapSchedulable().getDemand());
+    assertEquals(3,    defaultPool.getReduceSchedulable().getDemand());
+    assertEquals(33,   poolA.getMapSchedulable().getDemand());
+    assertEquals(39,   poolA.getReduceSchedulable().getDemand());
+  }
   
   private void advanceTime(long time) {
     clock.advance(time);
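
[Context for the change above: the new testPoolAssignment asserts that the explicit mapred.fairscheduler.pool key overrides whatever pool the configured pool-name property (POOL_PROPERTY, here "pool") would select. A minimal sketch of the resolution order those assertions imply; this is not the actual PoolManager source, and the method name and default value are assumptions:

    // Hypothetical resolver consistent with testPoolAssignment: the explicit
    // key wins even when the configured pool-name property says "nonsense".
    // Assumes org.apache.hadoop.mapred.JobConf is in scope.
    String resolvePoolName(JobConf jobConf, String poolNameProperty) {
      String explicit = jobConf.get("mapred.fairscheduler.pool");
      if (explicit != null && explicit.length() > 0) {
        return explicit;
      }
      // Fall back to the configured property ("pool" in the test), then to
      // the default pool when neither key is set.
      return jobConf.get(poolNameProperty, "default");
    }
]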

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/index:713112
 /hadoop/core/trunk/src/contrib/index:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/index:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/index:804974-884916

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/ivy.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/ivy.xml Sat Nov 28 20:26:01 2009
@@ -24,17 +24,9 @@
     <artifact conf="master"/>
   </publications>
   <dependencies>
-    <dependency org="commons-logging"
-      name="commons-logging"
-      rev="${commons-logging.version}"
-      conf="common->default"/>
-    <dependency org="log4j"
-      name="log4j"
-      rev="${log4j.version}"
-      conf="common->master"/>
-    <dependency org="org.apache.lucene"
-      name="lucene-core"
-      rev="${lucene-core.version}"
-      conf="common->default"/>
-    </dependencies>
+    <dependency org="org.apache.hadoop" name="hadoop-core" rev="${hadoop-core.version}" conf="common->default"/>
+    <dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}" conf="common->default"/>
+    <dependency org="log4j" name="log4j" rev="${log4j.version}" conf="common->master"/>
+    <dependency org="org.apache.lucene" name="lucene-core" rev="${lucene-core.version}" conf="common->default"/>
+  </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java Sat Nov 28 20:26:01 2009
@@ -204,7 +204,8 @@
     IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(jobConf);
 
     if (inputPathsString != null) {
-      jobConf.set("mapred.input.dir", inputPathsString);
+      jobConf.set(org.apache.hadoop.mapreduce.lib.input.
+        FileInputFormat.INPUT_DIR, inputPathsString);
     }
     inputPaths = FileInputFormat.getInputPaths(jobConf);
     if (inputPaths.length == 0) {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java Sat Nov 28 20:26:01 2009
@@ -23,6 +23,8 @@
 import org.apache.hadoop.contrib.index.example.LineDocInputFormat;
 import org.apache.hadoop.contrib.index.example.LineDocLocalAnalysis;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 
@@ -62,7 +64,7 @@
    * @return the IO sort space in MB
    */
   public int getIOSortMB() {
-    return conf.getInt("io.sort.mb", 100);
+    return conf.getInt(JobContext.IO_SORT_MB, 100);
   }
 
   /**
@@ -70,7 +72,7 @@
    * @param mb  the IO sort space in MB
    */
   public void setIOSortMB(int mb) {
-    conf.setInt("io.sort.mb", mb);
+    conf.setInt(JobContext.IO_SORT_MB, mb);
   }
 
   /**
@@ -78,7 +80,7 @@
    * @return the Map/Reduce temp directory
    */
   public String getMapredTempDir() {
-    return conf.get("mapred.temp.dir");
+    return conf.get(MRConfig.TEMP_DIR);
   }
 
   //
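
[The edits in this file, like the UpdateIndex.java change above, replace raw configuration-key literals with named constants. A small illustrative sketch, assuming (as this branch suggests) that the constants carry the renamed mapreduce.* keys; the values are examples only:

    // Illustrative only: constant and literal name the same setting, but the
    // constant tracks key renames across releases. Assumes imports of
    // org.apache.hadoop.conf.Configuration, org.apache.hadoop.mapreduce.JobContext,
    // and org.apache.hadoop.mapreduce.MRConfig.
    Configuration conf = new Configuration();
    conf.setInt(JobContext.IO_SORT_MB, 128);           // was conf.setInt("io.sort.mb", 128)
    int mb = conf.getInt(JobContext.IO_SORT_MB, 100);  // same default as getIOSortMB()
    String tmp = conf.get(MRConfig.TEMP_DIR);          // was conf.get("mapred.temp.dir")
]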

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdater.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdater.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdater.java Sat Nov 28 20:26:01 2009
@@ -64,9 +64,9 @@
     IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
     Shard.setIndexShards(iconf, shards);
 
-    // MapTask.MapOutputBuffer uses "io.sort.mb" to decide its max buffer size
-    // (max buffer size = 1/2 * "io.sort.mb").
-    // Here we half-en "io.sort.mb" because we use the other half memory to
+    // MapTask.MapOutputBuffer uses JobContext.IO_SORT_MB to decide its max buffer size
+    // (max buffer size = 1/2 * JobContext.IO_SORT_MB).
+    // Here we halve JobContext.IO_SORT_MB because we use the other half of the memory to
     // build an intermediate form/index in Combiner.
     iconf.setIOSortMB(iconf.getIOSortMB() / 2);
 
@@ -93,10 +93,10 @@
       buffer.append(inputs[i].toString());
     }
     LOG.info("mapred.input.dir = " + buffer.toString());
-    LOG.info("mapred.output.dir = " + 
+    LOG.info("mapreduce.output.fileoutputformat.outputdir = " + 
              FileOutputFormat.getOutputPath(jobConf).toString());
-    LOG.info("mapred.map.tasks = " + jobConf.getNumMapTasks());
-    LOG.info("mapred.reduce.tasks = " + jobConf.getNumReduceTasks());
+    LOG.info("mapreduce.job.maps = " + jobConf.getNumMapTasks());
+    LOG.info("mapreduce.job.reduces = " + jobConf.getNumReduceTasks());
     LOG.info(shards.length + " shards = " + iconf.getIndexShards());
     // better if we don't create the input format instance
     LOG.info("mapred.input.format.class = "
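
[For concreteness, a hedged walk-through of the halving above (the numbers are examples, not values from this patch): MapOutputBuffer sizes its buffer from JobContext.IO_SORT_MB, so halving the configured value shrinks the sort side and leaves the freed memory for the combiner's in-memory index.

    // With io.sort.mb configured at, say, 100 MB:
    int configured = iconf.getIOSortMB();  // 100
    iconf.setIOSortMB(configured / 2);     // the sort machinery now sees 50
    // The ~50 MB the sort buffer no longer claims is available for the
    // intermediate form/index the Combiner builds in memory.
]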

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/mrunit:713112
 /hadoop/core/trunk/src/contrib/mrunit:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/mrunit:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/mrunit:804974-884916

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/ivy.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/ivy.xml Sat Nov 28 20:26:01 2009
@@ -40,21 +40,11 @@
     <artifact conf="master"/>
   </publications>
   <dependencies>
-    <dependency org="commons-logging"
-      name="commons-logging"
-      rev="${commons-logging.version}"
-      conf="common->master"/>
-    <dependency org="log4j"
-      name="log4j"
-      rev="${log4j.version}"
-      conf="common->master"/>
-   <dependency org="junit"
-      name="junit"
-      rev="${junit.version}"
-      conf="common->master"/>
-   <dependency org="org.apache.hadoop"
-      name="avro"
-      rev="1.0.0"
-      conf="common->default"/>
+    <dependency org="org.apache.hadoop" name="hadoop-core" rev="${hadoop-core.version}" conf="common->default"/>
+    <dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}" conf="common->default"/>
+    <dependency org="log4j" name="log4j" rev="${log4j.version}" conf="common->master"/>
+    <dependency org="junit" name="junit" rev="${junit.version}" conf="common->master"/>
+    <dependency org="org.apache.hadoop" name="avro" rev="${avro.version}" conf="common->default"/>
   </dependencies>
+
 </ivy-module>

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriver.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriver.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapDriver.java Sat Nov 28 20:26:01 2009
@@ -24,6 +24,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mrunit.mock.MockOutputCollector;
 import org.apache.hadoop.mrunit.mock.MockReporter;
@@ -43,12 +44,33 @@
   public static final Log LOG = LogFactory.getLog(MapDriver.class);
 
   private Mapper<K1, V1, K2, V2> myMapper;
+  private Counters counters;
 
   public MapDriver(final Mapper<K1, V1, K2, V2> m) {
     myMapper = m;
+    counters = new Counters();
   }
 
   public MapDriver() {
+    counters = new Counters();
+  }
+
+  /** @return the counters used in this test */
+  public Counters getCounters() {
+    return counters;
+  }
+
+  /** Sets the counters object to use for this test.
+   * @param ctrs The counters object to use.
+   */
+  public void setCounters(final Counters ctrs) {
+    this.counters = ctrs;
+  }
+
+  /** Sets the counters to use and returns self for fluent style */
+  public MapDriver<K1, V1, K2, V2> withCounters(final Counters ctrs) {
+    setCounters(ctrs);
+    return this;
   }
 
   /**
@@ -165,7 +187,7 @@
   public List<Pair<K2, V2>> run() throws IOException {
     MockOutputCollector<K2, V2> outputCollector =
       new MockOutputCollector<K2, V2>();
-    MockReporter reporter = new MockReporter(MockReporter.ReporterType.Mapper);
+    MockReporter reporter = new MockReporter(MockReporter.ReporterType.Mapper, getCounters());
 
     myMapper.map(inputKey, inputVal, outputCollector, reporter);
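
[A hedged usage sketch for the counters plumbing added across these mrunit drivers; WordCountMapper and its RECORDS counter are illustrative names, not part of this patch:

    // Share one Counters instance with the driver, run the mapper, then
    // assert on counters it incremented through its Reporter. Assumes imports
    // of org.apache.hadoop.io.*, org.apache.hadoop.mapred.Counters, and
    // org.apache.hadoop.mrunit.MapDriver, inside a JUnit TestCase method.
    Counters counters = new Counters();
    new MapDriver<LongWritable, Text, Text, IntWritable>(new WordCountMapper())
        .withInput(new LongWritable(0), new Text("one line of input"))
        .withCounters(counters)
        .run();
    assertEquals(1, counters.getCounter(WordCountMapper.Counter.RECORDS));
]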
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java Sat Nov 28 20:26:01 2009
@@ -29,6 +29,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mrunit.types.Pair;
@@ -55,11 +56,13 @@
   private Mapper<K1, V1, K2, V2> myMapper;
   private Reducer<K2, V2, K3, V3> myReducer;
   private Reducer<K2, V2, K2, V2> myCombiner;
+  private Counters counters;
 
   public MapReduceDriver(final Mapper<K1, V1, K2, V2> m,
                          final Reducer<K2, V2, K3, V3> r) {
     myMapper = m;
     myReducer = r;
+    counters = new Counters();
   }
 
   public MapReduceDriver(final Mapper<K1, V1, K2, V2> m,
@@ -68,9 +71,29 @@
     myMapper = m;
     myReducer = r;
     myCombiner = c;
+    counters = new Counters();
   }
 
   public MapReduceDriver() {
+    counters = new Counters();
+  }
+
+  /** @return the counters used in this test */
+  public Counters getCounters() {
+    return counters;
+  }
+
+  /** Sets the counters object to use for this test.
+   * @param ctrs The counters object to use.
+   */
+  public void setCounters(final Counters ctrs) {
+    this.counters = ctrs;
+  }
+
+  /** Sets the counters to use and returns self for fluent style */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withCounters(final Counters ctrs) {
+    setCounters(ctrs);
+    return this;
   }
 
   /** Set the Mapper instance to use with this test driver
@@ -227,7 +250,10 @@
             + sb.toString() + ")");
 
         reduceOutputs.addAll(new ReduceDriver<K2, V2, OUTKEY, OUTVAL>(reducer)
-                .withInputKey(inputKey).withInputValues(inputValues).run());
+                .withCounters(getCounters())
+                .withInputKey(inputKey)
+                .withInputValues(inputValues)
+                .run());
       }
 
       return reduceOutputs;
@@ -243,7 +269,7 @@
       LOG.debug("Mapping input " + input.toString() + ")");
 
       mapOutputs.addAll(new MapDriver<K1, V1, K2, V2>(myMapper).withInput(
-              input).run());
+              input).withCounters(getCounters()).run());
     }
 
     if (myCombiner != null) {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java Sat Nov 28 20:26:01 2009
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mrunit.types.Pair;
@@ -56,15 +57,18 @@
 
   private List<Pair<Mapper, Reducer>> mapReducePipeline;
   private List<Pair<K1, V1>> inputList;
+  private Counters counters;
 
   public PipelineMapReduceDriver(final List<Pair<Mapper, Reducer>> pipeline) {
     this.mapReducePipeline = copyMapReduceList(pipeline);
     this.inputList = new ArrayList<Pair<K1, V1>>();
+    this.counters = new Counters();
   }
 
   public PipelineMapReduceDriver() {
     this.mapReducePipeline = new ArrayList<Pair<Mapper, Reducer>>();
     this.inputList = new ArrayList<Pair<K1, V1>>();
+    this.counters = new Counters();
   }
 
   private List<Pair<Mapper, Reducer>> copyMapReduceList(List<Pair<Mapper, Reducer>> lst) {
@@ -77,6 +81,25 @@
     return outList;
   }
 
+  /** @return the counters used in this test */
+  public Counters getCounters() {
+    return counters;
+  }
+
+  /** Sets the counters object to use for this test.
+   * @param ctrs The counters object to use.
+   */
+  public void setCounters(final Counters ctrs) {
+    this.counters = ctrs;
+  }
+
+  /** Sets the counters to use and returns self for fluent style */
+  public PipelineMapReduceDriver<K1, V1, K2, V2> withCounters(final Counters ctrs) {
+    setCounters(ctrs);
+    return this;
+  }
+
+
   /** Add a Mapper and Reducer instance to the pipeline to use with this test driver
    * @param m The Mapper instance to add to the pipeline
    * @param r The Reducer instance to add to the pipeline
@@ -282,6 +305,8 @@
       // Create a MapReduceDriver to run this phase of the pipeline.
       MapReduceDriver mrDriver = new MapReduceDriver(job.getFirst(), job.getSecond());
 
+      mrDriver.setCounters(getCounters());
+
       // Add the inputs from the user, or from the previous stage of the pipeline.
       for (Object input : inputs) {
         mrDriver.addInput((Pair) input);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriver.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriver.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/ReduceDriver.java Sat Nov 28 20:26:01 2009
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mrunit.mock.MockOutputCollector;
 import org.apache.hadoop.mrunit.mock.MockReporter;
@@ -45,14 +46,36 @@
   public static final Log LOG = LogFactory.getLog(ReduceDriver.class);
 
   private Reducer<K1, V1, K2, V2> myReducer;
+  private Counters counters;
 
   public ReduceDriver(final Reducer<K1, V1, K2, V2> r) {
     myReducer = r;
+    counters = new Counters();
   }
 
   public ReduceDriver() {
+    counters = new Counters();
   }
 
+  /** @return the counters used in this test */
+  public Counters getCounters() {
+    return counters;
+  }
+
+  /** Sets the counters object to use for this test.
+   * @param ctrs The counters object to use.
+   */
+  public void setCounters(final Counters ctrs) {
+    this.counters = ctrs;
+  }
+
+  /** Sets the counters to use and returns self for fluent style */
+  public ReduceDriver<K1, V1, K2, V2> withCounters(final Counters ctrs) {
+    setCounters(ctrs);
+    return this;
+  }
+
+
   /**
    * Sets the reducer object to use for this test
    *
@@ -172,7 +195,7 @@
   public List<Pair<K2, V2>> run() throws IOException {
     MockOutputCollector<K2, V2> outputCollector =
       new MockOutputCollector<K2, V2>();
-    MockReporter reporter = new MockReporter(MockReporter.ReporterType.Reducer);
+    MockReporter reporter = new MockReporter(MockReporter.ReporterType.Reducer, getCounters());
 
     myReducer.reduce(inputKey, inputValues.iterator(), outputCollector,
             reporter);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java Sat Nov 28 20:26:01 2009
@@ -25,10 +25,11 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.mrunit.MapDriverBase;
-import org.apache.hadoop.mrunit.mapreduce.mock.MockMapContextWrapper;
+import org.apache.hadoop.mrunit.mapreduce.mock.MockMapContext;
 import org.apache.hadoop.mrunit.types.Pair;
 
 /**
@@ -45,12 +46,15 @@
   public static final Log LOG = LogFactory.getLog(MapDriver.class);
 
   private Mapper<K1, V1, K2, V2> myMapper;
+  private Counters counters;
 
   public MapDriver(final Mapper<K1, V1, K2, V2> m) {
     myMapper = m;
+    counters = new Counters();
   }
 
   public MapDriver() {
+    counters = new Counters();
   }
 
 
@@ -76,6 +80,24 @@
     return myMapper;
   }
 
+  /** @return the counters used in this test */
+  public Counters getCounters() {
+    return counters;
+  }
+
+  /** Sets the counters object to use for this test.
+   * @param ctrs The counters object to use.
+   */
+  public void setCounters(final Counters ctrs) {
+    this.counters = ctrs;
+  }
+
+  /** Sets the counters to use and returns self for fluent style */
+  public MapDriver<K1, V1, K2, V2> withCounters(final Counters ctrs) {
+    setCounters(ctrs);
+    return this;
+  }
+
   /**
    * Identical to setInputKey() but with fluent programming style
    *
@@ -170,11 +192,10 @@
     inputs.add(new Pair<K1, V1>(inputKey, inputVal));
 
     try {
-      MockMapContextWrapper<K1, V1, K2, V2> wrapper = new MockMapContextWrapper();
-      MockMapContextWrapper<K1, V1, K2, V2>.MockMapContext context =
-          wrapper.getMockContext(inputs);
+      MockMapContext<K1, V1, K2, V2> context =
+        new MockMapContext<K1, V1, K2, V2>(inputs, getCounters());
 
-      myMapper.run(context);
+      myMapper.run(new WrappedMapper<K1, V1, K2, V2>().getMapContext(context));
       return context.getOutputs();
     } catch (InterruptedException ie) {
       throw new IOException(ie);
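
[The run() rewrite above follows the new-API adapter pattern: MockMapContext (from this patch) stands in for a MapContext, and the framework's WrappedMapper turns any MapContext into the Mapper.Context that Mapper.run() expects. Reduced to its core, assuming MockMapContext implements MapContext:

    MapContext<K1, V1, K2, V2> raw =
        new MockMapContext<K1, V1, K2, V2>(inputs, getCounters());
    Mapper<K1, V1, K2, V2>.Context ctx =
        new WrappedMapper<K1, V1, K2, V2>().getMapContext(raw);
    myMapper.run(ctx);

The ReduceDriver change later in this commit mirrors this with WrappedReducer.]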

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java Sat Nov 28 20:26:01 2009
@@ -29,6 +29,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mrunit.MapReduceDriverBase;
@@ -51,14 +52,17 @@
 
   private Mapper<K1, V1, K2, V2> myMapper;
   private Reducer<K2, V2, K3, V3> myReducer;
+  private Counters counters;
 
   public MapReduceDriver(final Mapper<K1, V1, K2, V2> m,
                          final Reducer<K2, V2, K3, V3> r) {
     myMapper = m;
     myReducer = r;
+    counters = new Counters();
   }
 
   public MapReduceDriver() {
+    counters = new Counters();
   }
 
   /** Set the Mapper instance to use with this test driver
@@ -107,6 +111,24 @@
     return myReducer;
   }
 
+  /** @return the counters used in this test */
+  public Counters getCounters() {
+    return counters;
+  }
+
+  /** Sets the counters object to use for this test.
+   * @param ctrs The counters object to use.
+   */
+  public void setCounters(final Counters ctrs) {
+    this.counters = ctrs;
+  }
+
+  /** Sets the counters to use and returns self for fluent style */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withCounters(final Counters ctrs) {
+    setCounters(ctrs);
+    return this;
+  }
+
   /**
    * Identical to addInput() but returns self for fluent programming style
    * @param key
@@ -180,7 +202,7 @@
       LOG.debug("Mapping input " + input.toString() + ")");
 
       mapOutputs.addAll(new MapDriver<K1, V1, K2, V2>(myMapper).withInput(
-              input).run());
+              input).withCounters(getCounters()).run());
     }
 
     List<Pair<K2, List<V2>>> reduceInputs = shuffle(mapOutputs);
@@ -195,6 +217,7 @@
           + sb.toString() + ")");
 
       reduceOutputs.addAll(new ReduceDriver<K2, V2, K3, V3>(myReducer)
+              .withCounters(getCounters())
               .withInputKey(inputKey).withInputValues(inputValues).run());
     }
 

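Because run() in this class forwards getCounters() into both the per-phase MapDriver and ReduceDriver (the withCounters(getCounters()) calls above), a single Counters object accumulates increments from the map and reduce sides of the simulated job. A sketch of a test body, reusing the hypothetical RecordCountMapper from the MapDriver sketch and assuming an analogous hypothetical RecordCountReducer that bumps ("test", "groups") once per key group:

  Counters counters = new Counters();
  new MapReduceDriver<Text, Text, Text, Text, Text, Text>(
          new RecordCountMapper(), new RecordCountReducer())
      .withCounters(counters)
      .withInput(new Text("a"), new Text("1"))
      .withInput(new Text("b"), new Text("2"))
      .run();
  // Map-side and reduce-side increments both land in the same object.
  long records = counters.findCounter("test", "records").getValue(); // 2
  long groups  = counters.findCounter("test", "groups").getValue();  // 2
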
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java Sat Nov 28 20:26:01 2009
@@ -25,10 +25,11 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mrunit.ReduceDriverBase;
-import org.apache.hadoop.mrunit.mapreduce.mock.MockReduceContextWrapper;
+import org.apache.hadoop.mrunit.mapreduce.mock.MockReduceContext;
 import org.apache.hadoop.mrunit.types.Pair;
 
 /**
@@ -46,12 +47,15 @@
   public static final Log LOG = LogFactory.getLog(ReduceDriver.class);
 
   private Reducer<K1, V1, K2, V2> myReducer;
+  private Counters counters;
 
   public ReduceDriver(final Reducer<K1, V1, K2, V2> r) {
     myReducer = r;
+    counters = new Counters();
   }
 
   public ReduceDriver() {
+    counters = new Counters();
   }
 
   /**
@@ -80,6 +84,24 @@
     return myReducer;
   }
 
+  /** @return the counters used in this test */
+  public Counters getCounters() {
+    return counters;
+  }
+
+  /** Sets the counters object to use for this test.
+   * @param ctrs The counters object to use.
+   */
+  public void setCounters(final Counters ctrs) {
+    this.counters = ctrs;
+  }
+
+  /** Sets the counters to use and returns self for fluent programming style. */
+  public ReduceDriver<K1, V1, K2, V2> withCounters(final Counters ctrs) {
+    setCounters(ctrs);
+    return this;
+  }
+
   /**
    * Identical to setInputKey() but with fluent programming style
    *
@@ -175,11 +197,9 @@
     inputs.add(new Pair<K1, List<V1>>(inputKey, inputValues));
 
     try {
-      MockReduceContextWrapper<K1, V1, K2, V2> wrapper = new MockReduceContextWrapper();
-      MockReduceContextWrapper<K1, V1, K2, V2>.MockReduceContext context =
-          wrapper.getMockContext(inputs);
-
-      myReducer.run(context);
+      MockReduceContext<K1, V1, K2, V2> context = 
+        new MockReduceContext<K1, V1, K2, V2>(inputs, getCounters());
+      myReducer.run(new WrappedReducer<K1, V1, K2, V2>().getReducerContext(context));
       return context.getOutputs();
     } catch (InterruptedException ie) {
       throw new IOException(ie);

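The reduce side works the same way in isolation: whatever a reducer increments through its context lands in the Counters passed to withCounters(), or in the default instance reachable through getCounters(). A short sketch (class declaration and test body shown together), where GroupCountReducer is a hypothetical reducer that counts the key groups it sees:

  // Hypothetical reducer: bumps a counter once per key group.
  static class GroupCountReducer
      extends Reducer<Text, LongWritable, Text, LongWritable> {
    protected void reduce(Text key, Iterable<LongWritable> values, Context c)
        throws IOException, InterruptedException {
      c.getCounter("test", "groups").increment(1);
    }
  }

  Counters counters = new Counters();
  new ReduceDriver<Text, LongWritable, Text, LongWritable>(
          new GroupCountReducer())
      .withCounters(counters)
      .withInputKey(new Text("foo"))
      .withInputValue(new LongWritable(1))
      .withInputValue(new LongWritable(2))
      .run();
  long groups = counters.findCounter("test", "groups").getValue(); // 1
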
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java Sat Nov 28 20:26:01 2009
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mrunit.mock;
 
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.Counters.Counter;
@@ -24,6 +25,7 @@
 public class MockReporter implements Reporter {
 
   private MockInputSplit inputSplit = new MockInputSplit();
+  private Counters counters;
 
   public enum ReporterType {
     Mapper,
@@ -32,8 +34,9 @@
 
   private ReporterType typ;
 
-  public MockReporter(final ReporterType kind) {
+  public MockReporter(final ReporterType kind, final Counters ctrs) {
     this.typ = kind;
+    this.counters = ctrs;
   }
 
   @Override
@@ -48,12 +51,16 @@
 
   @Override
   public void incrCounter(Enum key, long amount) {
-    // do nothing.
+    if (null != counters) {
+      counters.incrCounter(key, amount);
+    }
   }
 
   @Override
   public void incrCounter(String group, String counter, long amount) {
-    // do nothing.
+    if (null != counters) {
+      counters.incrCounter(group, counter, amount);
+    }
   }
 
   @Override
@@ -67,15 +74,23 @@
   }
 
   @Override
-  public Counter getCounter(String s1, String s2) {
-    // do nothing
-    return null;
+  public Counter getCounter(String group, String name) {
+    Counters.Counter counter = null;
+    if (counters != null) {
+      counter = counters.findCounter(group, name);
+    }
+
+    return counter;
   }
 
   @Override
   public Counter getCounter(Enum key) {
-    // do nothing
-    return null;
+    Counters.Counter counter = null;
+    if (counters != null) {
+      counter = counters.findCounter(key);
+    }
+
+    return counter;
   }
 }
 

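With this change the old-API MockReporter aggregates into a caller-supplied org.apache.hadoop.mapred.Counters, while passing null preserves the previous no-op behavior (hence the nulls in the updated TestMockReporter below). A minimal sketch of the delegation, using only the constructor and methods shown in this patch:

  Counters counters = new Counters();
  Reporter reporter =
      new MockReporter(MockReporter.ReporterType.Mapper, counters);
  reporter.incrCounter("app", "records", 2);
  reporter.getCounter("app", "records").increment(1);
  // All three increments hit the same underlying counter.
  long total = counters.findCounter("app", "records").getCounter(); // 3
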
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/AllTests.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/AllTests.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/AllTests.java Sat Nov 28 20:26:01 2009
@@ -42,6 +42,7 @@
     suite.addTestSuite(TestReduceDriver.class);
     suite.addTestSuite(TestTestDriver.class);
     suite.addTestSuite(TestExample.class);
+    suite.addTestSuite(TestCounters.class);
 
     suite.addTest(org.apache.hadoop.mrunit.types.AllTests.suite());
     suite.addTest(org.apache.hadoop.mrunit.mapreduce.AllTests.suite());

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestReduceDriver.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestReduceDriver.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestReduceDriver.java Sat Nov 28 20:26:01 2009
@@ -22,13 +22,17 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.LongSumReducer;
 import org.apache.hadoop.mrunit.types.Pair;
 import org.junit.Before;
@@ -222,5 +226,45 @@
       // expected.
     }
   }
+
+  /**
+   * Reducer that iterates over its values twice; per MapReduce
+   * semantics, the second pass should yield no values.
+   */
+  private static class DoubleIterReducer<K, V>
+      extends MapReduceBase implements Reducer<K, V, K, LongWritable> {
+    public void reduce(K key, Iterator<V> values,
+        OutputCollector<K, LongWritable> out, Reporter r) throws IOException {
+      long count = 0;
+
+      while (values.hasNext()) {
+        count++;
+        values.next();
+      }
+
+      // This time around, iteration should yield no values.
+      while (values.hasNext()) {
+        count++;
+        values.next();
+      }
+      out.collect(key, new LongWritable(count));
+    }
+  }
+
+  @Test
+  public void testDoubleIteration() {
+    reducer = new DoubleIterReducer<Text, LongWritable>();
+    driver = new ReduceDriver<Text, LongWritable, Text, LongWritable>(
+        reducer);
+
+    driver
+        .withInputKey(new Text("foo"))
+        .withInputValue(new LongWritable(1))
+        .withInputValue(new LongWritable(1))
+        .withInputValue(new LongWritable(1))
+        .withInputValue(new LongWritable(1))
+        .withOutput(new Text("foo"), new LongWritable(4))
+        .runTest();
+  }
 }
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/AllTests.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/AllTests.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/AllTests.java Sat Nov 28 20:26:01 2009
@@ -34,6 +34,7 @@
     suite.addTestSuite(TestMapDriver.class);
     suite.addTestSuite(TestReduceDriver.class);
     suite.addTestSuite(TestMapReduceDriver.class);
+    suite.addTestSuite(TestCounters.class);
 
     return suite;
   }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java Sat Nov 28 20:26:01 2009
@@ -223,5 +223,43 @@
       // expected.
     }
   }
+
+  /**
+   * Reducer that iterates over its values twice; per MapReduce
+   * semantics, the second pass should yield no values.
+   */
+  private static class DoubleIterReducer<K, V>
+      extends Reducer<K, V, K, LongWritable> {
+    public void reduce(K key, Iterable<V> values, Context c)
+        throws IOException, InterruptedException {
+      long count = 0;
+
+      for (V val : values) {
+        count++;
+      }
+
+      // This time around, iteration should yield no values.
+      for (V val : values) {
+        count++;
+      }
+      c.write(key, new LongWritable(count));
+    }
+  }
+
+  @Test
+  public void testDoubleIteration() {
+    reducer = new DoubleIterReducer<Text, LongWritable>();
+    driver = new ReduceDriver<Text, LongWritable, Text, LongWritable>(
+        reducer);
+
+    driver
+        .withInputKey(new Text("foo"))
+        .withInputValue(new LongWritable(1))
+        .withInputValue(new LongWritable(1))
+        .withInputValue(new LongWritable(1))
+        .withInputValue(new LongWritable(1))
+        .withOutput(new Text("foo"), new LongWritable(4))
+        .runTest();
+  }
 }
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mock/TestMockReporter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mock/TestMockReporter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mock/TestMockReporter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/mock/TestMockReporter.java Sat Nov 28 20:26:01 2009
@@ -27,7 +27,7 @@
 
   @Test
   public void testGetInputSplitForMapper() {
-    InputSplit split = new MockReporter(MockReporter.ReporterType.Mapper).getInputSplit();
+    InputSplit split = new MockReporter(MockReporter.ReporterType.Mapper, null).getInputSplit();
     assertTrue(null != split);
   }
 
@@ -36,7 +36,7 @@
   @Test
   public void testGetInputSplitForReducer() {
     try {
-      new MockReporter(MockReporter.ReporterType.Reducer).getInputSplit();
+      new MockReporter(MockReporter.ReporterType.Reducer, null).getInputSplit();
       fail(); // shouldn't get here
     } catch (UnsupportedOperationException uoe) {
       // expected this.

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/src/contrib/sqoop:713112
 /hadoop/core/trunk/src/contrib/sqoop:784975-786373
-/hadoop/mapreduce/trunk/src/contrib/sqoop:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/sqoop:804974-884916

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/build.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/build.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/build.xml Sat Nov 28 20:26:01 2009
@@ -26,6 +26,38 @@
   <import file="../build-contrib.xml"/>
   <property environment="env"/>
   <property name="sqoop.thirdparty.lib.dir" value="" />
+  <property name="mrunit.class.dir" value="${build.dir}/../mrunit/classes" />
+
+  <!-- ================================================================== -->
+  <!-- Compile test code                                                  -->
+  <!-- Override with our own version so we can enforce build dependencies -->
+  <!-- on compile-mapred-test for MiniMRCluster, and MRUnit.              -->
+  <!-- ================================================================== -->
+  <target name="compile-test" depends="compile-examples" if="test.available">
+    <echo message="Compiling ${name} dependencies" />
+    <!-- need top-level compile-mapred-test for MiniMRCluster -->
+    <subant target="compile-mapred-test">
+      <fileset dir="../../.." includes="build.xml" />
+    </subant>
+
+    <!-- Need MRUnit compiled for some tests -->
+    <subant target="compile">
+      <fileset dir="../mrunit" includes="build.xml" />
+    </subant>
+
+    <echo message="contrib: ${name}"/>
+    <javac
+     encoding="${build.encoding}"
+     srcdir="${src.test}"
+     includes="**/*.java"
+     destdir="${build.test}"
+     debug="${javac.debug}">
+      <classpath>
+        <path refid="test.classpath"/>
+        <pathelement path="${mrunit.class.dir}" />
+      </classpath>
+    </javac>
+  </target>
 
   <!-- ================================================================== -->
   <!-- Run unit tests                                                     -->
@@ -54,6 +86,11 @@
       <sysproperty key="build.test" value="${build.test}"/>
       <sysproperty key="contrib.name" value="${name}"/>
 
+
+      <!-- Define this property to force Sqoop to throw better exceptions on errors
+           during testing, instead of printing a short message and exiting with status 1. -->
+      <sysproperty key="sqoop.throwOnError" value="" />
+
       <!--
            Added property needed to use the .class files for compilation
            instead of depending on hadoop-*-core.jar
@@ -92,13 +129,16 @@
       -->
       <sysproperty key="hive.home" value="${basedir}/testdata/hive" />
 
-      <!-- tools.jar from Sun JDK also required to invoke javac. -->
       <classpath>
         <path refid="test.classpath"/>
         <path refid="contrib-classpath"/>
+        <!-- tools.jar from Sun JDK also required to invoke javac. -->
         <pathelement path="${env.JAVA_HOME}/lib/tools.jar" />
+        <!-- need thirdparty JDBC drivers for thirdparty tests -->
         <fileset dir="${sqoop.thirdparty.lib.dir}"
             includes="*.jar" />
+        <!-- need MRUnit for some tests -->
+        <pathelement path="${mrunit.class.dir}" />
       </classpath>
       <formatter type="${test.junit.output.format}" />
       <batchtest todir="${build.test}" unless="testcase">
@@ -109,7 +149,15 @@
         <fileset dir="${src.test}" includes="**/${testcase}.java"/>
       </batchtest>
     </junit>
-    <fail if="tests.failed">Tests failed!</fail>
+    <antcall target="checkfailure"/>
+  </target>
+
+  <target name="doc">
+    <exec executable="make" failonerror="true">
+      <arg value="-C" />
+      <arg value="${basedir}/doc" />
+      <arg value="BUILDROOT=${build.dir}" />
+    </exec>
   </target>
 
 </project>

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/doc/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sat Nov 28 20:26:01 2009
@@ -0,0 +1,3 @@
+Sqoop-manpage.xml
+sqoop.1
+Sqoop-web.html

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/ivy.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/ivy.xml Sat Nov 28 20:26:01 2009
@@ -40,33 +40,23 @@
     <artifact conf="master"/>
   </publications>
   <dependencies>
-    <dependency org="commons-logging"
-      name="commons-logging"
-      rev="${commons-logging.version}"
-      conf="common->default"/>
-    <dependency org="commons-httpclient"
-      name="commons-httpclient"
-      rev="${commons-httpclient.version}"
-      conf="common->default"/>
-    <dependency org="commons-cli"
-      name="commons-cli"
-      rev="${commons-cli.version}"
-      conf="common->default"/>
-    <dependency org="junit"
-      name="junit"
-      rev="${junit.version}"
-      conf="common->default"/>
-    <dependency org="log4j"
-      name="log4j"
-      rev="${log4j.version}"
-      conf="common->master"/>
-    <dependency org="hsqldb"
-      name="hsqldb"
-      rev="${hsqldb.version}"
-      conf="common->default"/>
-    <dependency org="org.apache.hadoop"
-      name="avro"
-      rev="1.0.0"
-      conf="common->default"/>
-    </dependencies>
+     <dependency org="org.slf4j" name="slf4j-api" rev="${slf4j-api.version}" conf="common->master"/>
+     <dependency org="org.slf4j" name="slf4j-log4j12" rev="${slf4j-log4j12.version}" conf="common->master"/>
+     <dependency org="org.apache.hadoop" name="hadoop-core" rev="${hadoop-core.version}" conf="common->default"/>
+     <dependency org="org.apache.hadoop" name="hadoop-core-test" rev="${hadoop-core.version}" conf="common->default"/>
+     <dependency org="org.apache.hadoop" name="hadoop-hdfs" rev="${hadoop-hdfs.version}" conf="common->default"/>
+     <dependency org="org.apache.hadoop" name="hadoop-hdfs-test" rev="${hadoop-hdfs.version}" conf="common->default"/>
+     <dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}" conf="common->default"/>
+     <dependency org="log4j" name="log4j" rev="${log4j.version}" conf="common->master"/>
+     <dependency org="org.mortbay.jetty" name="servlet-api-2.5" rev="${servlet-api-2.5.version}" conf="common->default"/>
+     <dependency org="junit" name="junit" rev="${junit.version}" conf="common->default"/>
+     <dependency org="commons-httpclient" name="commons-httpclient" rev="${commons-httpclient.version}" conf="common->default"/>
+     <dependency org="commons-cli" name="commons-cli" rev="${commons-cli.version}" conf="common->default"/>
+     <dependency org="hsqldb" name="hsqldb" rev="${hsqldb.version}" conf="common->default"/>
+     <dependency org="org.apache.hadoop" name="avro" rev="${avro.version}" conf="common->default"/>
+     <dependency org="javax.servlet" name="servlet-api" rev="${servlet-api.version}" conf="common->master"/>
+     <dependency org="org.mortbay.jetty" name="jetty" rev="${jetty.version}" conf="common->master"/>
+     <dependency org="commons-io" name="commons-io" rev="${commons-io.version}" conf="common->default"/>
+     <dependency org="org.mortbay.jetty" name="jetty-util" rev="${jetty-util.version}" conf="common->master"/>
+  </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java Sat Nov 28 20:26:01 2009
@@ -18,72 +18,87 @@
 
 package org.apache.hadoop.sqoop;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.sqoop.manager.ConnManager;
-import org.apache.hadoop.sqoop.manager.GenericJdbcManager;
-import org.apache.hadoop.sqoop.manager.HsqldbManager;
-import org.apache.hadoop.sqoop.manager.LocalMySQLManager;
-import org.apache.hadoop.sqoop.manager.MySQLManager;
-import org.apache.hadoop.sqoop.manager.OracleManager;
+import org.apache.hadoop.sqoop.manager.DefaultManagerFactory;
+import org.apache.hadoop.sqoop.manager.ManagerFactory;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * Static factory class to create the ConnManager type required
+ * Factory class to create the ConnManager type required
  * for the current import job.
+ *
+ * This class delegates instantiation of ConnManagers to one or more
+ * ManagerFactory instances, which are consulted in the order listed in
+ * sqoop-site.xml under sqoop.connection.factories.
  */
-public final class ConnFactory {
+public class ConnFactory {
 
   public static final Log LOG = LogFactory.getLog(ConnFactory.class.getName());
 
-  private ConnFactory() { }
+  public ConnFactory(Configuration conf) {
+    factories = new LinkedList<ManagerFactory>();
+    instantiateFactories(conf);
+  }
 
-  /**
-   * Factory method to get a ConnManager for the given JDBC connect string
-   * @param opts The parsed command-line options
-   * @return a ConnManager instance for the appropriate database
-   * @throws IOException if it cannot find a ConnManager for this schema
+  /** The sqoop-site.xml configuration property used to set the list of 
+   * available ManagerFactories.
    */
-  public static ConnManager getManager(ImportOptions opts) throws IOException {
+  public final static String FACTORY_CLASS_NAMES_KEY = "sqoop.connection.factories";
 
-    String manualDriver = opts.getDriverClassName();
-    if (manualDriver != null) {
-      // User has manually specified JDBC implementation with --driver.
-      // Just use GenericJdbcManager.
-      return new GenericJdbcManager(manualDriver, opts);
-    }
-
-    String connectStr = opts.getConnectString();
+  // The default value for sqoop.connection.factories is the name of the DefaultManagerFactory.
+  final static String DEFAULT_FACTORY_CLASS_NAMES = DefaultManagerFactory.class.getName(); 
 
-    int schemeStopIdx = connectStr.indexOf("//");
-    if (-1 == schemeStopIdx) {
-      // no scheme component?
-      throw new IOException("Malformed connect string: " + connectStr);
-    }
-
-    String scheme = connectStr.substring(0, schemeStopIdx);
+  /** The list of ManagerFactory instances consulted by getManager().
+   */
+  private List<ManagerFactory> factories;
 
-    if (null == scheme) {
-      // We don't know if this is a mysql://, hsql://, etc.
-      // Can't do anything with this.
-      throw new IOException("Null scheme associated with connect string.");
+  /**
+   * Create the ManagerFactory instances that should populate
+   * the factories list.
+   */
+  private void instantiateFactories(Configuration conf) {
+    String [] classNameArray =
+        conf.getStrings(FACTORY_CLASS_NAMES_KEY, DEFAULT_FACTORY_CLASS_NAMES);
+
+    for (String className : classNameArray) {
+      try {
+        className = className.trim(); // Ignore leading/trailing whitespace.
+        ManagerFactory factory = ReflectionUtils.newInstance(
+            (Class<ManagerFactory>) conf.getClassByName(className), conf);
+        LOG.debug("Loaded manager factory: " + className);
+        factories.add(factory);
+      } catch (ClassNotFoundException cnfe) {
+        LOG.error("Could not load ManagerFactory " + className + " (not found)");
+      }
     }
+  }
 
-    if (scheme.equals("jdbc:mysql:")) {
-      if (opts.isDirect()) {
-        return new LocalMySQLManager(opts);
-      } else {
-        return new MySQLManager(opts);
+  /**
+   * Factory method to get a ConnManager for the given JDBC connect string.
+   * @param opts The parsed command-line options
+   * @return a ConnManager instance for the appropriate database
+   * @throws IOException if it cannot find a ConnManager for this schema
+   */
+  public ConnManager getManager(ImportOptions opts) throws IOException {
+    // Try all the available manager factories.
+    for (ManagerFactory factory : factories) {
+      LOG.debug("Trying ManagerFactory: " + factory.getClass().getName());
+      ConnManager mgr = factory.accept(opts);
+      if (null != mgr) {
+        LOG.debug("Instantiated ConnManager.");
+        return mgr;
       }
-    } else if (scheme.equals("jdbc:hsqldb:hsql:")) {
-      return new HsqldbManager(opts);
-    } else if (scheme.startsWith("jdbc:oracle:")) {
-      return new OracleManager(opts);
-    } else {
-      throw new IOException("Unknown connection scheme: " + scheme);
     }
+
+    throw new IOException("No manager for connect string: " + opts.getConnectString());
   }
 }
 
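A deployment that needs to handle a database the bundled factories do not recognize can list its own ManagerFactory ahead of the default under sqoop.connection.factories. A hedged sketch (the scheme, driver class, and factory name are hypothetical; the GenericJdbcManager constructor is the one the removed code above used, and accept(ImportOptions) is assumed to be the single hook that getManager() invokes):

  package com.example.sqoop;

  import org.apache.hadoop.sqoop.ImportOptions;
  import org.apache.hadoop.sqoop.manager.ConnManager;
  import org.apache.hadoop.sqoop.manager.GenericJdbcManager;
  import org.apache.hadoop.sqoop.manager.ManagerFactory;

  public class MyDbManagerFactory extends ManagerFactory {
    public ConnManager accept(ImportOptions options) {
      String connectStr = options.getConnectString();
      if (connectStr != null && connectStr.startsWith("jdbc:mydb:")) {
        // Reuse the generic JDBC manager with our (hypothetical) driver class.
        return new GenericJdbcManager("com.example.MyDbDriver", options);
      }
      return null; // decline, so ConnFactory consults the next factory
    }
  }

It would be registered by setting sqoop.connection.factories in sqoop-site.xml to com.example.sqoop.MyDbManagerFactory,org.apache.hadoop.sqoop.manager.DefaultManagerFactory, keeping the default as a fallback.
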


