hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r723788 - in /hadoop/core/trunk: CHANGES.txt src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Date Fri, 05 Dec 2008 16:52:23 GMT
Author: yhemanth
Date: Fri Dec  5 08:52:23 2008
New Revision: 723788

URL: http://svn.apache.org/viewvc?rev=723788&view=rev
Log:
HADOOP-4445. Replace running task counts with running task percentage in capacity scheduler
UI. Contributed by Sreekanth Ramakrishnan.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=723788&r1=723787&r2=723788&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Dec  5 08:52:23 2008
@@ -50,6 +50,10 @@
     HADOOP-3497. Fix bug in overly restrictive file globbing with a
     PathFilter. (tomwhite)
 
+    HADOOP-4445. Replace running task counts with running task
+    percentage in capacity scheduler UI. (Sreekanth Ramakrishnan via
+    yhemanth)
+
   NEW FEATURES
 
     HADOOP-4575. Add a proxy service for relaying HsftpFileSystem requests.

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=723788&r1=723787&r2=723788&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Dec  5 08:52:23 2008
@@ -212,26 +212,42 @@
     
     @Override
     public String toString(){
+      float runningMaps = 0;
+      float runningReduces = 0;
+      
+      Collection<JobInProgress> runningJobs = 
+        mgr.getRunningJobQueue(mqsi.queueName);
+      
+      for(JobInProgress job : runningJobs) {
+        runningMaps += job.runningMaps();
+        runningReduces += job.runningReduces();
+      }
+      float usedMaps = mqsi.guaranteedCapacity!= 0 ? 
+          (runningMaps * 100/mqsi.guaranteedCapacity):0;
+      float usedReduces = rqsi.guaranteedCapacity != 0 ? 
+          (runningReduces * 100/rqsi.guaranteedCapacity) :0;
       StringBuffer sb = new StringBuffer();
-      sb.append("Guaranteed Capacity (%) : ");
+      sb.append("Guaranteed Capacity : ");
       sb.append(mqsi.guaranteedCapacityPercent);
-      sb.append(" \n");
+      sb.append(" %\n");
       sb.append(String.format("Guaranteed Capacity Maps : %d \n",
           mqsi.guaranteedCapacity));
       sb.append(String.format("Guaranteed Capacity Reduces : %d \n",
           rqsi.guaranteedCapacity));
-      sb.append(String.format("User Limit : %d \n",mqsi.ulMin));
-      sb.append(String.format("Reclaim Time limit : %d \n",mqsi.reclaimTime));
-      sb.append(String.format("Number of Running Maps : %d \n", 
-          mqsi.numRunningTasks));
-      sb.append(String.format("Number of Running Reduces : %d \n", 
-          rqsi.numRunningTasks));
+      sb.append(String.format("User Limit : %d %s\n",mqsi.ulMin, "%"));
+      sb.append(String.format("Reclaim Time limit : %s \n", 
+          StringUtils.formatTime(mqsi.reclaimTime)));
+      sb.append(String.format("Priority Supported : %s \n",
+          supportsPriority?"YES":"NO"));
+      sb.append("-------------\n");
+      sb.append(String.format("Running Maps : %s %s\n",
+          Float.valueOf(usedMaps).toString(),
+          "% of Guaranteed Capacity"));
+      sb.append(String.format("Running Reduces : %s %s\n",
+          Float.valueOf(usedReduces).toString(),
+          "% of Guaranteed Capacity" ));
       sb.append(String.format("Number of Waiting Jobs : %d \n", mgr
           .getWaitingJobCount(mqsi.queueName)));
-      sb.append(String.format("Priority Supported : %s \n",
-          supportsPriority?"YES":"NO"));      
-      sb.append(String.format("* Scheduling information can be off by "
-          + "maximum of %s\n", StringUtils.formatTime(pollingInterval)));
       return sb.toString();
     }
   }

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=723788&r1=723787&r2=723788&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Dec  5 08:52:23 2008
@@ -243,10 +243,6 @@
       return (Set<TaskInProgress>)reduceTips;
     }
     
-    @Override
-    synchronized void fail() {
-      getStatus().setRunState(JobStatus.FAILED);
-    }
   }
   
   static class FakeTaskInProgress extends TaskInProgress {
@@ -1343,6 +1339,39 @@
     assertEquals(j1.runningMapTasks, 2);
     
   }
