hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r829312 - in /hadoop/mapreduce/trunk: ./ src/examples/org/apache/hadoop/examples/pi/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/protocol/ src/test/mapred/org/apache/hadoop/m...
Date Sat, 24 Oct 2009 06:32:51 GMT
Author: sharad
Date: Sat Oct 24 06:32:51 2009
New Revision: 829312

URL: http://svn.apache.org/viewvc?rev=829312&view=rev
Log:
MAPREDUCE-1048. Add occupied/reserved slot usage summary on jobtracker UI. Contributed by
Amareshwari Sriramadasu and Hemanth Yamijala.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java
    hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Oct 24 06:32:51 2009
@@ -22,6 +22,10 @@
 
     MAPREDUCE-1103. Added more metrics to Jobtracker. (sharad) 
 
+    MAPREDUCE-1048. Add occupied/reserved slot usage summary on jobtracker UI.
+    (Amareshwari Sriramadasu and Hemanth Yamijala via sharad)
+
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java Sat Oct 24 06:32:51 2009
@@ -38,9 +38,9 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -379,14 +379,14 @@
   public static class MixMachine extends Machine {
     private static final MixMachine INSTANCE = new MixMachine();
     
-    private JobClient jobclient;
+    private Cluster cluster;
 
     /** {@inheritDoc} */
     @Override
     public synchronized void init(Job job) throws IOException {
       final Configuration conf = job.getConfiguration();
-      if (jobclient == null)
-        jobclient = new JobClient(JobTracker.getAddress(conf), conf);
+      if (cluster == null)
+        cluster = new Cluster(JobTracker.getAddress(conf), conf);
       chooseMachine(conf).init(job);
     }
 
@@ -398,9 +398,11 @@
       try {
         for(;; Thread.sleep(2000)) {
           //get cluster status
-          final ClusterStatus status = jobclient.getClusterStatus();
-          final int m = status.getMaxMapTasks() - status.getMapTasks();
-          final int r = status.getMaxReduceTasks() - status.getReduceTasks();
+          final ClusterMetrics status = cluster.getClusterStatus();
+          final int m = 
+            status.getMapSlotCapacity() - status.getOccupiedMapSlots();
+          final int r = 
+            status.getReduceSlotCapacity() - status.getOccupiedReduceSlots();
           if (m >= parts || r >= parts) {
             //favor ReduceSide machine
             final Machine value = r >= parts?

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Oct 24 06:32:51 2009
@@ -1692,6 +1692,7 @@
     else {
       jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
     }
+    jobtracker.incrementReservations(type, reservedSlots);
   }
   
   public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
@@ -1724,6 +1725,7 @@
       jobtracker.getInstrumentation().decReservedReduceSlots(
         info.getNumSlots());
     }
+    jobtracker.decrementReservations(type, info.getNumSlots());
   }
   
   public int getNumReservedTaskTrackersForMaps() {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Sat Oct 24 06:32:51 2009
@@ -20,7 +20,6 @@
 
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
@@ -1225,6 +1224,10 @@
   //
   int totalMaps = 0;
   int totalReduces = 0;
+  private int occupiedMapSlots = 0;
+  private int occupiedReduceSlots = 0;
+  private int reservedMapSlots = 0;
+  private int reservedReduceSlots = 0;
   private HashMap<String, TaskTracker> taskTrackers =
     new HashMap<String, TaskTracker>();
   Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
@@ -2434,6 +2437,8 @@
     if (oldStatus != null) {
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
+      occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
+      occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
       getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
       getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
@@ -2458,6 +2463,8 @@
     if (status != null) {
       totalMaps += status.countMapTasks();
       totalReduces += status.countReduceTasks();
+      occupiedMapSlots += status.countOccupiedMapSlots();
+      occupiedReduceSlots += status.countOccupiedReduceSlots();
       getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
       getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(status.getHost())) {
@@ -2527,6 +2534,25 @@
     return oldStatus != null;
   }
   
+  // Increment the number of reserved slots in the cluster.
+  // This method assumes the caller has JobTracker lock.
+  void incrementReservations(TaskType type, int reservedSlots) {
+    adjustReservations(type, reservedSlots);
+  }
+
+  // Decrement the number of reserved slots in the cluster.
+  // This method assumes the caller has JobTracker lock.
+  void decrementReservations(TaskType type, int reservedSlots) {
+    adjustReservations(type, -reservedSlots);
+  }
+
+  // Add delta (positive or negative) to the cluster-wide reserved slot
+  // counter that matches the given task type; any type other than MAP or
+  // REDUCE leaves both counters untouched. Like its callers, this assumes
+  // the caller has the JobTracker lock.
+  private void adjustReservations(TaskType type, int delta) {
+    switch (type) {
+      case MAP:
+        reservedMapSlots += delta;
+        break;
+      case REDUCE:
+        reservedReduceSlots += delta;
+        break;
+      default:
+        // not a map/reduce slot type: nothing to account for
+        break;
+    }
+  }
   
   private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
     TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
@@ -2965,9 +2991,12 @@
   }
   
   public synchronized ClusterMetrics getClusterMetrics() {
-    return new ClusterMetrics(totalMaps, totalReduces, totalMapTaskCapacity,
-      totalReduceTaskCapacity, taskTrackers.size() - 
-      getBlacklistedTrackerCount(), 
+    return new ClusterMetrics(totalMaps,
+      totalReduces, occupiedMapSlots, occupiedReduceSlots,
+      reservedMapSlots, reservedReduceSlots,
+      totalMapTaskCapacity, totalReduceTaskCapacity,
+      totalSubmissions,
+      taskTrackers.size() - getBlacklistedTrackerCount(), 
       getBlacklistedTrackerCount(), getExcludedNodes().size()) ;
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Sat Oct 24 06:32:51 2009
@@ -507,7 +507,8 @@
   }
   
   public ClusterMetrics getClusterMetrics() {
-    return new ClusterMetrics(map_tasks, reduce_tasks, 1, 1, 1, 0, 0);
+    return new ClusterMetrics(map_tasks, reduce_tasks, map_tasks, reduce_tasks,
+      0, 0, 1, 1, jobs.size(), 1, 0, 0);
   }
 
   public State getJobTrackerState() throws IOException, InterruptedException {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java Sat Oct 24 06:32:51 2009
@@ -35,11 +35,17 @@
  *   Number of blacklisted and decommissioned trackers.  
  *   </li>
  *   <li>
- *   Task capacity of the cluster. 
+ *   Slot capacity of the cluster. 
+ *   </li>
+ *   <li>
+ *   The number of currently occupied/reserved map & reduce slots.
  *   </li>
  *   <li>
  *   The number of currently running map & reduce tasks.
  *   </li>
+ *   <li>
+ *   The number of job submissions.
+ *   </li>
  * </ol></p>
  * 
  * <p>Clients can query for the latest <code>ClusterMetrics</code>, via

@@ -48,27 +54,59 @@
  * @see Cluster
  */
 public class ClusterMetrics implements Writable {
-  int runningMaps;
-  int runningReduces;
-  int mapSlots;
-  int reduceSlots;
-  int numTrackers;
-  int numBlacklistedTrackers;
-  int numDecommissionedTrackers;
+  private int runningMaps;
+  private int runningReduces;
+  private int occupiedMapSlots;
+  private int occupiedReduceSlots;
+  private int reservedMapSlots;
+  private int reservedReduceSlots;
+  private int totalMapSlots;
+  private int totalReduceSlots;
+  private int totalJobSubmissions;
+  private int numTrackers;
+  private int numBlacklistedTrackers;
+  private int numDecommissionedTrackers;
 
   public ClusterMetrics() {
   }
   
-  public ClusterMetrics(int runningMaps, int runningReduces, int mapSlots, 
-    int reduceSlots, int numTrackers, int numBlacklistedTrackers,
-    int numDecommisionedNodes) {
+  public ClusterMetrics(int runningMaps, int runningReduces,
+      int occupiedMapSlots, int occupiedReduceSlots,
+      int reservedMapSlots, int reservedReduceSlots,
+      int mapSlots, int reduceSlots, 
+      int totalJobSubmissions,
+      int numTrackers, int numBlacklistedTrackers,
+      int numDecommissionedNodes) {
     this.runningMaps = runningMaps;
     this.runningReduces = runningReduces;
-    this.mapSlots = mapSlots;
-    this.reduceSlots = reduceSlots;
+    this.occupiedMapSlots = occupiedMapSlots;
+    this.occupiedReduceSlots = occupiedReduceSlots;
+    this.reservedMapSlots = reservedMapSlots;
+    this.reservedReduceSlots = reservedReduceSlots;
+    this.totalMapSlots = mapSlots;
+    this.totalReduceSlots = reduceSlots;
+    this.totalJobSubmissions = totalJobSubmissions;
     this.numTrackers = numTrackers;
     this.numBlacklistedTrackers = numBlacklistedTrackers;
-    this.numDecommissionedTrackers = numDecommisionedNodes;
+    this.numDecommissionedTrackers = numDecommissionedNodes;
+  }
+
+  /**
+   * Get the number of running map tasks in the cluster.
+   * 
+   * @return running maps
+   */
+  public int getRunningMaps() {
+    return runningMaps;
+  }
+  
+  /**
+   * Get the number of running reduce tasks in the cluster.
+   * 
+   * @return running reduces
+   */
+  public int getRunningReduces() {
+    return runningReduces;
   }
   
   /**
@@ -77,7 +115,7 @@
    * @return occupied map slot count
    */
   public int getOccupiedMapSlots() { 
-    return runningMaps;
+    return occupiedMapSlots;
   }
   
   /**
@@ -86,16 +124,34 @@
    * @return occupied reduce slot count
    */
   public int getOccupiedReduceSlots() { 
-    return runningReduces; 
+    return occupiedReduceSlots; 
+  }
+
+  /**
+   * Get number of reserved map slots in the cluster.
+   * 
+   * @return reserved map slot count
+   */
+  public int getReservedMapSlots() { 
+    return reservedMapSlots;
   }
   
   /**
+   * Get the number of reserved reduce slots in the cluster.
+   * 
+   * @return reserved reduce slot count
+   */
+  public int getReservedReduceSlots() { 
+    return reservedReduceSlots; 
+  }
+
+  /**
    * Get the total number of map slots in the cluster.
    * 
    * @return map slot capacity
    */
   public int getMapSlotCapacity() {
-    return mapSlots;
+    return totalMapSlots;
   }
   
   /**
@@ -104,7 +160,16 @@
    * @return reduce slot capacity
    */
   public int getReduceSlotCapacity() {
-    return reduceSlots;
+    return totalReduceSlots;
+  }
+  
+  /**
+   * Get the total number of job submissions in the cluster.
+   * 
+   * @return total number of job submissions
+   */
+  public int getTotalJobSubmissions() {
+    return totalJobSubmissions;
   }
   
   /**
@@ -138,8 +203,13 @@
   public void readFields(DataInput in) throws IOException {
     runningMaps = in.readInt();
     runningReduces = in.readInt();
-    mapSlots = in.readInt();
-    reduceSlots = in.readInt();
+    occupiedMapSlots = in.readInt();
+    occupiedReduceSlots = in.readInt();
+    reservedMapSlots = in.readInt();
+    reservedReduceSlots = in.readInt();
+    totalMapSlots = in.readInt();
+    totalReduceSlots = in.readInt();
+    totalJobSubmissions = in.readInt();
     numTrackers = in.readInt();
     numBlacklistedTrackers = in.readInt();
     numDecommissionedTrackers = in.readInt();
@@ -149,8 +219,13 @@
   public void write(DataOutput out) throws IOException {
     out.writeInt(runningMaps);
     out.writeInt(runningReduces);
-    out.writeInt(mapSlots);
-    out.writeInt(reduceSlots);
+    out.writeInt(occupiedMapSlots);
+    out.writeInt(occupiedReduceSlots);
+    out.writeInt(reservedMapSlots);
+    out.writeInt(reservedReduceSlots);
+    out.writeInt(totalMapSlots);
+    out.writeInt(totalReduceSlots);
+    out.writeInt(totalJobSubmissions);
     out.writeInt(numTrackers);
     out.writeInt(numBlacklistedTrackers);
     out.writeInt(numDecommissionedTrackers);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Sat Oct 24 06:32:51 2009
@@ -85,8 +85,10 @@
    * Version 27: Changed protocol to use new api objects. And the protocol is 
    *             renamed from JobSubmissionProtocol to ClientProtocol.
    * Version 28: Added getJobHistoryDir() as part of MAPREDUCE-975.
+   * Version 29: Added reservedSlots, runningTasks and totalJobSubmissions
+   *             to ClusterMetrics as part of MAPREDUCE-1048.
    */
-  public static final long versionID = 28L;
+  public static final long versionID = 29L;
 
   /**
    * Allocate a name for the job.

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java?rev=829312&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java Sat Oct 24 06:32:51 2009
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Tests the slot-usage summary reported through {@link ClusterMetrics}:
+ * occupied map/reduce slots, slot capacities, running task counts and
+ * reserved map/reduce slots.
+ *
+ * <p>All test methods share one {@link MiniMRCluster}, created once by the
+ * {@link TestSetup} wrapper in {@link #suite()}. Three fake trackers are
+ * registered directly with the JobTracker, and tracker state is driven by
+ * sending fake heartbeats through {@link FakeObjectUtilities}.
+ */
+public class TestClusterStatus extends TestCase {
+
+  private static final String[] trackers = new String[] {
+      "tracker_tracker1:1000", "tracker_tracker2:1000",
+      "tracker_tracker3:1000" };
+  private static JobTracker jobTracker;
+  private static final int mapSlotsPerTracker = 4;
+  private static final int reduceSlotsPerTracker = 2;
+  private static MiniMRCluster mr;
+  // Job that FakeTaskScheduler reserves slots for; assigned in
+  // testReservedSlots before any heartbeat that reaches the scheduler.
+  private static FakeJobInProgress fakeJob = null;
+  private static Cluster cluster;
+  // heartbeat responseId. increment this after sending a heartbeat
+  private static short responseId = 1;
+
+  /**
+   * Builds the suite with a one-time setup and teardown.
+   *
+   * <p>The setup starts a MiniMRCluster configured with
+   * {@link FakeTaskScheduler}, registers the fake trackers and opens a
+   * {@link Cluster} client. The teardown releases both.
+   */
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
+      @Override
+      protected void setUp() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setClass(JTConfig.JT_TASK_SCHEDULER, FakeTaskScheduler.class,
+                  TaskScheduler.class);
+        mr = new MiniMRCluster(0, "file:///", 1, null, null, new JobConf(conf));
+        jobTracker = mr.getJobTrackerRunner().getJobTracker();
+        for (String tracker : trackers) {
+          FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
+        }
+        cluster = new Cluster(mr.createJobConf());
+      }
+
+      @Override
+      protected void tearDown() throws Exception {
+        // Shut the mini cluster down even if closing the client fails, and
+        // tolerate a setUp() that failed before creating either object.
+        try {
+          if (cluster != null) {
+            cluster.close();
+          }
+        } finally {
+          if (mr != null) {
+            mr.shutdown();
+          }
+        }
+      }
+    };
+    return setup;
+  }
+
+  /**
+   * Fake scheduler to test reservations.
+   *
+   * <p>It never assigns tasks. On each call for a tracker it reserves one
+   * more map slot and one more reduce slot for {@link #fakeJob} than on the
+   * previous call for that tracker. This exercises the re-reservation logic.
+   */
+  static class FakeTaskScheduler extends JobQueueTaskScheduler {
+
+    // Number of slots reserved per tracker on the most recent call.
+    private final Map<TaskTracker, Integer> reservedCounts
+      = new HashMap<TaskTracker, Integer>();
+
+    public FakeTaskScheduler() {
+      super();
+    }
+
+    @Override
+    public List<Task> assignTasks(TaskTracker tt) {
+      int currCount = 1;
+      if (reservedCounts.containsKey(tt)) {
+        currCount = reservedCounts.get(tt) + 1;
+      }
+      reservedCounts.put(tt, currCount);
+      tt.reserveSlots(TaskType.MAP, fakeJob, currCount);
+      tt.reserveSlots(TaskType.REDUCE, fakeJob, currCount);
+      return new ArrayList<Task>();
+    }
+  }
+
+  /**
+   * Creates a status for the given tracker that reports the given tasks
+   * and has mapSlotsPerTracker / reduceSlotsPerTracker slots.
+   */
+  private TaskTrackerStatus getTTStatus(String trackerName,
+      List<TaskStatus> taskStatuses) {
+    return new TaskTrackerStatus(trackerName,
+      JobInProgress.convertTrackerNameToHostName(trackerName), 0,
+      taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
+  }
+
+  /**
+   * Verifies occupied slots, slot capacities and running task counts.
+   *
+   * <p>It checks these after a map task (2 slots) and a reduce task
+   * (1 slot) start on the first tracker, and again as each of them
+   * finishes.
+   */
+  public void testClusterMetrics() throws IOException, InterruptedException {
+    assertEquals("tasktracker count doesn't match", trackers.length,
+      cluster.getClusterStatus().getTaskTrackerCount());
+
+    List<TaskStatus> list = new ArrayList<TaskStatus>();
+
+    // create a map task status, which uses 2 slots.
+    int mapSlotsPerTask = 2;
+    addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.RUNNING);
+
+    // create a reduce task status, which uses 1 slot.
+    int reduceSlotsPerTask = 1;
+    addReduceTaskAttemptToList(list,
+        reduceSlotsPerTask, TaskStatus.State.RUNNING);
+
+    // create TaskTrackerStatus and send heartbeats
+    sendHeartbeats(list);
+
+    // assert ClusterMetrics
+    ClusterMetrics metrics = cluster.getClusterStatus();
+    assertEquals("occupied map slots do not match", mapSlotsPerTask,
+      metrics.getOccupiedMapSlots());
+    assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
+      metrics.getOccupiedReduceSlots());
+    assertEquals("map slot capacities do not match",
+      mapSlotsPerTracker * trackers.length,
+      metrics.getMapSlotCapacity());
+    assertEquals("reduce slot capacities do not match",
+      reduceSlotsPerTracker * trackers.length,
+      metrics.getReduceSlotCapacity());
+    assertEquals("running map tasks do not match", 1,
+      metrics.getRunningMaps());
+    assertEquals("running reduce tasks do not match", 1,
+      metrics.getRunningReduces());
+
+    // assert the values in ClusterStatus also
+    ClusterStatus jtStatus = jobTracker.getClusterStatus();
+    assertEquals("running map tasks do not match", 1,
+      jtStatus.getMapTasks());
+    assertEquals("running reduce tasks do not match", 1,
+      jtStatus.getReduceTasks());
+    assertEquals("map slot capacities do not match",
+      mapSlotsPerTracker * trackers.length,
+      jtStatus.getMaxMapTasks());
+    assertEquals("reduce slot capacities do not match",
+      reduceSlotsPerTracker * trackers.length,
+      jtStatus.getMaxReduceTasks());
+
+    // send a heartbeat finishing only a map and check
+    // counts are updated.
+    list.clear();
+    addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.SUCCEEDED);
+    addReduceTaskAttemptToList(list,
+        reduceSlotsPerTask, TaskStatus.State.RUNNING);
+    sendHeartbeats(list);
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals(0, metrics.getOccupiedMapSlots());
+    assertEquals(reduceSlotsPerTask, metrics.getOccupiedReduceSlots());
+
+    // send a heartbeat finishing the reduce task also.
+    list.clear();
+    addReduceTaskAttemptToList(list,
+        reduceSlotsPerTask, TaskStatus.State.SUCCEEDED);
+    sendHeartbeats(list);
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals(0, metrics.getOccupiedReduceSlots());
+  }
+
+  /**
+   * Sends one heartbeat from every fake tracker, then bumps responseId.
+   *
+   * <p>The first tracker reports the given task statuses and every other
+   * tracker reports none. Both boolean flags are passed as false; presumably
+   * these mean "not initial contact" and "not accepting new tasks", so the
+   * scheduler is not consulted. Confirm against
+   * FakeObjectUtilities.sendHeartBeat.
+   */
+  private void sendHeartbeats(List<TaskStatus> list) throws IOException {
+    for (int i = 0; i < trackers.length; i++) {
+      List<TaskStatus> taskStatuses =
+        (i == 0) ? list : new ArrayList<TaskStatus>();
+      FakeObjectUtilities.sendHeartBeat(jobTracker,
+        getTTStatus(trackers[i], taskStatuses), false, false,
+        trackers[i], responseId);
+    }
+    responseId++;
+  }
+
+  /**
+   * Appends a status for reduce attempt 0 of job "jt"/1 on the first
+   * tracker. The status is in the REDUCE phase and uses the given number
+   * of slots.
+   */
+  private void addReduceTaskAttemptToList(List<TaskStatus> list,
+      int reduceSlotsPerTask, TaskStatus.State state) {
+    TaskStatus ts = TaskStatus.createTaskStatus(false,
+      new TaskAttemptID("jt", 1, TaskType.REDUCE, 0, 0), 0.0f,
+      reduceSlotsPerTask,
+      state, "", "", trackers[0],
+      TaskStatus.Phase.REDUCE, null);
+    list.add(ts);
+  }
+
+  /**
+   * Appends a status for map attempt 0 of job "jt"/1 on the first tracker.
+   * The status is in the MAP phase and uses the given number of slots.
+   */
+  private void addMapTaskAttemptToList(List<TaskStatus> list,
+      int mapSlotsPerTask, TaskStatus.State state) {
+    TaskStatus ts = TaskStatus.createTaskStatus(true,
+      new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0), 0.0f, mapSlotsPerTask,
+      state, "", "", trackers[0],
+      TaskStatus.Phase.MAP, null);
+    list.add(ts);
+  }
+
+  /**
+   * Verifies the reserved map/reduce slot counts reported by the cluster.
+   *
+   * <p>The counts are checked after an initial reservation on two trackers,
+   * after a re-reservation that grows each tracker's reservation, and
+   * after the job completes and releases them.
+   */
+  public void testReservedSlots() throws Exception {
+    Configuration conf = mr.createJobConf();
+    conf.setInt(JobContext.NUM_MAPS, 1);
+
+    Job job = Job.getInstance(cluster, conf);
+    job.setNumReduceTasks(1);
+    job.setSpeculativeExecution(false);
+    job.setJobSetupCleanupNeeded(false);
+
+    //Set task tracker objects for reservation.
+    TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
+    TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
+    TaskTrackerStatus status1 = new TaskTrackerStatus(
+        trackers[0],JobInProgress.convertTrackerNameToHostName(
+            trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+    TaskTrackerStatus status2 = new TaskTrackerStatus(
+        trackers[1],JobInProgress.convertTrackerNameToHostName(
+            trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+    tt1.setStatus(status1);
+    tt2.setStatus(status2);
+
+    fakeJob = new FakeJobInProgress(new JobConf(job.getConfiguration()),
+                    jobTracker);
+    fakeJob.setClusterSize(3);
+    fakeJob.initTasks();
+
+    // First heartbeat: the fake scheduler reserves 1 map + 1 reduce slot
+    // on each of the two trackers.
+    FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
+      true, trackers[0], responseId);
+    FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
+      true, trackers[1], responseId);
+    responseId++;
+    ClusterMetrics metrics = cluster.getClusterStatus();
+    assertEquals("reserved map slots do not match",
+      2, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+      2, metrics.getReservedReduceSlots());
+
+    // redo to test re-reservations: each tracker now reserves 2 slots
+    // of each type, replacing its earlier reservation.
+    FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
+        true, trackers[0], responseId);
+    FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
+        true, trackers[1], responseId);
+    responseId++;
+    metrics = cluster.getClusterStatus();
+    assertEquals("reserved map slots do not match",
+        4, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        4, metrics.getReservedReduceSlots());
+
+    TaskAttemptID mTid = fakeJob.findMapTask(trackers[1]);
+    TaskAttemptID rTid = fakeJob.findReduceTask(trackers[1]);
+
+    fakeJob.finishTask(mTid);
+    fakeJob.finishTask(rTid);
+
+    // JUnit's assertEquals takes (message, expected, actual).
+    assertEquals("Job didn't complete successfully",
+      JobStatus.SUCCEEDED, fakeJob.getStatus().getRunState());
+    metrics = cluster.getClusterStatus();
+    assertEquals("reserved map slots do not match",
+      0, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+      0, metrics.getReservedReduceSlots());
+  }
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java Sat Oct 24 06:32:51 2009
@@ -23,6 +23,7 @@
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -124,6 +125,11 @@
         2, fjob.getNumReservedTaskTrackersForMaps());
     assertEquals("Trackers not reserved for the job : reduces", 
         2, fjob.getNumReservedTaskTrackersForReduces());
