hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r825469 - in /hadoop/mapreduce/trunk: ./ src/examples/org/apache/hadoop/examples/pi/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/protocol/ src/java/org/apache/hadoop/mapreduc...
Date Thu, 15 Oct 2009 11:56:59 GMT
Author: sharad
Date: Thu Oct 15 11:56:57 2009
New Revision: 825469

URL: http://svn.apache.org/viewvc?rev=825469&view=rev
Log:
MAPREDUCE-1048. Add occupied/reserved slot usage summary on jobtracker UI. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
    hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Oct 15 11:56:57 2009
@@ -10,6 +10,9 @@
 
     MAPREDUCE-999. Improve Sqoop test speed and refactor tests.
     (Aaron Kimball via tomwhite)
+  
+    MAPREDUCE-1048. Add occupied/reserved slot usage summary on jobtracker UI.
+    (Amareshwari Sriramadasu via sharad)
 
   OPTIMIZATIONS
 

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java Thu Oct 15 11:56:57 2009
@@ -38,9 +38,9 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -379,14 +379,14 @@
   public static class MixMachine extends Machine {
     private static final MixMachine INSTANCE = new MixMachine();
     
-    private JobClient jobclient;
+    private Cluster cluster;
 
     /** {@inheritDoc} */
     @Override
     public synchronized void init(Job job) throws IOException {
       final Configuration conf = job.getConfiguration();
-      if (jobclient == null)
-        jobclient = new JobClient(JobTracker.getAddress(conf), conf);
+      if (cluster == null)
+        cluster = new Cluster(JobTracker.getAddress(conf), conf);
       chooseMachine(conf).init(job);
     }
 
@@ -398,9 +398,11 @@
       try {
         for(;; Thread.sleep(2000)) {
           //get cluster status
-          final ClusterStatus status = jobclient.getClusterStatus();
-          final int m = status.getMaxMapTasks() - status.getMapTasks();
-          final int r = status.getMaxReduceTasks() - status.getReduceTasks();
+          final ClusterMetrics status = cluster.getClusterStatus();
+          final int m = 
+            status.getMapSlotCapacity() - status.getOccupiedMapSlots();
+          final int r = 
+            status.getReduceSlotCapacity() - status.getOccupiedReduceSlots();
           if (m >= parts || r >= parts) {
             //favor ReduceSide machine
             final Machine value = r >= parts?

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Oct 15 11:56:57 2009
@@ -1703,6 +1703,14 @@
     return trackersReservedForReduces.size();
   }
   
+  public int getReservedMapSlots(TaskTracker taskTracker) {
+    return trackersReservedForMaps.get(taskTracker).getNumSlots();
+  }
+
+  public int getReservedReduceSlots(TaskTracker taskTracker) {
+    return trackersReservedForReduces.get(taskTracker).getNumSlots();
+  }
+
   private int getTrackerTaskFailures(String trackerName) {
     String trackerHostName = convertTrackerNameToHostName(trackerName);
     Integer failedTasks = trackerToFailuresMap.get(trackerHostName);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Oct 15 11:56:57 2009
@@ -20,7 +20,6 @@
 
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
@@ -1215,6 +1214,10 @@
   //
   int totalMaps = 0;
   int totalReduces = 0;
+  private int occupiedMapSlots = 0;
+  private int occupiedReduceSlots = 0;
+  private int reservedMapSlots = 0;
+  private int reservedReduceSlots = 0;
   private HashMap<String, TaskTracker> taskTrackers =
     new HashMap<String, TaskTracker>();
   Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
@@ -2314,6 +2317,7 @@
     HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
     List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
     isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
+    removeTrackerReservations(getTaskTracker(trackerName));
     // Check for new tasks to be executed on the tasktracker
     if (acceptNewTasks && !isBlacklisted) {
       TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
@@ -2333,6 +2337,7 @@
         }
       }
     }
+    addTrackerReservations(getTaskTracker(trackerName));
       
     // Check for tasks to be killed
     List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
@@ -2423,6 +2428,8 @@
     if (oldStatus != null) {
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
+      occupiedMapSlots  -= oldStatus.countOccupiedMapSlots();
+      occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
         int mapSlots = oldStatus.getMaxMapSlots();
         totalMapTaskCapacity -= mapSlots;
@@ -2445,6 +2452,8 @@
     if (status != null) {
       totalMaps += status.countMapTasks();
       totalReduces += status.countReduceTasks();
+      occupiedMapSlots  += status.countOccupiedMapSlots();
+      occupiedReduceSlots += status.countOccupiedReduceSlots();
       if (!faultyTrackers.isBlacklisted(status.getHost())) {
         int mapSlots = status.getMaxMapSlots();
         totalMapTaskCapacity += mapSlots;
@@ -2513,6 +2522,18 @@
   }
   
   
+  // remove the tracker reservations from statistics
+  private void removeTrackerReservations(TaskTracker tt) {
+    reservedMapSlots -= tt.getReservedMapSlots();
+    reservedReduceSlots -= tt.getReservedReduceSlots();
+  }
+
+  // add the tracker reservations to statistics
+  private void addTrackerReservations(TaskTracker tt) {
+    reservedMapSlots += tt.getReservedMapSlots();
+    reservedReduceSlots += tt.getReservedReduceSlots();
+  }
+
   private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
     TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
     synchronized (faultyTrackers) {
@@ -2559,9 +2580,10 @@
         }
       }
     }
-
+    removeTrackerReservations(getTaskTracker(trackerName));
     updateTaskStatuses(trackerStatus);
     updateNodeHealthStatus(trackerStatus);
+    addTrackerReservations(getTaskTracker(trackerName));
     
     return true;
   }
@@ -2950,8 +2972,9 @@
   }
   
   public synchronized ClusterMetrics getClusterMetrics() {
-    return new ClusterMetrics(totalMaps, totalReduces, totalMapTaskCapacity,
-      totalReduceTaskCapacity, taskTrackers.size() - 
+    return new ClusterMetrics(occupiedMapSlots, occupiedReduceSlots,
+      reservedMapSlots, reservedReduceSlots,
+      totalMapTaskCapacity, totalReduceTaskCapacity, taskTrackers.size() - 
       getBlacklistedTrackerCount(), 
       getBlacklistedTrackerCount(), getExcludedNodes().size()) ;
   }
@@ -3724,6 +3747,7 @@
 
       // Cleanup
       taskTracker.cancelAllReservations();
+      removeTrackerReservations(taskTracker);
 
       // Purge 'marked' tasks, needs to be done  
       // here to prevent hanging references!

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Oct 15 11:56:57 2009
@@ -506,7 +506,7 @@
   }
   
   public ClusterMetrics getClusterMetrics() {
-    return new ClusterMetrics(map_tasks, reduce_tasks, 1, 1, 1, 0, 0);
+    return new ClusterMetrics(map_tasks, reduce_tasks, 0, 0, 1, 1, 1, 0, 0);
   }
 
   public State getJobTrackerState() throws IOException, InterruptedException {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java Thu Oct 15 11:56:57 2009
@@ -35,10 +35,10 @@
  *   Number of blacklisted and decommissioned trackers.  
  *   </li>
  *   <li>
- *   Task capacity of the cluster. 
+ *   Slot capacity of the cluster. 
  *   </li>
  *   <li>
- *   The number of currently running map & reduce tasks.
+ *   The number of currently occupied/reserved map & reduce slots.
  *   </li>
  * </ol></p>
  * 
@@ -48,24 +48,30 @@
  * @see Cluster
  */
 public class ClusterMetrics implements Writable {
-  int runningMaps;
-  int runningReduces;
-  int mapSlots;
-  int reduceSlots;
-  int numTrackers;
-  int numBlacklistedTrackers;
-  int numDecommissionedTrackers;
+  private int occupiedMapSlots;
+  private int occupiedReduceSlots;
+  private int reservedMapSlots;
+  private int reservedReduceSlots;
+  private int totalMapSlots;
+  private int totalReduceSlots;
+  private int numTrackers;
+  private int numBlacklistedTrackers;
+  private int numDecommissionedTrackers;
 
   public ClusterMetrics() {
   }
   
-  public ClusterMetrics(int runningMaps, int runningReduces, int mapSlots, 
-    int reduceSlots, int numTrackers, int numBlacklistedTrackers,
-    int numDecommisionedNodes) {
-    this.runningMaps = runningMaps;
-    this.runningReduces = runningReduces;
-    this.mapSlots = mapSlots;
-    this.reduceSlots = reduceSlots;
+  public ClusterMetrics(int occupiedMapSlots, int occupiedReduceSlots,
+      int reservedMapSlots, int reservedReduceSlots,
+      int mapSlots, int reduceSlots, 
+      int numTrackers, int numBlacklistedTrackers,
+      int numDecommisionedNodes) {
+    this.occupiedMapSlots = occupiedMapSlots;
+    this.occupiedReduceSlots = occupiedReduceSlots;
+    this.reservedMapSlots = reservedMapSlots;
+    this.reservedReduceSlots = reservedReduceSlots;
+    this.totalMapSlots = mapSlots;
+    this.totalReduceSlots = reduceSlots;
     this.numTrackers = numTrackers;
     this.numBlacklistedTrackers = numBlacklistedTrackers;
     this.numDecommissionedTrackers = numDecommisionedNodes;
@@ -77,7 +83,7 @@
    * @return occupied map slot count
    */
   public int getOccupiedMapSlots() { 
-    return runningMaps;
+    return occupiedMapSlots;
   }
   
   /**
@@ -86,16 +92,34 @@
    * @return occupied reduce slot count
    */
   public int getOccupiedReduceSlots() { 
-    return runningReduces; 
+    return occupiedReduceSlots; 
+  }
+
+  /**
+   * Get number of reserved map slots in the cluster.
+   * 
+   * @return reserved map slot count
+   */
+  public int getReservedMapSlots() { 
+    return reservedMapSlots;
   }
   
   /**
+   * Get the number of reserved reduce slots in the cluster.
+   * 
+   * @return reserved reduce slot count
+   */
+  public int getReservedReduceSlots() { 
+    return reservedReduceSlots; 
+  }
+
+  /**
    * Get the total number of map slots in the cluster.
    * 
    * @return map slot capacity
    */
   public int getMapSlotCapacity() {
-    return mapSlots;
+    return totalMapSlots;
   }
   
   /**
@@ -104,7 +128,7 @@
    * @return reduce slot capacity
    */
   public int getReduceSlotCapacity() {
-    return reduceSlots;
+    return totalReduceSlots;
   }
   
   /**
@@ -136,10 +160,12 @@
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    runningMaps = in.readInt();
-    runningReduces = in.readInt();
-    mapSlots = in.readInt();
-    reduceSlots = in.readInt();
+    occupiedMapSlots = in.readInt();
+    occupiedReduceSlots = in.readInt();
+    reservedMapSlots = in.readInt();
+    reservedReduceSlots = in.readInt();
+    totalMapSlots = in.readInt();
+    totalReduceSlots = in.readInt();
     numTrackers = in.readInt();
     numBlacklistedTrackers = in.readInt();
     numDecommissionedTrackers = in.readInt();
@@ -147,10 +173,12 @@
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeInt(runningMaps);
-    out.writeInt(runningReduces);
-    out.writeInt(mapSlots);
-    out.writeInt(reduceSlots);
+    out.writeInt(occupiedMapSlots);
+    out.writeInt(occupiedReduceSlots);
+    out.writeInt(reservedMapSlots);
+    out.writeInt(reservedReduceSlots);
+    out.writeInt(totalMapSlots);
+    out.writeInt(totalReduceSlots);
     out.writeInt(numTrackers);
     out.writeInt(numBlacklistedTrackers);
     out.writeInt(numDecommissionedTrackers);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Thu Oct 15 11:56:57 2009
@@ -85,8 +85,9 @@
    * Version 27: Changed protocol to use new api objects. And the protocol is 
    *             renamed from JobSubmissionProtocol to ClientProtocol.
    * Version 28: Added getJobHistoryDir() as part of MAPREDUCE-975.
+   * Version 29: Added reservedSlots to ClusterMetrics.
    */
-  public static final long versionID = 28L;
+  public static final long versionID = 29L;
 
   /**
    * Allocate a name for the job.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java Thu Oct 15 11:56:57 2009
@@ -100,6 +100,30 @@
   }
   
   /**
+   * Get the reserved map slots for the tracker.
+   * 
+   * @return reserved map slots
+   */
+  public int getReservedMapSlots() {
+    if (jobForFallowMapSlot != null) {
+      return jobForFallowMapSlot.getReservedMapSlots(this);
+    }
+    return 0;
+  }
+	
+  /**
+   * Get the reserved reduce slots for the tracker.
+   * 
+   * @return reserved reduce slots
+   */
+  public int getReservedReduceSlots() {
+    if (jobForFallowReduceSlot != null) {
+      return jobForFallowReduceSlot.getReservedReduceSlots(this);
+    }
+    return 0;
+  }
+
+  /**
    * Get the {@link JobInProgress} for which the fallow slot(s) are held.
    * @param taskType {@link TaskType} of the task
    * @return the task for which the fallow slot(s) are held, 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Thu Oct 15 11:56:57 2009
@@ -223,7 +223,7 @@
   }
   
   static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status, 
-                                             boolean initialContact, 
+                             boolean initialContact, boolean acceptNewTasks,
                                              String tracker, short responseId) 
     throws IOException {
     if (status == null) {
@@ -231,13 +231,13 @@
           JobInProgress.convertTrackerNameToHostName(tracker));
 
     }
-      jt.heartbeat(status, false, initialContact, false, responseId);
+      jt.heartbeat(status, false, initialContact, acceptNewTasks, responseId);
       return ++responseId ;
   }
   
   static void establishFirstContact(JobTracker jt, String tracker) 
     throws IOException {
-    sendHeartBeat(jt, null, true, tracker, (short) 0);
+    sendHeartBeat(jt, null, true, false, tracker, (short) 0);
   }
 
   static class FakeTaskInProgress extends TaskInProgress {

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java?rev=825469&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java Thu Oct 15 11:56:57 2009
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class TestClusterStatus extends TestCase {
+
+  private static String[] trackers = new String[] { "tracker_tracker1:1000",
+      "tracker_tracker2:1000", "tracker_tracker3:1000" };
+  private static JobTracker jobTracker;
+  private static int mapSlotsPerTracker = 4;
+  private static int reduceSlotsPerTracker = 2;
+  private static MiniMRCluster mr;
+  private static FakeJobInProgress fakeJob = null;
+  private static Cluster cluster;
+  // heartbeat responseId. increment this after sending a heartbeat
+  private static short responseId = 1;
+  
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
+      protected void setUp() throws Exception {
+      
+        Configuration conf = new Configuration();
+        conf.setClass(JTConfig.JT_TASK_SCHEDULER, FakeTaskScheduler.class,
+                  TaskScheduler.class);
+        mr = new MiniMRCluster(0, "file:///", 1, null, null, new JobConf(conf));
+        jobTracker = mr.getJobTrackerRunner().getJobTracker();
+        for (String tracker : trackers) {
+          FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
+        }
+        cluster = new Cluster(mr.createJobConf());
+      }
+
+      protected void tearDown() throws Exception {
+        cluster.close();
+        mr.shutdown();
+      }
+    };
+    return setup;
+  }
+  
+  static class FakeTaskScheduler extends JobQueueTaskScheduler {
+    public FakeTaskScheduler() {
+      super();
+    }
+    public List<Task> assignTasks(TaskTracker tt) {
+      tt.reserveSlots(TaskType.MAP, fakeJob, 2);
+      tt.reserveSlots(TaskType.REDUCE, fakeJob, 2);
+  return new ArrayList<Task>();  
+}
+  }
+  
+  private TaskTrackerStatus getTTStatus(String trackerName,
+      List<TaskStatus> taskStatuses) {
+    return new TaskTrackerStatus(trackerName, 
+      JobInProgress.convertTrackerNameToHostName(trackerName), 0,
+      taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
+  }
+  
+  public void testClusterMetrics() throws IOException, InterruptedException {
+    assertEquals("tasktracker count doesn't match", trackers.length,
+      cluster.getClusterStatus().getTaskTrackerCount());
+    
+    List<TaskStatus> list = new ArrayList<TaskStatus>();
+
+    // create a map task status, which uses 2 slots. 
+    int mapSlotsPerTask = 2;
+    TaskStatus ts = TaskStatus.createTaskStatus(true, 
+      new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0), 0.0f, mapSlotsPerTask,
+      TaskStatus.State.RUNNING, "", "", trackers[0], 
+      TaskStatus.Phase.MAP, null);
+    list.add(ts);
+    
+    // create a reduce task status, which uses 1 slot.
+    int reduceSlotsPerTask = 1;
+    ts = TaskStatus.createTaskStatus(false, 
+      new TaskAttemptID("jt", 1, TaskType.REDUCE, 0, 0), 0.0f,
+      reduceSlotsPerTask,
+      TaskStatus.State.RUNNING, "", "", trackers[0], 
+      TaskStatus.Phase.REDUCE, null);
+    list.add(ts);
+    
+    // create TaskTrackerStatus and send heartbeats
+    TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
+    status[0] = getTTStatus(trackers[0], list);
+    status[1] = getTTStatus(trackers[1], new ArrayList<TaskStatus>());
+    status[2] = getTTStatus(trackers[2], new ArrayList<TaskStatus>());
+    for (int i = 0; i< trackers.length; i++) {
+      FakeObjectUtilities.sendHeartBeat(jobTracker, status[i], false, false,
+        trackers[i], responseId);
+    }
+    responseId++;
+    // assert ClusterMetrics
+    ClusterMetrics metrics = cluster.getClusterStatus();
+    assertEquals("occupied map slots do not match", mapSlotsPerTask,
+      metrics.getOccupiedMapSlots());
+    assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
+      metrics.getOccupiedReduceSlots());
+    assertEquals("map slot capacities do not match",
+      mapSlotsPerTracker * trackers.length,
+      metrics.getMapSlotCapacity());
+    assertEquals("reduce slot capacities do not match",
+      reduceSlotsPerTracker * trackers.length,
+      metrics.getReduceSlotCapacity());
+    
+    // assert the values in ClusterStatus also
+    assertEquals("running map tasks do not match", 1,
+      jobTracker.getClusterStatus().getMapTasks());
+    assertEquals("running reduce tasks do not match", 1,
+      jobTracker.getClusterStatus().getReduceTasks());
+    assertEquals("map slot capacities do not match",
+      mapSlotsPerTracker * trackers.length,
+      jobTracker.getClusterStatus().getMaxMapTasks());
+    assertEquals("reduce slot capacities do not match",
+      reduceSlotsPerTracker * trackers.length,
+      jobTracker.getClusterStatus().getMaxReduceTasks());
+    cluster.close();
+  }
+  
+  public void testReservedSlots() throws Exception {
+    Configuration conf = mr.createJobConf();
+    conf.setInt(JobContext.NUM_MAPS, 1);
+
+    Job job = Job.getInstance(cluster, conf);
+    job.setNumReduceTasks(1);
+    job.setSpeculativeExecution(false);
+    job.setJobSetupCleanupNeeded(false);
+    
+    //Set task tracker objects for reservation.
+    TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
+    TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
+    TaskTrackerStatus status1 = new TaskTrackerStatus(
+        trackers[0],JobInProgress.convertTrackerNameToHostName(
+            trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+    TaskTrackerStatus status2 = new TaskTrackerStatus(
+        trackers[1],JobInProgress.convertTrackerNameToHostName(
+            trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+    tt1.setStatus(status1);
+    tt2.setStatus(status2);
+    
+    fakeJob = new FakeJobInProgress(new JobConf(job.getConfiguration()),
+                    jobTracker);
+    fakeJob.setClusterSize(3);
+    fakeJob.initTasks();
+    
+    FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
+      true, trackers[0], responseId);
+    FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
+      true, trackers[1], responseId);
+    responseId++; 
+    ClusterMetrics metrics = cluster.getClusterStatus();
+    assertEquals("reserved map slots do not match", 
+      4, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match", 
+      4, metrics.getReservedReduceSlots());
+    
+    TaskAttemptID mTid = fakeJob.findMapTask(trackers[1]);
+    TaskAttemptID rTid = fakeJob.findReduceTask(trackers[1]);
+
+    fakeJob.finishTask(mTid);
+    fakeJob.finishTask(rTid);
+    
+    assertEquals("Job didnt complete successfully complete", 
+      fakeJob.getStatus().getRunState(), JobStatus.SUCCEEDED);
+  }
+}

Modified: hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp Thu Oct 15 11:56:57 2009
@@ -25,6 +25,7 @@
   import="java.util.*"
   import="java.text.DecimalFormat"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapreduce.*"
   import="org.apache.hadoop.util.*"
 %>
 <%!	private static final long serialVersionUID = 1L;
@@ -44,22 +45,29 @@
   
   public void generateSummaryTable(JspWriter out, ClusterStatus status,
                                    JobTracker tracker) throws IOException {
+    ClusterMetrics metrics = tracker.getClusterMetrics();
     String tasksPerNode = status.getTaskTrackers() > 0 ?
       percentFormat.format(((double)(status.getMaxMapTasks() +
                       status.getMaxReduceTasks())) / status.getTaskTrackers()):
       "-";
     out.print("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n"+
               "<tr><th>Queues</th>" +
-              "<th>Maps</th><th>Reduces</th>" + 
+              "<th>Running Map Tasks</th><th>Running Reduce Tasks</th>" +
+              "<th>Occupied Map Slots</th><th>Occupied Reduce Slots</th>" +
+              "<th>Reserved Map Slots</th><th>Reserved Reduce Slots</th>" +
               "<th>Total Submissions</th>" +
-              "<th>Nodes</th><th>Map Task Capacity</th>" +
-              "<th>Reduce Task Capacity</th><th>Avg. Tasks/Node</th>" +
+              "<th>Nodes</th><th>Map Slot Capacity</th>" +
+              "<th>Reduce Slot Capacity</th><th>Avg. Slots/Node</th>" +
               "<th>Blacklisted Nodes</th>" +
               "<th>Excluded Nodes</th></tr>\n");
     out.print("<tr><td><a href=\"queueinfo.jsp\">" +
               tracker.getRootQueues().length + "</a></td><td>" + 
               status.getMapTasks() + "</td><td>" +
               status.getReduceTasks() + "</td><td>" + 
+              metrics.getOccupiedMapSlots() + "</td><td>" +
+              metrics.getOccupiedReduceSlots() + "</td><td>" + 
+              metrics.getReservedMapSlots() + "</td><td>" +
+              metrics.getReservedReduceSlots() + "</td><td>" + 
               tracker.getTotalSubmissions() +
               "</td><td><a href=\"machines.jsp?type=active\">" +
               status.getTaskTrackers() +



Mime
View raw message