+  /*
+   * Following is the testing strategy for testing scheduling information.
+   * - start capacity scheduler with two queues.
+   * - check the scheduling information with respect to the configuration
+   * which was used to configure the queues.
+   * - Submit 5 jobs to a queue.
+   * - Check the waiting jobs count, it should be 5.
+   * - Then run initializationPoller()
+   * - Check once again the waiting queue, it should be 5 jobs again.
+   * - Then raise status change events.
+   * - Assign one task to a task tracker. (Map)
+   * - Check waiting job count, it should be 4 now and used map (%) = 100
+   * - Assign another one task (Reduce)
+   * - Check waiting job count, it should be 4 now and used map (%) = 100
+   * and used reduce (%) = 100
+   * - finish the job and then check the used percentage it should go
+   * back to zero
+   * - Then pick an initialized job but not scheduled job and fail it.
+   * - Run the poller
+   * - Check the waiting job count should now be 3.
+   * - Now fail a job which has not been initialized at all.
+   * - Run the poller, so that it can clean up the job queue.
+   * - Check the count, the waiting job count should be 2.
+   * - Now raise status change events to move the initialized jobs which 
+   * should be two in count to running queue.
+   * - Then schedule a map of the job in running queue. 
+   * - Run the poller because the poller is responsible for waiting
+   * jobs count. Check the count, it should be using 100% map and one
+   * waiting job
+   * - fail the running job.
+   * - Check the count, it should be now one waiting job and zero running
+   * tasks
+   */
   
   public void testSchedulingInformation() throws Exception {
     String[] qs = {"default", "q2"};
@@ -1371,35 +1400,20 @@
     String[] infoStrings = schedulingInfo.split("\n");
     
     assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[0] , "Guaranteed Capacity (%) : 50.0 ");
+    assertEquals(infoStrings[0] , "Guaranteed Capacity : 50.0 %");
    assertEquals(infoStrings[1] , "Guaranteed Capacity Maps : " + totalMaps * 50/100 + " ");
    assertEquals(infoStrings[2] , "Guaranteed Capacity Reduces : " + totalReduces * 50/100 + " ");
-    assertEquals(infoStrings[3] , "User Limit : 25 ");
-    assertEquals(infoStrings[4] , "Reclaim Time limit : 1000000 " );
-    assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
-    assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
-    assertEquals(infoStrings[7] , "Number of Waiting Jobs : 0 ");
-    assertEquals(infoStrings[8] , "Priority Supported : YES ");
-    assertEquals(infoStrings[9] , "* Scheduling information can be off by " +
-        "maximum of "+ StringUtils.formatTime(resConf.getSleepInterval()));
+    assertEquals(infoStrings[3] , "User Limit : 25 %");
+    assertEquals(infoStrings[4] , "Reclaim Time limit : " + 
+        StringUtils.formatTime(1000000) + " ");
+    assertEquals(infoStrings[5] , "Priority Supported : YES ");
+    assertEquals(infoStrings[7] , 
+        "Running Maps : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[8] , 
+        "Running Reduces : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 0 ");
     assertEquals(schedulingInfo, schedulingInfo2);
     
-    /*
-     * Following is the testing strategy for testing scheduling information.
-     * - Submit 5 jobs to a queue.
-     * - Check the waiting jobs count, it should be 5.
-     * - Then run initializationPoller()
-     * - Check once again the waiting queue, it should be 5 jobs again.
-     * - Then raise status change events.
-     * - Assign one task to a task tracker.
-     * - Check waiting job count, it should be 4 now.
-     * - Then pick an initialized job but not scheduled job and fail it.
-     * - Run the poller
-     * - Check the waiting job count should now be 3.
-     * - Now fail a job which has not been initialized at all.
-     * - Run the poller, so that it can clean up the job queue.
-     * - Check the count, the waiting job count should be 2.
-     */
     //Testing with actual job submission.
     ArrayList<FakeJobInProgress> userJobs = 
       submitJobs(1, 5, "default").get("u1");
@@ -1409,9 +1423,11 @@
     
     //waiting job should be equal to number of jobs submitted.
     assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
-    assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
-    assertEquals(infoStrings[7] , "Number of Waiting Jobs : 5 ");
+    assertEquals(infoStrings[7] ,
+        "Running Maps : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[8] , 
+        "Running Reduces : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 5 ");
     
     //Initalize the jobs but don't raise events
     p.selectJobsToInitialize();
@@ -1422,14 +1438,16 @@
     assertEquals(infoStrings.length, 10);
     //should be previous value as nothing is scheduled because no events
     //has been raised after initialization.
