hadoop-common-commits mailing list archives

From ste...@apache.org
Subject svn commit: r783055 [2/6] - in /hadoop/core/branches/HADOOP-3628-2: ./ .eclipse.templates/ ivy/ lib/ lib/jsp-2.1/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-schedu...
Date Tue, 09 Jun 2009 16:11:23 GMT
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Jun  9 16:11:19 2009
@@ -39,8 +39,6 @@
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.conf.Configuration;
 
-
-
 public class TestCapacityScheduler extends TestCase {
 
   static final Log LOG =
@@ -611,13 +609,13 @@
   // represents a fake queue configuration info
   static class FakeQueueInfo {
     String queueName;
-    float gc;
+    float capacity;
     boolean supportsPrio;
     int ulMin;
 
-    public FakeQueueInfo(String queueName, float gc, boolean supportsPrio, int ulMin) {
+    public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin) {
       this.queueName = queueName;
-      this.gc = gc;
+      this.capacity = capacity;
       this.supportsPrio = supportsPrio;
       this.ulMin = ulMin;
     }
@@ -647,10 +645,10 @@
     }*/
     
     public float getCapacity(String queue) {
-      if(queueMap.get(queue).gc == -1) {
+      if(queueMap.get(queue).capacity == -1) {
         return super.getCapacity(queue);
       }
-      return queueMap.get(queue).gc;
+      return queueMap.get(queue).capacity;
     }
     
     public int getMinimumUserLimitPercent(String queue) {
@@ -905,13 +903,6 @@
     return queue.toArray(new JobInProgress[0]);
   }
   
-  /*protected void submitJobs(int number, int state, int maps, int reduces)
-    throws IOException {
-    for (int i = 0; i < number; i++) {
-      submitJob(state, maps, reduces);
-    }
-  }*/
-  
   // tests if tasks can be assigned when there are multiple jobs from the same
   // user
   public void testJobFinished() throws Exception {
@@ -1048,7 +1039,7 @@
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    // set the gc % as 10%, so that gc will be zero initially as 
+    // set the capacity % as 10%, so that capacity will be zero initially as
     // the cluster capacity increases slowly.
     queues.add(new FakeQueueInfo("default", 10.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 90.0f, true, 25));
@@ -1082,7 +1073,7 @@
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt5");
     // now job from default should run, as it is furthest away
-    // in terms of runningMaps / gc.
+    // in terms of runningMaps / capacity.
     checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
     verifyCapacity("1", "default");
     verifyCapacity("9", "q2");
@@ -1093,7 +1084,7 @@
     String schedInfo = taskTrackerManager.getQueueManager().
                           getSchedulerInfo(queue).toString();    
     assertTrue(schedInfo.contains("Map tasks\nCapacity: " 
-        + expectedCapacity));
+        + expectedCapacity + " slots"));
   }
   
   // test capacity transfer
@@ -1274,7 +1265,82 @@
     // first in the queue
     checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
   }
-  
+
+  /**
+   * Test to verify that high memory jobs hit user limits faster than any normal
+   * job.
+   * 
+   * @throws IOException
+   */
+  public void testUserLimitsForHighMemoryJobs()
+      throws IOException {
+    taskTrackerManager = new FakeTaskTrackerManager(1, 10, 10);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    String[] qs = { "default" };
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
+    resConf.setFakeQueues(queues);
+    // enabled memory-based scheduling
+    // Normal job in the cluster would be 1GB maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // Submit one normal job to the other queue.
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(6);
+    jConf.setNumReduceTasks(6);
+    jConf.setUser("u1");
+    jConf.setQueueName("default");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+        + "6 map and 6 reduce tasks");
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(2 * 1024);
+    jConf.setNumMapTasks(6);
+    jConf.setNumReduceTasks(6);
+    jConf.setQueueName("default");
+    jConf.setUser("u2");
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // Verify that normal job takes 3 task assignments to hit user limits
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000005_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000005_0 on tt1");
+    // u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
+    // are hit. So u2 should get slots.
+
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+
+    // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
+    // slots. Because of high memory tasks, giving u2 another task would
+    // overflow limits. So, no more tasks should be given to anyone.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+  }
+
   /*
    * Following is the testing strategy for testing scheduling information.
    * - start capacity scheduler with two queues.
@@ -1324,21 +1390,35 @@
     scheduler.assignTasks(tracker("tt1")); // heartbeat
     scheduler.assignTasks(tracker("tt2")); // heartbeat
     int totalMaps = taskTrackerManager.getClusterStatus().getMaxMapTasks();
-    int totalReduces = taskTrackerManager.getClusterStatus().getMaxReduceTasks();
+    int totalReduces =
+        taskTrackerManager.getClusterStatus().getMaxReduceTasks();
     QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
-    String schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo();
-    String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo();
+    String schedulingInfo =
+        queueManager.getJobQueueInfo("default").getSchedulingInfo();
+    String schedulingInfo2 =
+        queueManager.getJobQueueInfo("q2").getSchedulingInfo();
     String[] infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[1] , "Capacity Percentage: 50.0%");
-    assertEquals(infoStrings[6] , "Capacity: " + totalMaps * 50/100);
-    assertEquals(infoStrings[10] , "Capacity: " + totalReduces * 50/100);
-    assertEquals(infoStrings[2] , "User Limit: 25%");
-    assertEquals(infoStrings[3] , "Priority Supported: YES");
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 0");
-    assertEquals(infoStrings[15] , "Number of users who have submitted jobs: 0");
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[0], "Queue configuration");
+    assertEquals(infoStrings[1], "Capacity Percentage: 50.0%");
+    assertEquals(infoStrings[2], "User Limit: 25%");
+    assertEquals(infoStrings[3], "Priority Supported: YES");
+    assertEquals(infoStrings[4], "-------------");
+    assertEquals(infoStrings[5], "Map tasks");
+    assertEquals(infoStrings[6], "Capacity: " + totalMaps * 50 / 100
+        + " slots");
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[9], "-------------");
+    assertEquals(infoStrings[10], "Reduce tasks");
+    assertEquals(infoStrings[11], "Capacity: " + totalReduces * 50 / 100
+        + " slots");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[14], "-------------");
+    assertEquals(infoStrings[15], "Job info");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 0");
+    assertEquals(infoStrings[17], "Number of users who have submitted jobs: 0");
     assertEquals(schedulingInfo, schedulingInfo2);
 
     //Testing with actual job submission.
@@ -1349,10 +1429,13 @@
     infoStrings = schedulingInfo.split("\n");
 
     //waiting job should be equal to number of jobs submitted.
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 5");
+    assertEquals(infoStrings[17], "Number of users who have submitted jobs: 1");
 
     //Initialize the jobs but don't raise events
     controlledInitializationPoller.selectJobsToInitialize();
@@ -1360,12 +1443,14 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
+    assertEquals(infoStrings.length, 18);
     //should be previous value as nothing is scheduled because no events
     //have been raised after initialization.
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 5");
 
     //Raise status change event so that jobs can move to running queue.
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
@@ -1382,10 +1467,14 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 18);
-    assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
-    assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[16] , "Number of Waiting Jobs: 4");
+    assertEquals(infoStrings.length, 20);
+    assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 1");
+    assertEquals(infoStrings[9], "Active users:");
+    assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[14], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[15], "Running tasks: 0");
+    assertEquals(infoStrings[18], "Number of Waiting Jobs: 4");
 
     //assign a reduce task
     Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
@@ -1394,10 +1483,16 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 20);
-    assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
-    assertEquals(infoStrings[13],"Running tasks: 100.0% of Capacity");
-    assertEquals(infoStrings[18] , "Number of Waiting Jobs: 4");
+    assertEquals(infoStrings.length, 22);
+    assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 1");
+    assertEquals(infoStrings[9], "Active users:");
+    assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[15], "Running tasks: 1");
+    assertEquals(infoStrings[16], "Active users:");
+    assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[20], "Number of Waiting Jobs: 4");
 
     //Complete the job and check the running tasks count
     FakeJobInProgress u1j1 = userJobs.get(0);
@@ -1410,10 +1505,12 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 4");
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 4");
 
     //Fail a job which is initialized but not scheduled and check the count.
     FakeJobInProgress u1j2 = userJobs.get(1);
@@ -1427,10 +1524,14 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 3");
+    assertEquals(infoStrings.length, 18);
+    //should be previous value as nothing is scheduled because no events
+    //have been raised after initialization.
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 3");
 
     //Fail a job which is not initialized but is in the waiting queue.
     FakeJobInProgress u1j5 = userJobs.get(4);
@@ -1445,10 +1546,14 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 2");
+    assertEquals(infoStrings.length, 18);
+    //should be previous value as nothing is scheduled because no events
+    //have been raised after initialization.
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 2");
 
     //Raise status change events as none of the intialized jobs would be
     //in running queue as we just failed the second job which was initialized
@@ -1471,10 +1576,12 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 18);
-    assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
-    assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[16] , "Number of Waiting Jobs: 1");
+    assertEquals(infoStrings.length, 20);
+    assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 1");
+    assertEquals(infoStrings[9], "Active users:");
+    assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[18], "Number of Waiting Jobs: 1");
 
     //Fail the executing job
     taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
@@ -1484,11 +1591,10 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 1");
-
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 1");
   }
 
   /**
@@ -1562,8 +1668,10 @@
     scheduler.start();
 
     // The situation : Two jobs in the queue. First job with only maps and no
-    // reduces and is a high memory job. Second job is a normal job with both maps and reduces.
-    // First job cannot run for want of memory for maps. In this case, second job's reduces should run.
+    // reduces and is a high memory job. Second job is a normal job with both
+    // maps and reduces.
+    // First job cannot run for want of memory for maps. In this case, second
+    // job's reduces should run.
     
     LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
         + "2 map tasks");
@@ -1589,9 +1697,17 @@
     
     // first, a map from j1 will run
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // Total 2 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 100.0f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
     // at this point, the scheduler tries to schedule another map from j1. 
     // there isn't enough space. The second job's reduce should be scheduled.
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    // Total 1 reduce slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+        100.0f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
   }
 
   /**
@@ -1625,7 +1741,7 @@
     scheduler.start();
 
     LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
-        + "1 map, 0 reduce tasks.");
+        + "1 map, 1 reduce tasks.");
     JobConf jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
     jConf.setMemoryForReduceTask(1 * 1024);
@@ -1637,7 +1753,20 @@
 
     // Fill the second tt with this job.
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    // Total 1 map slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 25.0f);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 0, 0),
+        (String) job1.getSchedulingInfo());
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    // Total 1 reduce slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+        25.0f);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 1, 1),
+        (String) job1.getSchedulingInfo());
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
 
     LOG.debug("Submit one high memory(2GB maps/reduces) job of "
         + "2 map, 2 reduce tasks.");
@@ -1651,7 +1780,21 @@
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // Total 3 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 0, 0),
+        (String) job2.getSchedulingInfo());
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    // Total 3 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
+        75.0f);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+        (String) job2.getSchedulingInfo());
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
 
     LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
         + "1 map, 0 reduce tasks.");
@@ -1669,6 +1812,17 @@
     assertNull(scheduler.assignTasks(tracker("tt2")));
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
+        75.0f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+        (String) job2.getSchedulingInfo());
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 0, 0),
+        (String) job3.getSchedulingInfo());
   }
 
   /**
@@ -1717,9 +1871,23 @@
 
     // 1st cycle - 1 map gets assigned.
     Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // Total 1 map slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
+    checkMemReservedForTasksOnTT("tt1",  512L, 0L);
+
+    // 1st cycle of reduces - 1 reduce gets assigned.
+    Task t1 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    // Total 1 reduce slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+        50.0f);
+    checkMemReservedForTasksOnTT("tt1",  512L, 512L);
     
     // kill this job !
     taskTrackerManager.killJob(job1.getJobID());
+    // No more map/reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 0, 0, 0.0f);
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 0, 0,
+        0.0f);
     
     // retire the job
     taskTrackerManager.removeJob(job1.getJobID());
@@ -1738,30 +1906,30 @@
     // 2nd cycle - nothing should get assigned. Memory matching code
     // will see the job is missing and fail memory requirements.
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    checkMemReservedForTasksOnTT("tt1", null, null);
+
     // calling again should not make a difference, as the task is still running
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    checkMemReservedForTasksOnTT("tt1", null, null);
     
-    // finish the task on the tracker.
+    // finish the tasks on the tracker.
     taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
+    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), job1);
+
     // now a new task can be assigned.
     t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // Total 1 map slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
+    checkMemReservedForTasksOnTT("tt1", 512L, 0L);
+
     // reduce can be assigned.
     t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    // Total 1 reduce slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+        50.0f);
+    checkMemReservedForTasksOnTT("tt1", 512L, 512L);
   }
-  
-  protected TaskTrackerStatus tracker(String taskTrackerName) {
-    return taskTrackerManager.getTaskTracker(taskTrackerName);
-  }
-  
-  protected Task checkAssignment(String taskTrackerName,
-      String expectedTaskString) throws IOException {
-    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
-    assertNotNull(expectedTaskString, tasks);
-    assertEquals(expectedTaskString, 1, tasks.size());
-    assertEquals(expectedTaskString, tasks.get(0).toString());
-    return tasks.get(0);
-  }
-  
+
   /*
    * Test cases for Job Initialization poller.
    */
@@ -1968,10 +2136,9 @@
     checkFailedInitializedJobMovement();
 
     // Check failed waiting job movement
-    checkFailedWaitingJobMovement();
-    
+    checkFailedWaitingJobMovement(); 
   }
-  
+
   public void testStartWithoutDefaultQueueConfigured() throws Exception {
     //configure a single queue which is not default queue
     String[] qs = {"q1"};
@@ -2087,9 +2254,9 @@
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", fjob2);
     taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", fjob2);
-    taskTrackerManager.finalizeJob(fjob2);
-    
+    taskTrackerManager.finalizeJob(fjob2);    
   }
+
   /**
    * Test case to test scheduling of
    * <ol> 
@@ -2105,17 +2272,16 @@
    * <ul>
    * <li>Submit one high ram job which has speculative reduce.</li>
    * <li>Submit a normal job which has no speculative reduce.</li>
-   * <li>Scheduler should schedule first all reduce tasks from first job and block
-   * the cluster till both reduces are completed.</li>
+   * <li>Scheduler should schedule first all reduce tasks from first job and
+   * block the cluster till both reduces are completed.</li>
    * </ul>
    * </li>
    * </ol>
    * @throws IOException
    */
   public void testHighRamJobWithSpeculativeExecution() throws IOException {
-    // 2 map and 2 reduce slots
+    // 2 TTs, 3 map and 3 reduce slots on each TT
     taskTrackerManager = new FakeTaskTrackerManager(2, 3, 3);
-    // 1GB for each map, 1GB for each reduce
 
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -2123,6 +2289,7 @@
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
+    // 1GB for each map, 1GB for each reduce
     scheduler.getConf().setLong(
         JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
         3 * 1024L);
@@ -2146,8 +2313,9 @@
     jConf.setUser("u1");
     jConf.setMapSpeculativeExecution(true);
     jConf.setReduceSpeculativeExecution(false);
-    FakeJobInProgress job1 = new FakeJobInProgress(new JobID("test", ++jobCounter),
-          jConf, taskTrackerManager,"u1");
+    FakeJobInProgress job1 =
+        new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
+            taskTrackerManager, "u1");
     taskTrackerManager.submitJob(job1);
 
     //Submit normal job
@@ -2170,17 +2338,31 @@
     //scheduled. This task would be scheduled. Till the tasks from job1 gets
     //complete none of the tasks from other jobs would be scheduled.
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
     assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
+    // Total 2 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+
     //make same tracker get back, check if you are blocking. Your job
     //has speculative map task so tracker should be blocked even tho' it
     //can run job2's map.
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    // Total 2 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
     //TT2 now gets speculative map of job1
     checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
+    // Total 4 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 4, 66.7f);
+    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
 
     // Now since the first job has no more speculative maps, it can schedule
     // the second job.
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // Total 5 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 5, 83.3f);
+    checkMemReservedForTasksOnTT("tt1", 3 * 1024L, 0L);
 
     //finish everything
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", 
@@ -2202,14 +2384,15 @@
     jConf.setUser("u1");
     jConf.setMapSpeculativeExecution(false);
     jConf.setReduceSpeculativeExecution(true);
-    FakeJobInProgress job3 = new FakeJobInProgress(new JobID("test", ++jobCounter),
-          jConf, taskTrackerManager,"u1");
+    FakeJobInProgress job3 =
+        new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
+            taskTrackerManager, "u1");
     taskTrackerManager.submitJob(job3);
 
     //Submit normal job w.r.t reduces
     jConf = new JobConf();
-    jConf.setMemoryForMapTask(2 * 1024L);
-    jConf.setMemoryForReduceTask(1 * 104L);
+    jConf.setMemoryForMapTask(1 * 1024L);
+    jConf.setMemoryForReduceTask(1 * 1024L);
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(1);
     jConf.setQueueName("default");
@@ -2223,24 +2406,165 @@
 
     // Finish up the map scheduler
     checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+    // Total 2 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
     checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
+    // Total 3 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 50.0f);
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
 
     // first, a reduce from j3 will run
     // at this point, there is a speculative task for the same job to be
     //scheduled. This task would be scheduled. Till the tasks from job3 gets
     //complete none of the tasks from other jobs would be scheduled.
     checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
-    assertEquals("pending reduces greater than zero " , job3.pendingMaps(), 0);
+    assertEquals("pending reduces greater than zero ", job3.pendingReduces(),
+        0);
+    // Total 2 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
+        33.3f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2*1024L);
+
     //make same tracker get back, check if you are blocking. Your job
     //has speculative reduce task so tracker should be blocked even tho' it
     //can run job4's reduce.
     assertNull(scheduler.assignTasks(tracker("tt1")));
-    //TT2 now gets speculative map of the job1
+    // Total 2 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
+        33.3f);
+
+    //TT2 now gets speculative reduce of job3
     checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
+    // Total 4 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 4,
+        66.7f);
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 2 * 1024L);
 
     // Now since j3 has no more speculative reduces, it can schedule
     // the j4.
     checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
+    // Total 5 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 5,
+        83.3f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 3 * 1024L);
+  }
+
+  /**
+   * Test to verify that queue ordering is based on the number of occupied
+   * slots, and hence that the presence of high memory jobs is reflected
+   * properly in the used capacities of queues and in the queue ordering.
+   * 
+   * @throws IOException
+   */
+  public void testQueueOrdering()
+      throws IOException {
+    taskTrackerManager = new FakeTaskTrackerManager(2, 6, 6);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    String[] qs = { "default", "q1" };
+    String[] reversedQs = { qs[1], qs[0] };
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 100));
+    queues.add(new FakeQueueInfo("q1", 50.0f, true, 100));
+    resConf.setFakeQueues(queues);
+    // enabled memory-based scheduling
+    // Normal job in the cluster would be 1GB maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+        + "6 map and 6 reduce tasks");
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(2 * 1024);
+    jConf.setNumMapTasks(6);
+    jConf.setNumReduceTasks(6);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // Submit a normal job to the other queue.
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(6);
+    jConf.setNumReduceTasks(6);
+    jConf.setUser("u1");
+    jConf.setQueueName("q1");
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // Map 1 of high memory job
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkQueuesOrder(qs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 1 of high memory job
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkQueuesOrder(qs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Map 1 of normal job
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 1 of normal job
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Map 2 of normal job
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 2 of normal job
+    checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Now both the queues are equally served. But the comparator doesn't change
+    // the order if queues are equally served.
+
+    // Map 3 of normal job
+    checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 3 of normal job
+    checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Map 2 of high memory job
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkQueuesOrder(qs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 2 of high memory job
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkQueuesOrder(qs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Map 4 of normal job
+    checkAssignment("tt2", "attempt_test_0002_m_000004_0 on tt2");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 4 of normal job
+    checkAssignment("tt2", "attempt_test_0002_r_000004_0 on tt2");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
   }
 
   private void checkRunningJobMovementAndCompletion() throws IOException {
@@ -2386,6 +2710,89 @@
       userJobs.put(user, jips);
     }
     return userJobs;
+  }
+
+  
+  protected TaskTrackerStatus tracker(String taskTrackerName) {
+    return taskTrackerManager.getTaskTracker(taskTrackerName);
+  }
+  
+  protected Task checkAssignment(String taskTrackerName,
+      String expectedTaskString) throws IOException {
+    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    assertNotNull(expectedTaskString, tasks);
+    assertEquals(expectedTaskString, 1, tasks.size());
+    assertEquals(expectedTaskString, tasks.get(0).toString());
+    return tasks.get(0);
+  }
+
+  /**
+   * Get the amount of memory that is reserved for tasks on the taskTracker and
+   * verify that it matches what is expected.
+   * 
+   * @param taskTracker
+   * @param expectedMemForMapsOnTT
+   * @param expectedMemForReducesOnTT
+   */
+  private void checkMemReservedForTasksOnTT(String taskTracker,
+      Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
+    Long observedMemForMapsOnTT =
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+            CapacityTaskScheduler.TYPE.MAP);
+    Long observedMemForReducesOnTT =
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+            CapacityTaskScheduler.TYPE.REDUCE);
+    if (expectedMemForMapsOnTT == null) {
+      assertTrue(observedMemForMapsOnTT == null);
+    } else {
+      assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
+    }
+    if (expectedMemForReducesOnTT == null) {
+      assertTrue(observedMemForReducesOnTT == null);
+    } else {
+      assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
+    }
+  }
 
+  /**
+   * Verify the number of slots of type 'type' from the queue 'queue'.
+   * 
+   * @param queue
+   * @param type
+   * @param numActiveUsers in the queue at present.
+   * @param expectedOccupiedSlots
+   * @param expectedOccupiedSlotsPercent
+   */
+  private void checkOccupiedSlots(String queue,
+      CapacityTaskScheduler.TYPE type, int numActiveUsers,
+      int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
+    scheduler.updateQSIInfoForTests();
+    QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
+    String schedulingInfo =
+        queueManager.getJobQueueInfo(queue).getSchedulingInfo();
+    String[] infoStrings = schedulingInfo.split("\n");
+    int index = -1;
+    if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+      index = 7;
+    } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+      index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
+    }
+    LOG.info(infoStrings[index]);
+    assertEquals(String.format("Used capacity: %d (%.1f%% of Capacity)",
+        expectedOccupiedSlots, expectedOccupiedSlotsPercent),
+        infoStrings[index]);
+  }
+
+  private void checkQueuesOrder(String[] expectedOrder, String[] observedOrder) {
+    assertTrue("Observed and expected queues are not of same length.",
+        expectedOrder.length == observedOrder.length);
+    int i = 0;
+    for (String expectedQ : expectedOrder) {
+      assertTrue("Observed and expected queues are not in the same order. "
+          + "Differ at index " + i + ". Got " + observedOrder[i]
+          + " instead of " + expectedQ, expectedQ.equals(observedOrder[i]));
+      i++;
+    }
   }
 }

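A note on the slot accounting behind the new assertions: with
MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY set to 1*1024 as in the tests above, a
job requesting 2*1024 MB per map occupies two map slots per task, so on two
trackers with 3 map slots each (6 in total) a single such map reads as
"Used capacity: 2 (33.3% of Capacity)", and two of them as 66.7%. The sketch
below is a standalone illustration only, not scheduler code; it merely mirrors
that arithmetic and the format string checkOccupiedSlots() asserts against.

    // SlotMathSketch: standalone illustration of the slot accounting the
    // tests above rely on. Not scheduler code; names are illustrative.
    public class SlotMathSketch {

      // A task needing more memory than one slot provides occupies
      // ceil(taskMemMB / slotMemMB) slots.
      static int slotsPerTask(long taskMemMB, long slotMemMB) {
        return (int) ((taskMemMB + slotMemMB - 1) / slotMemMB);
      }

      // Mirrors the "Used capacity" line the tests assert on.
      static String usedCapacityLine(int occupiedSlots, int totalSlots) {
        return String.format("Used capacity: %d (%.1f%% of Capacity)",
            occupiedSlots, 100.0f * occupiedSlots / totalSlots);
      }

      public static void main(String[] args) {
        int totalSlots = 2 * 3;                       // 2 TTs x 3 map slots
        int slots = slotsPerTask(2 * 1024, 1 * 1024); // 2GB maps, 1GB slots -> 2
        System.out.println(usedCapacityLine(slots, totalSlots));     // 33.3%
        System.out.println(usedCapacityLine(2 * slots, totalSlots)); // 66.7%
      }
    }
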
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/eclipse-plugin/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/eclipse-plugin/build.xml?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/eclipse-plugin/build.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/eclipse-plugin/build.xml Tue Jun  9 16:11:19 2009
@@ -67,7 +67,8 @@
   <target name="jar" depends="compile" unless="skip.contrib">
     <mkdir dir="${build.dir}/lib"/>
     <copy file="${hadoop.root}/build/hadoop-${version}-core.jar" tofile="${build.dir}/lib/hadoop-core.jar" verbose="true"/>
-    <copy file="${hadoop.root}/lib/commons-cli-2.0-SNAPSHOT.jar" todir="${build.dir}/lib" verbose="true"/>
+    <copy file="${hadoop.root}/build/ivy/lib/Hadoop/common/commons-cli-${commons-cli.version}.jar"  todir="${build.dir}/lib" verbose="true"/>
+    <copy file="${hadoop.root}/build/ivy/lib/Hadoop/common/commons-cli-${commons-cli2.version}.jar"  todir="${build.dir}/lib" verbose="true"/>
     <jar
       jarfile="${build.dir}/hadoop-${version}-${name}.jar"
       manifest="${root}/META-INF/MANIFEST.MF">

Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/bin/hdfsproxy
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/bin/hdfsproxy?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/bin/hdfsproxy (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/bin/hdfsproxy Tue Jun  9 16:11:19 2009
@@ -114,12 +114,17 @@
     CLASSPATH=${CLASSPATH}:$f;
   done
 fi
-
 if [ -d "$HDFSPROXY_HOME/../../lib" ]; then
 	for f in $HDFSPROXY_HOME/../../lib/*.jar; do
     CLASSPATH=${CLASSPATH}:$f;
   done
 fi
+if [ -d "$HDFSPROXY_HOME/../../lib/jsp-2.1" ]; then
+	for f in $HDFSPROXY_HOME/../../lib/jsp-2.1/*.jar; do
+    CLASSPATH=${CLASSPATH}:$f;
+  done
+fi
+
 
 # add user-specified CLASSPATH last
 if [ "$HDFSPROXY_CLASSPATH" != "" ]; then

Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/build.xml?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/build.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/build.xml Tue Jun  9 16:11:19 2009
@@ -123,7 +123,7 @@
         <include name="slf4j-api-${slf4j-api.version}.jar"/>
         <include name="slf4j-log4j12-${slf4j-log4j12.version}.jar"/>
         <include name="xmlenc-${xmlenc.version}.jar"/>
-        <include name="core-${core.version}.jar"/> 
+        <include name="core-${core.vesion}.jar"/> 
 	    </lib>
 	    <classes dir="${proxy.conf.dir}" excludes="**/*.example **/*.template **/*.sh hadoop-site.xml"/>
 	    <classes dir="${build.classes}"/>
@@ -143,7 +143,7 @@
         <include name="slf4j-api-${slf4j-api.version}.jar"/>
         <include name="slf4j-log4j12-${slf4j-log4j12.version}.jar"/>
         <include name="xmlenc-${xmlenc.version}.jar"/>
-        <include name="core-${core.version}.jar"/> 
+        <include name="core-${core.vesion}.jar"/> 
 	    </lib>
 	    <classes dir="${proxy.conf.dir}" excludes="**/*.example **/*.template **/*.sh hadoop-site.xml"/>
 	    <classes dir="${build.classes}"/>
@@ -347,10 +347,12 @@
         <include name="jetty-util-${jetty-util.version}.jar"/>
         <include name="jetty-${jetty.version}.jar"/>
         <include name="servlet-api-2.5-${servlet-api-2.5.version}.jar"/>
-        <include name="core-${core.version}.jar"/> 
-        <include name="jsp-${jsp.version}-${jetty.version}.jar"/> 
-        <include name="jsp-api-${jsp.version}-${jetty.version}.jar"/> 
-		       </fileset>
+        <include name="core-${core.vesion}.jar"/> 
+                       </fileset>
+		       <fileset dir="${hadoop.root}/lib/jsp-${jsp.version}">
+        <include name="jsp-${jsp.version}.jar"/> 
+        <include name="jsp-api-${jsp.version}.jar"/> 
+			</fileset>
 		</copy>
 
 		<copy todir="${build.dir}/${final.name}/lib" includeEmptyDirs="false">

Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/ivy.xml?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/ivy.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/ivy.xml Tue Jun  9 16:11:19 2009
@@ -22,6 +22,10 @@
     <artifact conf="master"/>
   </publications>
   <dependencies>
+    <dependency org="commons-cli"
+      name="commons-cli"
+      rev="${commons-cli.version}"
+      conf="common->default"/>
     <dependency org="log4j"
       name="log4j"
       rev="${log4j.version}"

Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java Tue Jun  9 16:11:19 2009
@@ -21,6 +21,7 @@
 import org.apache.hadoop.sqoop.manager.ConnManager;
 import org.apache.hadoop.sqoop.manager.GenericJdbcManager;
 import org.apache.hadoop.sqoop.manager.HsqldbManager;
+import org.apache.hadoop.sqoop.manager.LocalMySQLManager;
 import org.apache.hadoop.sqoop.manager.MySQLManager;
 
 import java.io.IOException;
@@ -70,7 +71,11 @@
     }
 
     if (scheme.equals("jdbc:mysql:")) {
-      return new MySQLManager(opts);
+      if (opts.isLocal()) {
+        return new LocalMySQLManager(opts);
+      } else {
+        return new MySQLManager(opts);
+      }
     } else if (scheme.equals("jdbc:hsqldb:hsql:")) {
       return new HsqldbManager(opts);
     } else {

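The dispatch above keys on the JDBC URL scheme, and for MySQL additionally on
ImportOptions.isLocal() to pick the mysqldump-based fast path. A condensed,
self-contained sketch of the pattern follows; the class and the returned names
are illustrative only (the real method returns ConnManager instances):

    public class ManagerDispatchSketch {
      // Hypothetical stand-in for the scheme dispatch in ConnFactory.
      static String chooseManager(String scheme, boolean localImport) {
        if (scheme.equals("jdbc:mysql:")) {
          // --local selects the mysqldump-based fast path
          return localImport ? "LocalMySQLManager" : "MySQLManager";
        } else if (scheme.equals("jdbc:hsqldb:hsql:")) {
          return "HsqldbManager";
        }
        return "GenericJdbcManager";
      }

      public static void main(String[] args) {
        // prints "LocalMySQLManager"
        System.out.println(chooseManager("jdbc:mysql:", true));
      }
    }
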
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Tue Jun  9 16:11:19 2009
@@ -92,6 +92,9 @@
   private String driverClassName;
   private String warehouseDir;
   private FileLayout layout;
+  private boolean local; // if true and conn is mysql, use mysqldump.
+
+  private String tmpDir; // where temp data goes; usually /tmp
 
   private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
 
@@ -136,6 +139,10 @@
       this.driverClassName = props.getProperty("jdbc.driver", this.driverClassName);
       this.warehouseDir = props.getProperty("hdfs.warehouse.dir", this.warehouseDir);
 
+      String localImport = props.getProperty("local.import",
+          Boolean.toString(this.local)).toLowerCase();
+      this.local = "true".equals(localImport) || "yes".equals(localImport)
+          || "1".equals(localImport);
     } catch (IOException ioe) {
       LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": " + ioe.toString());
     } finally {
@@ -156,11 +163,12 @@
     this.hadoopHome = System.getenv("HADOOP_HOME");
     this.codeOutputDir = System.getProperty("sqoop.src.dir", ".");
 
-    String tmpDir = System.getProperty("test.build.data", "/tmp/");
-    if (!tmpDir.endsWith(File.separator)) {
-      tmpDir = tmpDir + File.separator;
+    String myTmpDir = System.getProperty("test.build.data", "/tmp/");
+    if (!myTmpDir.endsWith(File.separator)) {
+      myTmpDir = myTmpDir + File.separator;
     }
 
+    this.tmpDir = myTmpDir;
     this.jarOutputDir = tmpDir + "sqoop/compile";
     this.layout = FileLayout.TextFile;
 
@@ -178,6 +186,7 @@
     System.out.println("--driver (class-name)        Manually specify JDBC driver class to use");
     System.out.println("--username (username)        Set authentication username");
     System.out.println("--password (password)        Set authentication password");
+    System.out.println("--local                      Use local import fast path (mysql only)");
     System.out.println("");
     System.out.println("Import control options:");
     System.out.println("--table (tablename)          Table to read");
@@ -232,6 +241,8 @@
           this.action = ControlAction.ListTables;
         } else if (args[i].equals("--all-tables")) {
           this.allTables = true;
+        } else if (args[i].equals("--local")) {
+          this.local = true;
         } else if (args[i].equals("--username")) {
           this.username = args[++i];
           if (null == this.password) {
@@ -300,6 +311,13 @@
     }
   }
 
+  /** get the temporary directory; guaranteed to end in File.separator
+   * (e.g., '/')
+   */
+  public String getTmpDir() {
+    return tmpDir;
+  }
+
   public String getConnectString() {
     return connectString;
   }
@@ -336,6 +354,10 @@
     return password;
   }
 
+  public boolean isLocal() {
+    return local;
+  }
+
   /**
    * @return location where .java files go; guaranteed to end with '/'
    */
@@ -393,4 +415,8 @@
   public FileLayout getFileLayout() {
     return this.layout;
   }
+
+  public void setUsername(String name) {
+    this.username = name;
+  }
 }

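The "local.import" property is parsed leniently: "true", "yes", or "1" in any
case all enable the local fast path, and the current value is kept when the
property is absent. A standalone sketch of that check, assuming this is the
only truthiness rule applied:

    import java.util.Properties;

    // Sketch of the lenient boolean-property parsing used for "local.import".
    public class BooleanPropSketch {
      static boolean parseBooleanProp(Properties props, String key,
          boolean current) {
        String v = props.getProperty(key, Boolean.toString(current))
            .toLowerCase();
        return "true".equals(v) || "yes".equals(v) || "1".equals(v);
      }

      public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("local.import", "YES");
        System.out.println(parseBooleanProp(p, "local.import", false)); // true
      }
    }
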
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java Tue Jun  9 16:11:19 2009
@@ -41,6 +41,19 @@
 
   public MySQLManager(final ImportOptions opts) {
     super(DRIVER_CLASS, opts);
+
+    String connectString = opts.getConnectString();
+    if (null != connectString && connectString.indexOf("//localhost") != -1) {
+      // if we're not doing a remote connection, they should use LocalMySQLManager.
+      LOG.warn("It looks like you are importing from mysql on localhost.");
+      LOG.warn("This transfer can be faster! Use the --local option to exercise a");
+      LOG.warn("MySQL-specific fast path.");
+    }
+  }
+
+  protected MySQLManager(final ImportOptions opts, boolean ignored) {
+    // constructor used by subclasses to avoid the --local warning.
+    super(DRIVER_CLASS, opts);
   }
 
   @Override

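The new protected constructor exists so that subclasses can skip the "--local"
hint. LocalMySQLManager's body is not part of this hunk; assuming it chains
through that constructor, it would look roughly like:

    // Hypothetical sketch only -- LocalMySQLManager is not shown in this
    // hunk. A subclass chains through the protected constructor so the
    // localhost warning above is not emitted for the fast path itself.
    public class LocalMySQLManager extends MySQLManager {
      public LocalMySQLManager(final ImportOptions opts) {
        super(opts, false); // boolean is ignored; it only selects the
                            // warning-free constructor
      }
    }
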
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Tue Jun  9 16:11:19 2009
@@ -43,9 +43,6 @@
 
 /**
  * Actually runs a jdbc import job using the ORM files generated by the sqoop.orm package.
- *
- * 
- *
  */
 public class ImportJob {
 

Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java Tue Jun  9 16:11:19 2009
@@ -130,6 +130,15 @@
       }
     }
 
+    // find sqoop jar for compilation classpath
+    String sqoopJar = findThisJar();
+    if (null != sqoopJar) {
+      sqoopJar = File.pathSeparator + sqoopJar;
+    } else {
+      LOG.warn("Could not find sqoop jar; child compilation may fail");
+      sqoopJar = "";
+    }
+
     String curClasspath = System.getProperty("java.class.path");
 
     args.add("-sourcepath");
@@ -140,7 +149,7 @@
     args.add(jarOutDir);
 
     args.add("-classpath");
-    args.add(curClasspath + File.pathSeparator + coreJar);
+    args.add(curClasspath + File.pathSeparator + coreJar + sqoopJar);
 
     // add all the source files
     for (String srcfile : sources) {

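findThisJar() is referenced above but defined outside this hunk. A common way
to implement it is to ask the JVM where the class itself was loaded from; the
following is a sketch of that pattern under that assumption, not the commit's
actual code:

    import java.net.URL;
    import java.security.CodeSource;

    // Sketch: locate the jar (or directory) that a known class was loaded
    // from, via its protection domain's CodeSource.
    public class FindJarSketch {
      static String findJarOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null) {
          return null; // e.g., classes from the bootstrap class loader
        }
        URL loc = src.getLocation();
        return "file".equals(loc.getProtocol()) ? loc.getPath() : null;
      }

      public static void main(String[] args) {
        System.out.println(findJarOf(FindJarSketch.class));
      }
    }
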
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java Tue Jun  9 16:11:19 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.sqoop;
 
+import org.apache.hadoop.sqoop.manager.LocalMySQLTest;
 import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
 import org.apache.hadoop.sqoop.manager.TestSqlManager;
 import org.apache.hadoop.sqoop.orm.TestClassWriter;
@@ -44,6 +45,7 @@
     suite.addTestSuite(TestColumnTypes.class);
     suite.addTestSuite(TestMultiCols.class);
     suite.addTestSuite(TestOrderBy.class);
+    suite.addTestSuite(LocalMySQLTest.class);
 
     return suite;
   }

Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/ivy.xml?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/ivy.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/ivy.xml Tue Jun  9 16:11:19 2009
@@ -24,6 +24,10 @@
     <artifact conf="master"/>
   </publications>
   <dependencies>
+    <dependency org="org.apache.mahout.commons"
+      name="commons-cli"
+      rev="${commons-cli2.version}"
+      conf="common->default"/>
     <dependency org="commons-logging"
       name="commons-logging"
       rev="${commons-logging.version}"

Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Jun  9 16:11:19 2009
@@ -376,6 +376,9 @@
       if (errThread_ != null) {
         errThread_.join(joinDelay_);
       }
+      if (outerrThreadsThrowable != null) {
+        throw new RuntimeException(outerrThreadsThrowable);
+      }
     } catch (InterruptedException e) {
       //ignore
     }
@@ -425,7 +428,11 @@
           if (now-lastStdoutReport > reporterOutDelay_) {
             lastStdoutReport = now;
             String hline = "Records R/W=" + numRecRead_ + "/" + numRecWritten_;
-            reporter.setStatus(hline);
+            if (!processProvidedStatus_) {
+              reporter.setStatus(hline);
+            } else {
+              reporter.progress();
+            }
             logprintln(hline);
             logflush();
           }
@@ -476,6 +483,7 @@
             if (matchesCounter(lineStr)) {
               incrCounter(lineStr);
             } else if (matchesStatus(lineStr)) {
+              processProvidedStatus_ = true;
               setStatus(lineStr);
             } else {
               LOG.warn("Cannot parse reporter line: " + lineStr);
@@ -572,6 +580,7 @@
       if (sim != null) sim.destroy();
       logprintln("mapRedFinished");
     } catch (RuntimeException e) {
+      logprintln("PipeMapRed failed!");
       logStackTrace(e);
       throw e;
     }
@@ -682,4 +691,5 @@
   String LOGNAME;
   PrintStream log_;
 
+  volatile boolean processProvidedStatus_ = false;
 }

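Two behavioral changes land in this file: a throwable captured by the output-drain threads is now rethrown after the join, so child failures surface instead of being silently swallowed, and once the child emits a reporter:status: line the periodic heartbeat calls progress() instead of setStatus(), so the framework stops overwriting the process-provided status. A minimal sketch of that heartbeat decision (the interface below is a stub mirroring the two Reporter calls used in the hunk, not the real API):

    // Stub mirroring the two Reporter calls used in the hunk above.
    interface ReporterStub {
      void setStatus(String status); // replaces the task's status string
      void progress();               // keep-alive only; status is untouched
    }

    class HeartbeatSketch {
      // Set once a "reporter:status:" line is seen on the child's stderr.
      volatile boolean processProvidedStatus = false;

      void beat(ReporterStub reporter, long recRead, long recWritten) {
        String hline = "Records R/W=" + recRead + "/" + recWritten;
        if (!processProvidedStatus) {
          reporter.setStatus(hline); // default: framework-generated status
        } else {
          reporter.progress();       // the child owns the status line now
        }
      }
    }
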
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java Tue Jun  9 16:11:19 2009
@@ -32,8 +32,16 @@
    * postWriteLines to stderr.
    */
   public static void go(int preWriteLines, int sleep, int postWriteLines) throws IOException {
+    go(preWriteLines, sleep, postWriteLines, false);
+  }
+  
+  public static void go(int preWriteLines, int sleep, int postWriteLines, boolean status) throws IOException {
     BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
     String line;
+    
+    if (status) {
+      System.err.println("reporter:status:starting echo");
+    }
        
     while (preWriteLines > 0) {
       --preWriteLines;
@@ -57,13 +65,14 @@
 
   public static void main(String[] args) throws IOException {
     if (args.length < 3) {
-      System.err.println("Usage: StderrApp PREWRITE SLEEP POSTWRITE");
+      System.err.println("Usage: StderrApp PREWRITE SLEEP POSTWRITE [STATUS]");
       return;
     }
     int preWriteLines = Integer.parseInt(args[0]);
     int sleep = Integer.parseInt(args[1]);
     int postWriteLines = Integer.parseInt(args[2]);
+    boolean status = args.length > 3 ? Boolean.parseBoolean(args[3]) : false;
     
-    go(preWriteLines, sleep, postWriteLines);
+    go(preWriteLines, sleep, postWriteLines, status);
   }
 }

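With the new overload a test can drive the status path directly. A usage sketch (go() is the method added above; the wrapper class is hypothetical and assumes the streaming test classes are on the classpath):

    import org.apache.hadoop.streaming.StderrApp;

    public class StderrAppStatusDemo {
      public static void main(String[] args) throws java.io.IOException {
        // Equivalent to the command line: StderrApp 2 0 2 true
        // With status=true the app first prints "reporter:status:starting echo"
        // to stderr, which PipeMapRed's matchesStatus() path now picks up.
        // The app also echoes stdin, so feed it input or close stdin.
        StderrApp.go(2, 0, 2, true);
      }
    }
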
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/core-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/core-default.xml?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/core-default.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/core-default.xml Tue Jun  9 16:11:19 2009
@@ -257,6 +257,12 @@
   </description>
 </property>
 
+<property>
+  <name>fs.s3n.block.size</name>
+  <value>67108864</value>
+  <description>Block size to use when reading files using the native S3
+  filesystem (s3n: URIs).</description>
+</property>
 
 <property>
   <name>local.cache.size</name>

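The new default is 67108864 bytes, i.e. 64 MB. A sketch of reading the property from a Configuration (key and default value taken from the hunk above):

    import org.apache.hadoop.conf.Configuration;

    public class S3nBlockSizeDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // core-default.xml now supplies 67108864 (64 * 1024 * 1024) by default.
        long blockSize = conf.getLong("fs.s3n.block.size", 64 * 1024 * 1024);
        System.out.println("s3n block size: " + blockSize + " bytes");
      }
    }
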
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/filecache/DistributedCache.java Tue Jun  9 16:11:19 2009
@@ -683,7 +683,7 @@
     throws IOException {
     String classpath = conf.get("mapred.job.classpath.files");
     conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
-             : classpath + System.getProperty("path.separator") + file.toString());
+             : classpath + "," + file.toString());
     FileSystem fs = FileSystem.get(conf);
     URI uri = fs.makeQualified(file).toUri();
 
@@ -696,14 +696,14 @@
    * @param conf Configuration that contains the classpath setting
    */
   public static Path[] getFileClassPaths(Configuration conf) {
-    String classpath = conf.get("mapred.job.classpath.files");
-    if (classpath == null)
-      return null;
-    ArrayList list = Collections.list(new StringTokenizer(classpath, System
-                                                          .getProperty("path.separator")));
+    ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
+                                "mapred.job.classpath.files");
+    if (list.size() == 0) { 
+      return null; 
+    }
     Path[] paths = new Path[list.size()];
     for (int i = 0; i < list.size(); i++) {
-      paths[i] = new Path((String) list.get(i));
+      paths[i] = new Path(list.get(i));
     }
     return paths;
   }
@@ -719,8 +719,7 @@
     throws IOException {
     String classpath = conf.get("mapred.job.classpath.archives");
     conf.set("mapred.job.classpath.archives", classpath == null ? archive
-             .toString() : classpath + System.getProperty("path.separator")
-             + archive.toString());
+             .toString() : classpath + "," + archive.toString());
     FileSystem fs = FileSystem.get(conf);
     URI uri = fs.makeQualified(archive).toUri();
 
@@ -733,14 +732,14 @@
    * @param conf Configuration that contains the classpath setting
    */
   public static Path[] getArchiveClassPaths(Configuration conf) {
-    String classpath = conf.get("mapred.job.classpath.archives");
-    if (classpath == null)
-      return null;
-    ArrayList list = Collections.list(new StringTokenizer(classpath, System
-                                                          .getProperty("path.separator")));
+    ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
+                                "mapred.job.classpath.archives");
+    if (list.size() == 0) { 
+      return null; 
+    }
     Path[] paths = new Path[list.size()];
     for (int i = 0; i < list.size(); i++) {
-      paths[i] = new Path((String) list.get(i));
+      paths[i] = new Path(list.get(i));
     }
     return paths;
   }

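Switching the separator from System.getProperty("path.separator") to a comma makes the stored classpath list platform-independent (the old value was ':'-separated on Unix and ';'-separated on Windows) and lets Configuration.getStringCollection(), which splits on commas, do the parsing. A round-trip sketch:

    import java.util.Collection;
    import org.apache.hadoop.conf.Configuration;

    public class ClasspathFilesRoundTrip {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Comma-separated, matching what addFileToClassPath now writes.
        conf.set("mapred.job.classpath.files", "/jars/a.jar,/jars/b.jar");
        // Splits on commas regardless of platform, unlike path.separator.
        Collection<String> entries =
            conf.getStringCollection("mapred.job.classpath.files");
        for (String entry : entries) {
          System.out.println(entry);
        }
      }
    }
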
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/FsShell.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/FsShell.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/FsShell.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/FsShell.java Tue Jun  9 16:11:19 2009
@@ -1170,10 +1170,13 @@
 
     while (true) {
       FSDataInputStream in = srcFs.open(path);
-      in.seek(offset);
-      IOUtils.copyBytes(in, System.out, 1024, false);
-      offset = in.getPos();
-      in.close();
+      try {
+        in.seek(offset);
+        IOUtils.copyBytes(in, System.out, 1024);
+        offset = in.getPos();
+      } finally {
+        in.close();
+      }
       if (!foption) {
         break;
       }

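The point of the hunk is that the stream now closes even when seek() or the copy throws. The same resource-safety pattern in isolation (plain JDK, pre-Java-7 idiom as in the hunk; the file-based calls are stand-ins for the FSDataInputStream operations):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class TailSketch {
      // Copies from 'offset' to EOF and returns the new offset; the finally
      // block guarantees the stream is closed even if the copy throws.
      static long copyTail(String path, long offset) throws IOException {
        InputStream in = new FileInputStream(path);
        try {
          long pos = offset;
          in.skip(offset); // sketch assumes skip() consumes the full offset
          byte[] buf = new byte[1024];
          int n;
          while ((n = in.read(buf)) != -1) {
            System.out.write(buf, 0, n);
            pos += n;
          }
          return pos;
        } finally {
          in.close(); // runs on success and on exception alike
        }
      }

      public static void main(String[] args) throws IOException {
        System.out.println("\nnew offset: " + copyTail(args[0], 0));
      }
    }
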
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java Tue Jun  9 16:11:19 2009
@@ -122,7 +122,6 @@
     }
 
     @Override
-    @Deprecated
     public boolean isDirectory(Path path) throws IOException {
 	Path absolute = makeAbsolute(path);
         String srep = absolute.toUri().getPath();
@@ -133,7 +132,6 @@
     }
 
     @Override
-    @Deprecated
     public boolean isFile(Path path) throws IOException {
 	Path absolute = makeAbsolute(path);
         String srep = absolute.toUri().getPath();

Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java Tue Jun  9 16:11:19 2009
@@ -332,6 +332,11 @@
     }
     return new S3FileStatus(f.makeQualified(this), inode);
   }
+  
+  @Override
+  public long getDefaultBlockSize() {
+    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
+  }
 
   // diagnostic methods
 

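With getDefaultBlockSize() reading fs.s3.block.size, the block size for s3: URIs becomes tunable per site or per job (the s3n: counterpart gets its default from core-default.xml above). An override sketch:

    import org.apache.hadoop.conf.Configuration;

    public class S3BlockSizeOverride {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Double the 64 MB default that S3FileSystem.getDefaultBlockSize()
        // falls back to when the key is unset.
        conf.setLong("fs.s3.block.size", 128L * 1024 * 1024);
        System.out.println(conf.getLong("fs.s3.block.size", 64 * 1024 * 1024));
      }
    }
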
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java Tue Jun  9 16:11:19 2009
@@ -24,6 +24,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
@@ -53,10 +54,7 @@
             s3Credentials.getSecretAccessKey());
       this.s3Service = new RestS3Service(awsCredentials);
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
     }
     bucket = new S3Bucket(uri.getHost());
   }
@@ -76,10 +74,7 @@
       }
       s3Service.putObject(bucket, object);
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
     } finally {
       if (in != null) {
         try {
@@ -99,10 +94,7 @@
       object.setContentLength(0);
       s3Service.putObject(bucket, object);
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
     }
   }
   
@@ -116,10 +108,8 @@
       if (e.getMessage().contains("ResponseCode=404")) {
         return null;
       }
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
+      return null; // unreachable: handleServiceException always throws; keeps the compiler happy
     }
   }
   
@@ -128,13 +118,8 @@
       S3Object object = s3Service.getObject(bucket, key);
       return object.getDataInputStream();
     } catch (S3ServiceException e) {
-      if ("NoSuchKey".equals(e.getS3ErrorCode())) {
-        return null;
-      }
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(key, e);
+      return null; // unreachable: handleServiceException always throws; keeps the compiler happy
     }
   }
   
@@ -145,32 +130,22 @@
                                             null, byteRangeStart, null);
       return object.getDataInputStream();
     } catch (S3ServiceException e) {
-      if ("NoSuchKey".equals(e.getS3ErrorCode())) {
-        return null;
-      }
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(key, e);
+      return null; // unreachable: handleServiceException always throws; keeps the compiler happy
     }
   }
 
   public PartialListing list(String prefix, int maxListingLength)
     throws IOException {
-    return list(prefix, maxListingLength, null);
+    return list(prefix, maxListingLength, null, false);
   }
   
-  public PartialListing list(String prefix, int maxListingLength,
-      String priorLastKey) throws IOException {
+  public PartialListing list(String prefix, int maxListingLength, String priorLastKey,
+      boolean recurse) throws IOException {
 
-    return list(prefix, PATH_DELIMITER, maxListingLength, priorLastKey);
+    return list(prefix, recurse ? null : PATH_DELIMITER, maxListingLength, priorLastKey);
   }
 
-  public PartialListing listAll(String prefix, int maxListingLength,
-      String priorLastKey) throws IOException {
-
-    return list(prefix, null, maxListingLength, priorLastKey);
-  }
 
   private PartialListing list(String prefix, String delimiter,
       int maxListingLength, String priorLastKey) throws IOException {
@@ -191,10 +166,8 @@
       return new PartialListing(chunk.getPriorLastKey(), fileMetadata,
           chunk.getCommonPrefixes());
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
+      return null; // unreachable: handleServiceException always throws; keeps the compiler happy
     }
   }
 
@@ -202,36 +175,27 @@
     try {
       s3Service.deleteObject(bucket, key);
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(key, e);
     }
   }
   
-  public void rename(String srcKey, String dstKey) throws IOException {
+  public void copy(String srcKey, String dstKey) throws IOException {
     try {
-      s3Service.moveObject(bucket.getName(), srcKey, bucket.getName(),
+      s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
           new S3Object(dstKey), false);
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(srcKey, e);
     }
   }
 
   public void purge(String prefix) throws IOException {
     try {
       S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
-      for (int i = 0; i < objects.length; i++) {
-        s3Service.deleteObject(bucket, objects[i].getKey());
+      for (S3Object object : objects) {
+        s3Service.deleteObject(bucket, object.getKey());
       }
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
     }
   }
 
@@ -240,16 +204,29 @@
     sb.append(bucket.getName()).append("\n");
     try {
       S3Object[] objects = s3Service.listObjects(bucket);
-      for (int i = 0; i < objects.length; i++) {
-        sb.append(objects[i].getKey()).append("\n");
+      for (S3Object object : objects) {
+        sb.append(object.getKey()).append("\n");
       }
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
     }
     System.out.println(sb);
   }
-  
+
+  private void handleServiceException(String key, S3ServiceException e) throws IOException {
+    if ("NoSuchKey".equals(e.getS3ErrorCode())) {
+      throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
+    } else {
+      handleServiceException(e);
+    }
+  }
+
+  private void handleServiceException(S3ServiceException e) throws IOException {
+    if (e.getCause() instanceof IOException) {
+      throw (IOException) e.getCause();
+    } else {
+      throw new S3Exception(e);
+    }
+  }
 }

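This refactor folds five copies of the unwrap-or-wrap block into the two overloads at the bottom of the file; the keyed overload additionally maps the NoSuchKey error code to FileNotFoundException, so callers that previously got null now get the conventional java.io signal. rename() also becomes copy(), presumably leaving the delete half of a rename to the filesystem layer. A self-contained sketch of the translation pattern (the exception type below is a hypothetical stand-in for S3ServiceException, and plain IOException stands in for the S3Exception wrapper):

    import java.io.FileNotFoundException;
    import java.io.IOException;

    public class ServiceExceptionTranslation {
      // Hypothetical stand-in for jets3t's S3ServiceException.
      static class ServiceException extends Exception {
        final String errorCode;
        ServiceException(String errorCode, Throwable cause) {
          super(cause);
          this.errorCode = errorCode;
        }
      }

      static void handle(String key, ServiceException e) throws IOException {
        if ("NoSuchKey".equals(e.errorCode)) {
          // A missing key becomes the standard "file not found" signal.
          throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
        }
        handle(e);
      }

      static void handle(ServiceException e) throws IOException {
        if (e.getCause() instanceof IOException) {
          throw (IOException) e.getCause(); // unwrap transport-level failures
        }
        // Stand-in for the S3Exception wrapper used in the real code.
        throw new IOException(e.toString());
      }
    }
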
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java Tue Jun  9 16:11:19 2009
@@ -42,14 +42,12 @@
   InputStream retrieve(String key, long byteRangeStart) throws IOException;
   
   PartialListing list(String prefix, int maxListingLength) throws IOException;
-  PartialListing list(String prefix, int maxListingLength, String priorLastKey)
+  PartialListing list(String prefix, int maxListingLength, String priorLastKey, boolean recursive)
     throws IOException;
-  PartialListing listAll(String prefix, int maxListingLength,
-      String priorLastKey) throws IOException;
   
   void delete(String key) throws IOException;
 
-  void rename(String srcKey, String dstKey) throws IOException;
+  void copy(String srcKey, String dstKey) throws IOException;
   
   /**
    * Delete all keys with the given prefix. Used for testing.

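listAll() folds into list() behind a recursive flag; in the Jets3t implementation above, recursion is simply a null delimiter, which disables common-prefix grouping in the S3 listing. The resulting calling convention, sketched against hypothetical stand-in types (only the method shapes mirror the interface in the hunk):

    // Hypothetical stand-ins; only the method shapes mirror the interface above.
    interface ListingSketch { /* prior-last-key plus entries, as in PartialListing */ }

    interface StoreSketch {
      ListingSketch list(String prefix, int maxListingLength)
        throws java.io.IOException;
      // recursive == true corresponds to the removed listAll (null delimiter);
      // recursive == false keeps the PATH_DELIMITER grouping of the old list.
      ListingSketch list(String prefix, int maxListingLength, String priorLastKey,
          boolean recursive) throws java.io.IOException;
    }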