+    ClusterMetrics metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+          4, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+          4, metrics.getReservedReduceSlots());
     
     TaskAttemptID mTid = fjob.findMapTask(trackers[1]);
     TaskAttemptID rTid = fjob.findReduceTask(trackers[1]);
@@ -138,6 +144,11 @@
         0, fjob.getNumReservedTaskTrackersForMaps());
     assertEquals("Reservation for the job not released : Reduces", 
         0, fjob.getNumReservedTaskTrackersForReduces());
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        0, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        0, metrics.getReservedReduceSlots());
   }
   
   /**
@@ -165,6 +176,11 @@
         0, job.getNumReservedTaskTrackersForMaps());
     assertEquals("Reservation for the job not released : Reduces", 
         0, job.getNumReservedTaskTrackersForReduces());
+    ClusterMetrics metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        0, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        0, metrics.getReservedReduceSlots());
   }
   
   /**
@@ -212,6 +228,11 @@
         2, job.getNumReservedTaskTrackersForMaps());
     assertEquals("Trackers not reserved for the job : reduces", 
         2, job.getNumReservedTaskTrackersForReduces());
+    ClusterMetrics metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        4, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        4, metrics.getReservedReduceSlots());
   
     /*
      * FakeJobInProgress.findMapTask does not handle
@@ -230,6 +251,12 @@
         1, job.getNumReservedTaskTrackersForMaps());
     assertEquals("Extra Trackers reserved for the job : reduces", 
         1, job.getNumReservedTaskTrackersForReduces());
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        2, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        2, metrics.getReservedReduceSlots());
+
     //Finish the map task on the tracker 1. Finishing it here to work
     //around bug in the FakeJobInProgress object
     job.finishTask(mTid);
@@ -245,7 +272,11 @@
         0, job.getNumReservedTaskTrackersForMaps());
     assertEquals("Trackers not unreserved for the job : reduces", 
         0, job.getNumReservedTaskTrackersForReduces());
-    
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match",
+        0, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match",
+        0, metrics.getReservedReduceSlots());
   }
 }
   
\ No newline at end of file

Modified: hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp Sat Oct 24 06:32:51 2009
@@ -25,6 +25,7 @@
   import="java.util.*"
   import="java.text.DecimalFormat"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapreduce.*"
   import="org.apache.hadoop.util.*"
 %>
 <%!	private static final long serialVersionUID = 1L;
@@ -32,6 +33,7 @@
 <%
   JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
   ClusterStatus status = tracker.getClusterStatus();
+  ClusterMetrics metrics = tracker.getClusterMetrics();
   String trackerName = 
            StringUtils.simpleHostname(tracker.getJobTrackerMachine());
   JobQueueInfo[] queues = tracker.getJobQueues();
@@ -42,34 +44,41 @@
 <%!
   private static DecimalFormat percentFormat = new DecimalFormat("##0.00");
   
-  public void generateSummaryTable(JspWriter out, ClusterStatus status,
+  public void generateSummaryTable(JspWriter out, ClusterMetrics metrics,
                                    JobTracker tracker) throws IOException {
-    String tasksPerNode = status.getTaskTrackers() > 0 ?
-      percentFormat.format(((double)(status.getMaxMapTasks() +
-                      status.getMaxReduceTasks())) / status.getTaskTrackers()):
+    String tasksPerNode = metrics.getTaskTrackerCount() > 0 ?
+      percentFormat.format(((double)(metrics.getMapSlotCapacity() +
+      metrics.getReduceSlotCapacity())) / metrics.getTaskTrackerCount()):
       "-";
     out.print("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n"+
               "<tr><th>Queues</th>" +
-              "<th>Maps</th><th>Reduces</th>" + 
+              "<th>Running Map Tasks</th><th>Running Reduce Tasks</th>" + 
               "<th>Total Submissions</th>" +
-              "<th>Nodes</th><th>Map Task Capacity</th>" +
-              "<th>Reduce Task Capacity</th><th>Avg. Tasks/Node</th>" + 
+              "<th>Nodes</th>" +
+              "<th>Occupied Map Slots</th><th>Occupied Reduce Slots</th>" + 
+              "<th>Reserved Map Slots</th><th>Reserved Reduce Slots</th>" + 
+              "<th>Map Slot Capacity</th>" +
+              "<th>Reduce Slot Capacity</th><th>Avg. Slots/Node</th>" + 
               "<th>Blacklisted Nodes</th>" +
               "<th>Excluded Nodes</th></tr>\n");
     out.print("<tr><td><a href=\"queueinfo.jsp\">" +
               tracker.getRootQueues().length + "</a></td><td>" + 
-              status.getMapTasks() + "</td><td>" +
-              status.getReduceTasks() + "</td><td>" + 
-              tracker.getTotalSubmissions() +
+              metrics.getRunningMaps() + "</td><td>" +
+              metrics.getRunningReduces() + "</td><td>" + 
+              metrics.getTotalJobSubmissions() +
               "</td><td><a href=\"machines.jsp?type=active\">" +
-              status.getTaskTrackers() +
-              "</a></td><td>" + status.getMaxMapTasks() +
-              "</td><td>" + status.getMaxReduceTasks() +
+              metrics.getTaskTrackerCount() + "</a></td><td>" + 
+              metrics.getOccupiedMapSlots() + "</td><td>" +
+              metrics.getOccupiedReduceSlots() + "</td><td>" + 
+              metrics.getReservedMapSlots() + "</td><td>" +
+              metrics.getReservedReduceSlots() + "</td><td>" + 
+              + metrics.getMapSlotCapacity() +
+              "</td><td>" + metrics.getReduceSlotCapacity() +
               "</td><td>" + tasksPerNode +
               "</td><td><a href=\"machines.jsp?type=blacklisted\">" +
-              status.getBlacklistedTrackers() + "</a>" +
+              metrics.getBlackListedTaskTrackerCount() + "</a>" +
               "</td><td><a href=\"machines.jsp?type=excluded\">" +
-              status.getNumExcludedNodes() + "</a>" +
+              metrics.getDecommissionedTaskTrackerCount() + "</a>" +
               "</td></tr></table>\n");
 
     out.print("<br>");
@@ -120,7 +129,7 @@
 <hr>
 <h2>Cluster Summary (Heap Size is <%= StringUtils.byteDesc(status.getUsedMemory()) %>/<%= StringUtils.byteDesc(status.getMaxMemory()) %>)</h2>
 <% 
- generateSummaryTable(out, status, tracker); 
+ generateSummaryTable(out, metrics, tracker); 
 %>
 <hr>
 <b>Filter (Jobid, Priority, User, Name)</b> <input type="text" id="filter" onkeyup="applyfilter()"> <br>



Mime
View raw message