-    assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
-    assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
-    assertEquals(infoStrings[7] , "Number of Waiting Jobs : 5 ");
+    assertEquals(infoStrings[7] ,
+        "Running Maps : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[8] , 
+        "Running Reduces : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 5 ");
     
     //Raise status change event so that jobs can move to running queue.
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
     //assign one job
-    scheduler.assignTasks(tracker("tt1")); // heartbeat
+    Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     //Initalize extra job.
     p.selectJobsToInitialize();
     
@@ -1439,35 +1457,116 @@
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
     assertEquals(infoStrings.length, 10);
-    //TODO check running task count also fix in HADOOP-4445
-    assertEquals(infoStrings[7] , "Number of Waiting Jobs : 4 ");
+    assertEquals(infoStrings[7] , 
+        "Running Maps : 100.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[8] , 
+        "Running Reduces : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
+    
+    //assign a reduce task
+    
+    Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    schedulingInfo = 
+      queueManager.getJobQueueInfo("default").getSchedulingInfo();
+    infoStrings = schedulingInfo.split("\n");
+    assertEquals(infoStrings.length, 10);
+    assertEquals(infoStrings[7] , 
+        "Running Maps : 100.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[8] , 
+        "Running Reduces : 100.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
+    
+    //Complete the job and check the running tasks count
+    FakeJobInProgress u1j1 = userJobs.get(0);
+    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1j1);
+    taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1j1);
+    taskTrackerManager.finalizeJob(u1j1);
+    
+    schedulingInfo = 
+      queueManager.getJobQueueInfo("default").getSchedulingInfo();
+    infoStrings = schedulingInfo.split("\n");
+    assertEquals(infoStrings.length, 10);
+    assertEquals(infoStrings[7] , 
+        "Running Maps : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[8] ,
+        "Running Reduces : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
     
     
     //Fail a job which is initialized but not scheduled and check the count.
     FakeJobInProgress u1j2 = userJobs.get(1);
     assertTrue("User1 job 2 not initalized ", 
         u1j2.getStatus().getRunState() == JobStatus.RUNNING);
-    u1j2.fail();
+    taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
     //Run initializer to clean up failed jobs
     p.selectJobsToInitialize();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
     assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , "Number of Waiting Jobs : 3 ");
+    assertEquals(infoStrings[7] , 
+        "Running Maps : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[8] ,
+        "Running Reduces : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 3 ");
     
     //Fail a job which is not initialized but is in the waiting queue.
     FakeJobInProgress u1j5 = userJobs.get(4);
     assertFalse("User1 job 5 initalized ", 
         u1j5.getStatus().getRunState() == JobStatus.RUNNING);
-    u1j5.fail();
+    
+    taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
     //run initializer to clean up failed job
     p.selectJobsToInitialize();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
     assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , "Number of Waiting Jobs : 2 ");
+    assertEquals(infoStrings[7] , 
+        "Running Maps : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[8] , 
+        "Running Reduces : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 2 ");
+    
+    //Raise status change events as none of the intialized jobs would be
+    //in running queue as we just failed the second job which was initialized
+    //and completed the first one.
+    
+    raiseStatusChangeEvents(scheduler.jobQueuesManager);
+    
+    //Now schedule a map should be job3 of the user as job1 succeeded job2
+    //failed and now job3 is running
+    
+    t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+    FakeJobInProgress u1j3 = userJobs.get(2);
+    assertTrue("User Job 3 not running ", 
+        u1j3.getStatus().getRunState() == JobStatus.RUNNING);
+    
+    //now the running count of map should be one and waiting jobs should be
+    //one. run the poller as it is responsible for waiting count
+    p.selectJobsToInitialize();
+    schedulingInfo = 
+      queueManager.getJobQueueInfo("default").getSchedulingInfo();
+    infoStrings = schedulingInfo.split("\n");
+    assertEquals(infoStrings.length, 10);
+    assertEquals(infoStrings[7] , 
+        "Running Maps : 100.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[8] , 
+        "Running Reduces : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 1 ");
+    
+    //Fail the executing job
+    taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
+    //Now running counts should become zero
+    schedulingInfo = 
+      queueManager.getJobQueueInfo("default").getSchedulingInfo();
+    infoStrings = schedulingInfo.split("\n");
+    assertEquals(infoStrings.length, 10);
+    assertEquals(infoStrings[7] , 
+        "Running Maps : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[8] , 
+        "Running Reduces : 0.0 % of Guaranteed Capacity");
+    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 1 ");
     
   }
 



Mime
View raw message