hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r828979 - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/ src/test/m...
Date Fri, 23 Oct 2009 09:54:23 GMT
Author: sharad
Date: Fri Oct 23 09:54:22 2009
New Revision: 828979

URL: http://svn.apache.org/viewvc?rev=828979&view=rev
Log:
MAPREDUCE-1103. Added more metrics to Jobtracker. Contributed by Sharad Agarwal.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/ivy.xml
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Oct 23 09:54:22 2009
@@ -20,6 +20,8 @@
     Enhanced FileOutputCommitter to create a _SUCCESS file for successful
     jobs. (Amar Kamat & Jothi Padmanabhan via acmurthy) 
 
+    MAPREDUCE-1103. Added more metrics to Jobtracker. (sharad) 
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java Fri Oct 23 09:54:22 2009
@@ -307,8 +307,9 @@
 
     public FakeJobInProgress(
       JobID jId, JobConf jobConf,
-      FakeTaskTrackerManager taskTrackerManager, String user) {
-      super(jId, jobConf, null);
+      FakeTaskTrackerManager taskTrackerManager, String user, 
+      JobTracker jt) throws IOException {
+      super(jId, jobConf, jt);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(
@@ -472,8 +473,9 @@
 
     public FakeFailingJobInProgress(
       JobID id, JobConf jobConf,
-      FakeTaskTrackerManager taskTrackerManager, String user) {
-      super(id, jobConf, taskTrackerManager, user);
+      FakeTaskTrackerManager taskTrackerManager, String user, 
+      JobTracker jt) throws IOException {
+      super(id, jobConf, taskTrackerManager, user, jt);
     }
 
     @Override
@@ -786,7 +788,7 @@
       FakeJobInProgress job =
           new FakeJobInProgress(new JobID("test", ++jobCounter),
               (jobConf == null ? new JobConf(defaultJobConf) : jobConf), this,
-              jobConf.getUser());
+              jobConf.getUser(), UtilsForTests.getJobTracker());
       job.getStatus().setRunState(state);
       this.submitJob(job);
       return job;

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Oct 23 09:54:22 2009
@@ -2028,7 +2028,7 @@
     FakeJobInProgress job =
       new FakeFailingJobInProgress(
         new JobID("test", ++jobCounter),
-        new JobConf(), taskTrackerManager, "u1");
+        new JobConf(), taskTrackerManager, "u1", UtilsForTests.getJobTracker());
     job.getStatus().setRunState(JobStatus.PREP);
     taskTrackerManager.submitJob(job);
     //check if job is present in waiting list.

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/ivy.xml?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/ivy.xml Fri Oct 23 09:54:22 2009
@@ -54,5 +54,21 @@
       name="paranamer-ant"
       rev="${paranamer.version}"
       conf="common->default"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty-util"
+      rev="${jetty-util.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty"
+      rev="${jetty.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jsp-api-2.1"
+      rev="${jetty.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jsp-2.1"
+      rev="${jetty.version}"
+      conf="common->master"/>
   </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Oct 23 09:54:22 2009
@@ -64,8 +64,8 @@
     
     public FakeJobInProgress(JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, 
-        String[][] mapInputLocations) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf, null);
+        String[][] mapInputLocations, JobTracker jt) throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf, jt);
       this.taskTrackerManager = taskTrackerManager;
       this.mapInputLocations = mapInputLocations;
       this.startTime = System.currentTimeMillis();
@@ -536,7 +536,7 @@
     if (pool != null)
       jobConf.set(POOL_PROPERTY, pool);
     JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
-        mapInputLocations);
+        mapInputLocations, UtilsForTests.getJobTracker());
     job.getStatus().setRunState(state);
     taskTrackerManager.submitJob(job);
     job.startTime = clock.time;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Oct 23 09:54:22 2009
@@ -323,6 +323,7 @@
     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP, 
         this.profile.getUser(), this.profile.getJobName(), 
         this.profile.getJobFile(), "");
+    this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
     (numMapTasks + numReduceTasks + 10);
     
@@ -377,6 +378,7 @@
     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP, 
         profile.getUser(), profile.getJobName(), profile.getJobFile(), 
         profile.getURL().toString());
+    this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
     status.setStartTime(startTime);
     this.status.setJobPriority(this.priority);
 
@@ -748,7 +750,7 @@
   void setupComplete() {
     status.setSetupProgress(1.0f);
     if (this.status.getRunState() == JobStatus.PREP) {
-      this.status.setRunState(JobStatus.RUNNING);
+      changeStateTo(JobStatus.RUNNING);
       JobStatusChangedEvent jse = 
         new JobStatusChangedEvent(profile.getJobID(),
          JobStatus.getJobRunState(JobStatus.RUNNING));
@@ -1539,6 +1541,7 @@
       name = TaskType.JOB_CLEANUP;
     } else if (tip.isMapTask()) {
       ++runningMapTasks;
+      metrics.addRunningMaps(jobId, 1);
       name = TaskType.MAP;
       counter = JobCounter.TOTAL_LAUNCHED_MAPS;
       splits = tip.getSplitNodes();
@@ -1550,6 +1553,7 @@
       metrics.launchMap(id);
     } else {
       ++runningReduceTasks;
+      metrics.addRunningReduces(jobId, 1);
       name = TaskType.REDUCE;
       counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
       if (tip.isSpeculating()) {
@@ -1661,8 +1665,10 @@
     long now = System.currentTimeMillis();
     
     FallowSlotInfo info = map.get(taskTracker);
+    int reservedSlots = 0;
     if (info == null) {
       info = new FallowSlotInfo(now, numSlots);
+      reservedSlots = numSlots;
     } else {
       // Increment metering info if the reservation is changing
       if (info.getNumSlots() != numSlots) {
@@ -1674,11 +1680,18 @@
         jobCounters.incrCounter(counter, fallowSlotMillis);
         
         // Update 
+        reservedSlots = numSlots - info.getNumSlots();
         info.setTimestamp(now);
         info.setNumSlots(numSlots);
       }
     }
     map.put(taskTracker, info);
+    if (type == TaskType.MAP) {
+      jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
+    }
+    else {
+      jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
+    }
   }
   
   public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
@@ -1704,6 +1717,13 @@
     jobCounters.incrCounter(counter, fallowSlotMillis);
 
     map.remove(taskTracker);
+    if (type == TaskType.MAP) {
+      jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots());
+    }
+    else {
+      jobtracker.getInstrumentation().decReservedReduceSlots(
+        info.getNumSlots());
+    }
   }
   
   public int getNumReservedTaskTrackersForMaps() {
@@ -2582,6 +2602,7 @@
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
     } else if (tip.isMapTask()) {
       runningMapTasks -= 1;
+      metrics.decRunningMaps(jobId, 1);
       finishedMapTasks += 1;
       metrics.completeMap(taskid);
       if (!tip.isJobSetupTask() && hasSpeculativeMaps) {
@@ -2594,6 +2615,7 @@
       }
     } else {
       runningReduceTasks -= 1;
+      metrics.decRunningReduces(jobId, 1);
       finishedReduceTasks += 1;
       metrics.completeReduce(taskid);
       if (!tip.isJobSetupTask() && hasSpeculativeReduces) {
@@ -2657,7 +2679,32 @@
   public float getSlowTaskThreshold() {
     return slowTaskThreshold;
   }
-  
+
+  /**
+   * Job state change must happen thru this call
+   */
+  private void changeStateTo(int newState) {
+    int oldState = this.status.getRunState();
+    if (oldState == newState) {
+      return; //old and new states are same
+    }
+    this.status.setRunState(newState);
+    
+    //update the metrics
+    if (oldState == JobStatus.PREP) {
+      this.jobtracker.getInstrumentation().decPrepJob(conf, jobId);
+    } else if (oldState == JobStatus.RUNNING) {
+      this.jobtracker.getInstrumentation().decRunningJob(conf, jobId);
+    }
+    
+    if (newState == JobStatus.PREP) {
+      this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
+    } else if (newState == JobStatus.RUNNING) {
+      this.jobtracker.getInstrumentation().addRunningJob(conf, jobId);
+    }
+    
+  }
+
   /**
    * The job is done since all it's component tasks are either
    * successful or have failed.
@@ -2669,7 +2716,7 @@
     //
     if (this.status.getRunState() == JobStatus.RUNNING ||
         this.status.getRunState() == JobStatus.PREP) {
-      this.status.setRunState(JobStatus.SUCCEEDED);
+      changeStateTo(JobStatus.SUCCEEDED);
       this.status.setCleanupProgress(1.0f);
       if (maps.length == 0) {
         this.status.setMapProgress(1.0f);
@@ -2718,9 +2765,9 @@
       this.status.setFinishTime(this.finishTime);
 
       if (jobTerminationState == JobStatus.FAILED) {
-        this.status.setRunState(JobStatus.FAILED);
+        changeStateTo(JobStatus.FAILED);
       } else {
-        this.status.setRunState(JobStatus.KILLED);
+        changeStateTo(JobStatus.KILLED);
       }
       // Log the job summary
       JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
@@ -2739,6 +2786,13 @@
 
       jobtracker.getInstrumentation().terminateJob(
           this.conf, this.status.getJobID());
+      if (jobTerminationState == JobStatus.FAILED) {
+        jobtracker.getInstrumentation().failedJob(
+            this.conf, this.status.getJobID());
+      } else {
+        jobtracker.getInstrumentation().killedJob(
+            this.conf, this.status.getJobID());
+      }
     }
   }
 
@@ -2910,6 +2964,7 @@
         launchedSetup = false;
       } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
+        metrics.decRunningMaps(jobId, 1);
         metrics.failedMap(taskid);
         // remove from the running queue and put it in the non-running cache
         // if the tip is not complete i.e if the tip still needs to be run
@@ -2919,6 +2974,7 @@
         }
       } else {
         runningReduceTasks -= 1;
+        metrics.decRunningReduces(jobId, 1);
         metrics.failedReduce(taskid);
         // remove from the running queue and put in the failed queue if the tip
         // is not complete

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Oct 23 09:54:22 2009
@@ -425,10 +425,11 @@
                 // tracker is lost, and if it is blacklisted, remove 
                 // it from the count of blacklisted trackers in the cluster
                 if (isBlacklisted(trackerName)) {
-                  faultyTrackers.numBlacklistedTrackers -= 1;
+                  faultyTrackers.decrBlackListedTrackers(1);
                 }
                 updateTaskTrackerStatus(trackerName, null);
                 statistics.taskTrackerRemoved(trackerName);
+                getInstrumentation().decTrackers(1);
                 // remove the mapping from the hosts list
                 String hostname = newProfile.getHost();
                 hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -650,7 +651,16 @@
       }        
     }
 
-    
+    private void incrBlackListedTrackers(int count) {
+      numBlacklistedTrackers += count;
+      getInstrumentation().addBlackListedTrackers(count);
+    }
+
+    private void decrBlackListedTrackers(int count) {
+      numBlacklistedTrackers -= count;
+      getInstrumentation().decBlackListedTrackers(count);
+    }
+
     private void blackListTracker(String hostName, String reason, ReasonForBlackListing rfb) {
       FaultInfo fi = getFaultInfo(hostName, true);
       boolean blackListed = fi.isBlacklisted();
@@ -809,7 +819,7 @@
           getInstrumentation().addBlackListedReduceSlots(
               reduceSlots);
         }
-        numBlacklistedTrackers += uniqueHostsMap.remove(hostName);
+        incrBlackListedTrackers(uniqueHostsMap.remove(hostName));
       }
     }
     
@@ -829,7 +839,7 @@
         }
         uniqueHostsMap.put(hostName,
                            numTrackersOnHost);
-        numBlacklistedTrackers -= numTrackersOnHost;
+        decrBlackListedTrackers(numTrackersOnHost);
       }
     }
 
@@ -2126,6 +2136,7 @@
       hostnameToTaskTracker.put(hostname, trackers);
     }
     statistics.taskTrackerAdded(status.getTrackerName());
+    getInstrumentation().addTrackers(1);
     LOG.info("Adding tracker " + status.getTrackerName() + " to host " 
              + hostname);
     trackers.add(taskTracker);
@@ -2423,6 +2434,8 @@
     if (oldStatus != null) {
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
+      getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
+      getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
         int mapSlots = oldStatus.getMaxMapSlots();
         totalMapTaskCapacity -= mapSlots;
@@ -2445,6 +2458,8 @@
     if (status != null) {
       totalMaps += status.countMapTasks();
       totalReduces += status.countReduceTasks();
+      getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
+      getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(status.getHost())) {
         int mapSlots = status.getMaxMapSlots();
         totalMapTaskCapacity += mapSlots;
@@ -2553,7 +2568,7 @@
           // if this is lost tracker that came back now, and if it blacklisted
           // increment the count of blacklisted trackers in the cluster
           if (isBlacklisted(trackerName)) {
-            faultyTrackers.numBlacklistedTrackers += 1;
+            faultyTrackers.incrBlackListedTrackers(1);
           }
           addNewTracker(taskTracker);
         }
@@ -3766,12 +3781,13 @@
   }
 
   // main decommission
-  private synchronized void decommissionNodes(Set<String> hosts) 
+  synchronized void decommissionNodes(Set<String> hosts) 
   throws IOException {  
     LOG.info("Decommissioning " + hosts.size() + " nodes");
     // create a list of tracker hostnames
     synchronized (taskTrackers) {
       synchronized (trackerExpiryQueue) {
+        int trackersDecommissioned = 0;
         for (String host : hosts) {
           LOG.info("Decommissioning host " + host);
           Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
@@ -3780,11 +3796,14 @@
               LOG.info("Decommission: Losing tracker " + tracker + 
                        " on host " + host);
               lostTaskTracker(tracker); // lose the tracker
-              updateTaskTrackerStatus(tracker.getStatus().getTrackerName(), null);
+              updateTaskTrackerStatus(
+                tracker.getStatus().getTrackerName(), null);
             }
+            trackersDecommissioned += trackers.size();
           }
           LOG.info("Host " + host + " is ready for decommissioning");
         }
+        getInstrumentation().setDecommissionedTrackers(trackersDecommissioned);
       }
     }
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Fri Oct 23 09:54:22 2009
@@ -84,4 +84,80 @@
 
   public void decBlackListedReduceSlots(int slots)
   { }
+
+  public void addReservedMapSlots(int slots)
+  { }
+
+  public void decReservedMapSlots(int slots)
+  { }
+
+  public void addReservedReduceSlots(int slots)
+  { }
+
+  public void decReservedReduceSlots(int slots)
+  { }
+
+  public void addOccupiedMapSlots(int slots)
+  { }
+
+  public void decOccupiedMapSlots(int slots)
+  { }
+
+  public void addOccupiedReduceSlots(int slots)
+  { }
+
+  public void decOccupiedReduceSlots(int slots)
+  { }
+
+  public void failedJob(JobConf conf, JobID id) 
+  { }
+
+  public void killedJob(JobConf conf, JobID id) 
+  { }
+
+  public void addPrepJob(JobConf conf, JobID id) 
+  { }
+  
+  public void decPrepJob(JobConf conf, JobID id) 
+  { }
+
+  public void addRunningJob(JobConf conf, JobID id) 
+  { }
+
+  public void decRunningJob(JobConf conf, JobID id) 
+  { }
+
+  public void addRunningMaps(JobID id, int task)
+  { }
+
+  public void decRunningMaps(JobID id, int task) 
+  { }
+
+  public void addRunningReduces(JobID id, int task)
+  { }
+
+  public void decRunningReduces(JobID id, int task)
+  { }
+
+  public void killedMap(TaskAttemptID taskAttemptID)
+  { }
+
+  public void killedReduce(TaskAttemptID taskAttemptID)
+  { }
+
+  public void addTrackers(int trackers)
+  { }
+
+  public void decTrackers(int trackers)
+  { }
+
+  public void addBlackListedTrackers(int trackers)
+  { }
+
+  public void decBlackListedTrackers(int trackers)
+  { }
+
+  public void setDecommissionedTrackers(int trackers)
+  { }  
+
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Fri Oct 23 09:54:22 2009
@@ -22,8 +22,6 @@
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
-import org.apache.hadoop.metrics.util.MetricsRegistry;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
 
 class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater {
   private final MetricsRecord metricsRecord;
@@ -45,6 +43,27 @@
   private int numBlackListedMapSlots = 0;
   private int numBlackListedReduceSlots = 0;
 
+  private int numReservedMapSlots = 0;
+  private int numReservedReduceSlots = 0;
+  private int numOccupiedMapSlots = 0;
+  private int numOccupiedReduceSlots = 0;
+  
+  private int numJobsFailed = 0;
+  private int numJobsKilled = 0;
+  
+  private int numJobsPreparing = 0;
+  private int numJobsRunning = 0;
+  
+  private int numRunningMaps = 0;
+  private int numRunningReduces = 0;
+  
+  private int numMapTasksKilled = 0;
+  private int numReduceTasksKilled = 0;
+
+  private int numTrackers = 0;
+  private int numTrackersBlackListed = 0;
+  private int numTrackersDecommissioned = 0;
+  
   public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
     super(tracker, conf);
     String sessionId = conf.getSessionId();
@@ -78,6 +97,28 @@
       metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
       metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
       metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
+      
+      metricsRecord.incrMetric("reserved_map_slots", numReservedMapSlots);
+      metricsRecord.incrMetric("reserved_reduce_slots", numReservedReduceSlots);
+      metricsRecord.incrMetric("occupied_map_slots", numOccupiedMapSlots);
+      metricsRecord.incrMetric("occupied_reduce_slots", numOccupiedReduceSlots);
+      
+      metricsRecord.incrMetric("jobs_failed", numJobsFailed);
+      metricsRecord.incrMetric("jobs_killed", numJobsKilled);
+      
+      metricsRecord.incrMetric("jobs_preparing", numJobsPreparing);
+      metricsRecord.incrMetric("jobs_running", numJobsRunning);
+      
+      metricsRecord.incrMetric("running_maps", numRunningMaps);
+      metricsRecord.incrMetric("running_reduces", numRunningReduces);
+      
+      metricsRecord.incrMetric("maps_killed", numMapTasksKilled);
+      metricsRecord.incrMetric("reduces_killed", numReduceTasksKilled);
+
+      metricsRecord.incrMetric("trackers", numTrackers);
+      metricsRecord.incrMetric("trackers_blacklisted", numTrackersBlackListed);
+      metricsRecord.setMetric("trackers_decommissioned", 
+          numTrackersDecommissioned);
 
       numMapTasksLaunched = 0;
       numMapTasksCompleted = 0;
@@ -91,6 +132,26 @@
       numWaitingReduces = 0;
       numBlackListedMapSlots = 0;
       numBlackListedReduceSlots = 0;
+      
+      numReservedMapSlots = 0;
+      numReservedReduceSlots = 0;
+      numOccupiedMapSlots = 0;
+      numOccupiedReduceSlots = 0;
+      
+      numJobsFailed = 0;
+      numJobsKilled = 0;
+      
+      numJobsPreparing = 0;
+      numJobsRunning = 0;
+      
+      numRunningMaps = 0;
+      numRunningReduces = 0;
+      
+      numMapTasksKilled = 0;
+      numReduceTasksKilled = 0;
+
+      numTrackers = 0;
+      numTrackersBlackListed = 0;
     }
     metricsRecord.update();
 
@@ -166,12 +227,12 @@
   }
 
   @Override
-  public void setMapSlots(int slots) {
+  public synchronized void setMapSlots(int slots) {
     numMapSlots = slots;
   }
 
   @Override
-  public void setReduceSlots(int slots) {
+  public synchronized void setReduceSlots(int slots) {
     numReduceSlots = slots;
   }
 
@@ -194,4 +255,154 @@
   public synchronized void decBlackListedReduceSlots(int slots){
     numBlackListedReduceSlots -= slots;
   }
+
+  @Override
+  public synchronized void addReservedMapSlots(int slots)
+  { 
+    numReservedMapSlots += slots;
+  }
+
+  @Override
+  public synchronized void decReservedMapSlots(int slots)
+  {
+    numReservedMapSlots -= slots;
+  }
+
+  @Override
+  public synchronized void addReservedReduceSlots(int slots)
+  {
+    numReservedReduceSlots += slots;
+  }
+
+  @Override
+  public synchronized void decReservedReduceSlots(int slots)
+  {
+    numReservedReduceSlots -= slots;
+  }
+
+  @Override
+  public synchronized void addOccupiedMapSlots(int slots)
+  {
+    numOccupiedMapSlots += slots;
+  }
+
+  @Override
+  public synchronized void decOccupiedMapSlots(int slots)
+  {
+    numOccupiedMapSlots -= slots;
+  }
+
+  @Override
+  public synchronized void addOccupiedReduceSlots(int slots)
+  {
+    numOccupiedReduceSlots += slots;
+  }
+
+  @Override
+  public synchronized void decOccupiedReduceSlots(int slots)
+  {
+    numOccupiedReduceSlots -= slots;
+  }
+
+  @Override
+  public synchronized void failedJob(JobConf conf, JobID id) 
+  {
+    numJobsFailed++;
+  }
+
+  @Override
+  public synchronized void killedJob(JobConf conf, JobID id) 
+  {
+    numJobsKilled++;
+  }
+
+  @Override
+  public synchronized void addPrepJob(JobConf conf, JobID id) 
+  {
+    numJobsPreparing++;
+  }
+
+  @Override
+  public synchronized void decPrepJob(JobConf conf, JobID id) 
+  {
+    numJobsPreparing--;
+  }
+
+  @Override
+  public synchronized void addRunningJob(JobConf conf, JobID id) 
+  {
+    numJobsRunning++;
+  }
+
+  @Override
+  public synchronized void decRunningJob(JobConf conf, JobID id) 
+  {
+    numJobsRunning--;
+  }
+
+  @Override
+  public synchronized void addRunningMaps(JobID id, int task)
+  {
+    numRunningMaps += task;
+  }
+
+  @Override
+  public synchronized void decRunningMaps(JobID id, int task) 
+  {
+    numRunningMaps -= task;
+  }
+
+  @Override
+  public synchronized void addRunningReduces(JobID id, int task)
+  {
+    numRunningReduces += task;
+  }
+
+  @Override
+  public synchronized void decRunningReduces(JobID id, int task)
+  {
+    numRunningReduces -= task;
+  }
+
+  @Override
+  public synchronized void killedMap(TaskAttemptID taskAttemptID)
+  {
+    numMapTasksKilled++;
+  }
+
+  @Override
+  public synchronized void killedReduce(TaskAttemptID taskAttemptID)
+  {
+    numReduceTasksKilled++;
+  }
+
+  @Override
+  public synchronized void addTrackers(int trackers)
+  {
+    numTrackers += trackers;
+  }
+
+  @Override
+  public synchronized void decTrackers(int trackers)
+  {
+    numTrackers -= trackers;
+  }
+
+  @Override
+  public synchronized void addBlackListedTrackers(int trackers)
+  {
+    numTrackersBlackListed += trackers;
+  }
+
+  @Override
+  public synchronized void decBlackListedTrackers(int trackers)
+  {
+    numTrackersBlackListed -= trackers;
+  }
+
+  @Override
+  public synchronized void setDecommissionedTrackers(int trackers)
+  {
+    numTrackersDecommissioned = trackers;
+  }  
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Fri Oct 23 09:54:22 2009
@@ -223,7 +223,7 @@
   }
   
   static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status, 
-                                             boolean initialContact, 
+		  boolean initialContact, boolean acceptNewTasks,
                                              String tracker, short responseId) 
     throws IOException {
     if (status == null) {
@@ -231,13 +231,13 @@
           JobInProgress.convertTrackerNameToHostName(tracker));
 
     }
-      jt.heartbeat(status, false, initialContact, false, responseId);
+      jt.heartbeat(status, false, initialContact, acceptNewTasks, responseId);
       return ++responseId ;
   }
   
   static void establishFirstContact(JobTracker jt, String tracker) 
     throws IOException {
-    sendHeartBeat(jt, null, true, tracker, (short) 0);
+    sendHeartBeat(jt, null, true, false, tracker, (short) 0);
   }
 
   static class FakeTaskInProgress extends TaskInProgress {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java Fri Oct 23 09:54:22 2009
@@ -37,25 +37,93 @@
   public void testJobFailAndKill() throws IOException {
     MiniMRCluster mr = null;
     try {
-      mr = new MiniMRCluster(2, "file:///", 3);
+      JobConf jtConf = new JobConf();
+      jtConf.set("mapred.jobtracker.instrumentation", 
+          JTInstrumentation.class.getName());
+      mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
+      JTInstrumentation instr = (JTInstrumentation) 
+        mr.getJobTrackerRunner().getJobTracker().getInstrumentation();
 
       // run the TCs
       JobConf conf = mr.createJobConf();
-
+      
       Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
       Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
       RunningJob job = UtilsForTests.runJobFail(conf, inDir, outDir);
       // Checking that the Job got failed
       assertEquals(job.getJobState(), JobStatus.FAILED);
+      assertTrue(instr.verifyJob());
+      assertEquals(1, instr.failed);
+      instr.reset();
 
+      
       job = UtilsForTests.runJobKill(conf, inDir, outDir);
       // Checking that the Job got killed
       assertTrue(job.isComplete());
       assertEquals(job.getJobState(), JobStatus.KILLED);
+      assertTrue(instr.verifyJob());
+      assertEquals(1, instr.killed);
     } finally {
       if (mr != null) {
         mr.shutdown();
       }
     }
   }
+  
+  static class JTInstrumentation extends JobTrackerInstrumentation {
+    volatile int failed;
+    volatile int killed;
+    volatile int addPrep;
+    volatile int decPrep;
+    volatile int addRunning;
+    volatile int decRunning;
+
+    void reset() {
+      failed = 0;
+      killed = 0;
+      addPrep = 0;
+      decPrep = 0;
+      addRunning = 0;
+      decRunning = 0;
+    }
+
+    boolean verifyJob() {
+      return addPrep==1 && decPrep==1 && addRunning==1 && decRunning==1;
+    }
+
+    public JTInstrumentation(JobTracker jt, JobConf conf) {
+      super(jt, conf);
+    }
+
+    public synchronized void addPrepJob(JobConf conf, JobID id) 
+    {
+      addPrep++;
+    }
+    
+    public synchronized void decPrepJob(JobConf conf, JobID id) 
+    {
+      decPrep++;
+    }
+
+    public synchronized void addRunningJob(JobConf conf, JobID id) 
+    {
+      addRunning++;
+    }
+
+    public synchronized void decRunningJob(JobConf conf, JobID id) 
+    {
+      decRunning++;
+    }
+    
+    public synchronized void failedJob(JobConf conf, JobID id) 
+    {
+      failed++;
+    }
+
+    public synchronized void killedJob(JobConf conf, JobID id) 
+    {
+      killed++;
+    }
+  }
+  
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri Oct 23 09:54:22 2009
@@ -46,8 +46,9 @@
     private FakeTaskTrackerManager taskTrackerManager;
     
     public FakeJobInProgress(JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf, null);
+        FakeTaskTrackerManager taskTrackerManager, JobTracker jt) 
+          throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf, jt);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP, 
@@ -281,7 +282,8 @@
                          int numJobs, int state)
     throws IOException {
     for (int i = 0; i < numJobs; i++) {
-      JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
+      JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager, 
+      UtilsForTests.getJobTracker());
       job.getStatus().setRunState(state);
       taskTrackerManager.submitJob(job);
     }

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java?rev=828979&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java Fri Oct 23 09:54:22 2009
@@ -0,0 +1,623 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.hadoop.mapred.TestTaskTrackerBlacklisting.FakeJobTracker;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
+@SuppressWarnings("deprecation")
+public class TestJobTrackerInstrumentation extends TestCase {
+
+  static String trackers[] = new String[] { 
+    "tracker_tracker1:1000",
+    "tracker_tracker2:1000",
+    "tracker_tracker3:1000" };
+
+  static String hosts[] = new String[] { "tracker1", "tracker2", "tracker3" };
+  // heartbeat responseId. increment this after sending a heartbeat
+  private static short responseId = 1;
+
+  private static FakeJobTracker jobTracker;
+  private static FakeJobInProgress  fakeJob;
+
+  private static int mapSlotsPerTracker = 4;
+  private static int reduceSlotsPerTracker = 2;
+
+  private static int numMapSlotsToReserve = 2;
+  private static int numReduceSlotsToReserve = 2;
+
+  private static MyJobTrackerMetricsInst mi;
+  
+  
+
+  public static Test suite() {
+    TestSetup setup = 
+      new TestSetup(new TestSuite(TestJobTrackerInstrumentation.class)) {
+      protected void setUp() throws Exception {
+        JobConf conf = new JobConf();
+        conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+        conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
+        conf.setInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 1);
+        conf.setClass(JTConfig.JT_TASK_SCHEDULER, 
+            FakeTaskScheduler.class, TaskScheduler.class);
+
+        conf.set(JTConfig.JT_INSTRUMENTATION, 
+            MyJobTrackerMetricsInst.class.getName());
+        jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
+        mi = (MyJobTrackerMetricsInst) jobTracker.getInstrumentation();
+        for (String tracker : trackers) {
+          FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
+        }
+        
+      }
+      protected void tearDown() throws Exception {
+      }
+    };
+    return setup;
+  }
+
+  private TaskTrackerStatus getTTStatus(String trackerName,
+      List<TaskStatus> taskStatuses) {
+    return new TaskTrackerStatus(trackerName, 
+        JobInProgress.convertTrackerNameToHostName(trackerName), 0,
+        taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
+  }
+
+  public void testMetrics() throws Exception {
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[3];
+    
+    // create TaskTrackerStatus and send heartbeats
+    TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
+    status[0] = getTTStatus(trackers[0], new ArrayList<TaskStatus>());
+    status[1] = getTTStatus(trackers[1], new ArrayList<TaskStatus>());
+    status[2] = getTTStatus(trackers[2], new ArrayList<TaskStatus>());
+    for (int i = 0; i< trackers.length; i++) {
+      FakeObjectUtilities.sendHeartBeat(jobTracker, status[i], false,
+          false, trackers[i], responseId);
+    }
+    responseId++;
+
+    assertEquals("Mismatch in number of trackers",
+        trackers.length, mi.numTrackers);
+    
+    int numMaps = 2;
+    int numReds = 1;
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(false);
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReds);
+    conf.setMaxTaskFailuresPerTracker(1);
+    conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
+
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    assertTrue(mi.numJobsPreparing == 1);
+
+    job.setClusterSize(trackers.length);
+    job.initTasks();
+    jobTracker.addJob(job.getJobID(), job);
+
+    taskAttemptID[0] = job.findMapTask(trackers[0]);
+    taskAttemptID[1] = job.findMapTask(trackers[1]);
+    taskAttemptID[2] = job.findReduceTask(trackers[2]);
+    
+    assertTrue("Mismatch in num running maps",
+        mi.numRunningMaps == numMaps);
+    assertTrue("Mismatch in num running reduces",
+        mi.numRunningReduces == numReds);
+
+    job.finishTask(taskAttemptID[0]);
+    job.finishTask(taskAttemptID[1]);
+    job.finishTask(taskAttemptID[2]);
+    jobTracker.finalizeJob(job);
+ 
+    assertTrue("Mismatch in map tasks launched",
+        mi.numMapTasksLaunched == numMaps);
+    assertTrue("Mismatch in map tasks completed",
+        mi.numMapTasksCompleted == numMaps);
+    assertTrue("Mismatch in map tasks failed",
+        mi.numMapTasksFailed == 0);
+
+    assertTrue("Mismatch in reduce tasks launched",
+        mi.numReduceTasksLaunched == numReds);
+    assertTrue("Mismatch in reduce tasks completed",
+        mi.numReduceTasksCompleted == numReds);
+    assertTrue("Mismatch in reduce tasks failed",
+        mi.numReduceTasksFailed == 0);
+
+    assertTrue("Mismatch in num Jobs submitted",
+        mi.numJobsSubmitted == 1);
+    
+    assertTrue("Mismatch in num map slots",
+        mi.numMapSlots == (mapSlotsPerTracker * trackers.length));
+    assertTrue("Mismatch in num reduce slots",
+        mi.numReduceSlots == (reduceSlotsPerTracker * trackers.length));
+
+  }
+  
+  public void testBlackListing() throws IOException {
+    int numMaps, numReds;
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(false);
+    conf.setMaxTaskFailuresPerTracker(1);
+    conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[3];
+
+    numMaps = 1;
+    numReds = 1;
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReds);
+    conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
+
+    FakeJobInProgress job1 = new FakeJobInProgress(conf, jobTracker);
+    job1.setClusterSize(trackers.length);
+    job1.initTasks();
+    jobTracker.addJob(job1.getJobID(), job1);
+    taskAttemptID[0] = job1.findMapTask(trackers[0]);
+    job1.failTask(taskAttemptID[0]);
+    taskAttemptID[1] = job1.findMapTask(trackers[1]);
+    job1.finishTask(taskAttemptID[1]);
+    taskAttemptID[2] = job1.findReduceTask(trackers[0]);
+    job1.failTask(taskAttemptID[2]);
+    taskAttemptID[2] = job1.findReduceTask(trackers[2]);
+    job1.finishTask(taskAttemptID[2]);
+    jobTracker.finalizeJob(job1);
+
+    assertEquals("Mismatch in number of blacklisted trackers",
+        mi.numTrackersBlackListed, 1);
+
+    assertEquals("Mismatch in blacklisted map slots", 
+        mi.numBlackListedMapSlots, 
+        (mapSlotsPerTracker * mi.numTrackersBlackListed));
+
+    assertEquals("Mismatch in blacklisted reduce slots", 
+        mi.numBlackListedReduceSlots, 
+        (reduceSlotsPerTracker * mi.numTrackersBlackListed));
+  }
+
+  public void testOccupiedSlotCounts() throws Exception {
+
+    TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
+
+    List<TaskStatus> list = new ArrayList<TaskStatus>();
+
+    // create a map task status, which uses 2 slots. 
+    int mapSlotsPerTask = 2;
+    TaskStatus ts = TaskStatus.createTaskStatus(true, 
+      new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0), 0.0f, mapSlotsPerTask,
+      TaskStatus.State.RUNNING, "", "", trackers[0], 
+      TaskStatus.Phase.MAP, null);
+    list.add(ts);
+    int mapSlotsPerTask1 = 1;
+    ts = TaskStatus.createTaskStatus(true, 
+        new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0), 0.0f, mapSlotsPerTask1,
+        TaskStatus.State.RUNNING, "", "", trackers[0], 
+        TaskStatus.Phase.MAP, null);
+      list.add(ts);
+    
+    // create a reduce task status, which uses 3 slots.
+    int reduceSlotsPerTask = 3;
+    ts = TaskStatus.createTaskStatus(false, 
+      new TaskAttemptID("jt", 1, TaskType.REDUCE, 0, 0), 0.0f,
+      reduceSlotsPerTask,
+      TaskStatus.State.RUNNING, "", "", trackers[0], 
+      TaskStatus.Phase.REDUCE, null);
+    list.add(ts);
+    int reduceSlotsPerTask1 = 1;
+    ts = TaskStatus.createTaskStatus(false, 
+        new TaskAttemptID("jt", 1, TaskType.REDUCE, 0, 0), 0.0f,
+        reduceSlotsPerTask1,
+        TaskStatus.State.RUNNING, "", "", trackers[0], 
+        TaskStatus.Phase.REDUCE, null);
+    list.add(ts);
+    
+    // create TaskTrackerStatus and send heartbeats
+    status = new TaskTrackerStatus[trackers.length];
+    status[0] = getTTStatus(trackers[0], list);
+    status[1] = getTTStatus(trackers[1], new ArrayList<TaskStatus>());
+    status[2] = getTTStatus(trackers[2], new ArrayList<TaskStatus>());
+    for (int i = 0; i< trackers.length; i++) {
+      FakeObjectUtilities.sendHeartBeat(jobTracker, status[i], false,
+        false, trackers[i], responseId);
+    }
+    responseId++;
+
+    assertEquals("Mismatch in map slots occupied",
+      mapSlotsPerTask+mapSlotsPerTask1, mi.numOccupiedMapSlots);
+    assertEquals("Mismatch in reduce slots occupied",
+      reduceSlotsPerTask+reduceSlotsPerTask1, mi.numOccupiedReduceSlots);
+    
+    //now send heartbeat with no running tasks
+    status = new TaskTrackerStatus[1];
+    status[0] = getTTStatus(trackers[0], new ArrayList<TaskStatus>());
+    FakeObjectUtilities.sendHeartBeat(jobTracker, status[0], false,
+        false, trackers[0], responseId);
+    
+    assertEquals("Mismatch in map slots occupied",
+      0, mi.numOccupiedMapSlots);
+    assertEquals("Mismatch in reduce slots occupied",
+      0, mi.numOccupiedReduceSlots);
+  }
+
+  public void testReservedSlots() throws IOException {
+      JobConf conf = new JobConf();
+      conf.setNumMapTasks(1);
+      conf.setNumReduceTasks(1);
+      conf.setSpeculativeExecution(false);
+      
+      //Set task tracker objects for reservation.
+      TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
+      TaskTrackerStatus status2 = new TaskTrackerStatus(
+          trackers[1],JobInProgress.convertTrackerNameToHostName(
+              trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+      tt2.setStatus(status2);
+      
+      fakeJob = new FakeJobInProgress(conf, jobTracker);
+      fakeJob.setClusterSize(3);
+      fakeJob.initTasks();
+      
+      FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
+        true, trackers[1], responseId);
+      responseId++; 
+      
+      assertEquals("Mismatch in reserved map slots", 
+        numMapSlotsToReserve, mi.numReservedMapSlots); 
+      assertEquals("Mismatch in reserved reduce slots", 
+        numReduceSlotsToReserve, mi.numReservedReduceSlots); 
+  }
+  
+  public void testDecomissionedTrackers() throws IOException {
+    Set<String> dHosts = new HashSet<String>();
+    dHosts.add(hosts[1]);
+    assertEquals("Mismatch in number of decommissioned trackers",
+        0, mi.numTrackersDecommissioned);
+    jobTracker.decommissionNodes(dHosts);
+    assertEquals("Mismatch in number of decommissioned trackers",
+        1, mi.numTrackersDecommissioned);
+  }
+  
+  static class FakeTaskScheduler extends JobQueueTaskScheduler {
+    public FakeTaskScheduler() {
+      super();
+    }
+    public List<Task> assignTasks(TaskTracker tt) {
+      tt.reserveSlots(TaskType.MAP, fakeJob, numMapSlotsToReserve);
+      tt.reserveSlots(TaskType.REDUCE, fakeJob, numReduceSlotsToReserve);
+      return new ArrayList<Task>();  
+    }
+  }
+  
+  static class FakeJobInProgress extends
+  org.apache.hadoop.mapred.TestTaskTrackerBlacklisting.FakeJobInProgress {
+
+    FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
+      super(jobConf, tracker);
+    }
+
+    @Override 
+    public synchronized void initTasks() throws IOException {
+      super.initTasks();
+      jobtracker.getInstrumentation().addWaitingMaps(getJobID(),
+          numMapTasks);
+      jobtracker.getInstrumentation().addWaitingReduces(getJobID(),
+          numReduceTasks);
+    }
+  }
+
+  static class MyJobTrackerMetricsInst extends JobTrackerInstrumentation  {
+    public MyJobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
+      super(tracker, conf);
+    }
+
+    private int numMapTasksLaunched = 0;
+    private int numMapTasksCompleted = 0;
+    private int numMapTasksFailed = 0;
+    private int numReduceTasksLaunched = 0;
+    private int numReduceTasksCompleted = 0;
+    private int numReduceTasksFailed = 0;
+    private int numJobsSubmitted = 0;
+    private int numJobsCompleted = 0;
+    private int numWaitingMaps = 0;
+    private int numWaitingReduces = 0;
+
+    //Cluster status fields.
+    private volatile int numMapSlots = 0;
+    private volatile int numReduceSlots = 0;
+    private int numBlackListedMapSlots = 0;
+    private int numBlackListedReduceSlots = 0;
+
+    private int numReservedMapSlots = 0;
+    private int numReservedReduceSlots = 0;
+    private int numOccupiedMapSlots = 0;
+    private int numOccupiedReduceSlots = 0;
+    
+    private int numJobsFailed = 0;
+    private int numJobsKilled = 0;
+    
+    private int numJobsPreparing = 0;
+    private int numJobsRunning = 0;
+    
+    private int numRunningMaps = 0;
+    private int numRunningReduces = 0;
+    
+    private int numMapTasksKilled = 0;
+    private int numReduceTasksKilled = 0;
+
+    private int numTrackers = 0;
+    private int numTrackersBlackListed = 0;
+
+    private int numTrackersDecommissioned = 0;
+
+    @Override
+    public synchronized void launchMap(TaskAttemptID taskAttemptID) {
+      ++numMapTasksLaunched;
+      decWaitingMaps(taskAttemptID.getJobID(), 1);
+    }
+
+    @Override
+    public synchronized void completeMap(TaskAttemptID taskAttemptID) {
+      ++numMapTasksCompleted;
+    }
+
+    @Override
+    public synchronized void failedMap(TaskAttemptID taskAttemptID) {
+      ++numMapTasksFailed;
+      addWaitingMaps(taskAttemptID.getJobID(), 1);
+    }
+
+    @Override
+    public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
+      ++numReduceTasksLaunched;
+      decWaitingReduces(taskAttemptID.getJobID(), 1);
+    }
+
+    @Override
+    public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
+      ++numReduceTasksCompleted;
+    }
+
+    @Override
+    public synchronized void failedReduce(TaskAttemptID taskAttemptID) {
+      ++numReduceTasksFailed;
+      addWaitingReduces(taskAttemptID.getJobID(), 1);
+    }
+
+    @Override
+    public synchronized void submitJob(JobConf conf, JobID id) {
+      ++numJobsSubmitted;
+    }
+
+    @Override
+    public synchronized void completeJob(JobConf conf, JobID id) {
+      ++numJobsCompleted;
+    }
+
+    @Override
+    public synchronized void addWaitingMaps(JobID id, int task) {
+      numWaitingMaps  += task;
+    }
+
+    @Override
+    public synchronized void decWaitingMaps(JobID id, int task) {
+      numWaitingMaps -= task;
+    }
+
+    @Override
+    public synchronized void addWaitingReduces(JobID id, int task) {
+      numWaitingReduces += task;
+    }
+
+    @Override
+    public synchronized void decWaitingReduces(JobID id, int task){
+      numWaitingReduces -= task;
+    }
+
+    @Override
+    public void setMapSlots(int slots) {
+      numMapSlots = slots;
+    }
+
+    @Override
+    public void setReduceSlots(int slots) {
+      numReduceSlots = slots;
+    }
+
+    @Override
+    public synchronized void addBlackListedMapSlots(int slots){
+      numBlackListedMapSlots += slots;
+    }
+
+    @Override
+    public synchronized void decBlackListedMapSlots(int slots){
+      numBlackListedMapSlots -= slots;
+    }
+
+    @Override
+    public synchronized void addBlackListedReduceSlots(int slots){
+      numBlackListedReduceSlots += slots;
+    }
+
+    @Override
+    public synchronized void decBlackListedReduceSlots(int slots){
+      numBlackListedReduceSlots -= slots;
+    }
+    
+    @Override
+    public synchronized void addReservedMapSlots(int slots)
+    { 
+      numReservedMapSlots += slots;
+    }
+
+    @Override
+    public synchronized void decReservedMapSlots(int slots)
+    {
+      numReservedMapSlots -= slots;
+    }
+
+    @Override
+    public synchronized void addReservedReduceSlots(int slots)
+    {
+      numReservedReduceSlots += slots;
+    }
+
+    @Override
+    public synchronized void decReservedReduceSlots(int slots)
+    {
+      numReservedReduceSlots -= slots;
+    }
+
+    @Override
+    public synchronized void addOccupiedMapSlots(int slots)
+    {
+      numOccupiedMapSlots += slots;
+    }
+
+    @Override
+    public synchronized void decOccupiedMapSlots(int slots)
+    {
+      numOccupiedMapSlots -= slots;
+    }
+
+    @Override
+    public synchronized void addOccupiedReduceSlots(int slots)
+    {
+      numOccupiedReduceSlots += slots;
+    }
+
+    @Override
+    public synchronized void decOccupiedReduceSlots(int slots)
+    {
+      numOccupiedReduceSlots -= slots;
+    }
+
+    @Override
+    public synchronized void failedJob(JobConf conf, JobID id) 
+    {
+      numJobsFailed++;
+    }
+
+    @Override
+    public synchronized void killedJob(JobConf conf, JobID id) 
+    {
+      numJobsKilled++;
+    }
+
+    @Override
+    public synchronized void addPrepJob(JobConf conf, JobID id) 
+    {
+      numJobsPreparing++;
+    }
+
+    @Override
+    public synchronized void decPrepJob(JobConf conf, JobID id) 
+    {
+      numJobsPreparing--;
+    }
+
+    @Override
+    public synchronized void addRunningJob(JobConf conf, JobID id) 
+    {
+      numJobsRunning++;
+    }
+
+    @Override
+    public synchronized void decRunningJob(JobConf conf, JobID id) 
+    {
+      numJobsRunning--;
+    }
+
+    @Override
+    public synchronized void addRunningMaps(JobID id, int task)
+    {
+      numRunningMaps += task;
+    }
+
+    @Override
+    public synchronized void decRunningMaps(JobID id, int task) 
+    {
+      numRunningMaps -= task;
+    }
+
+    @Override
+    public synchronized void addRunningReduces(JobID id, int task)
+    {
+      numRunningReduces += task;
+    }
+
+    @Override
+    public synchronized void decRunningReduces(JobID id, int task)
+    {
+      numRunningReduces -= task;
+    }
+
+    @Override
+    public synchronized void killedMap(TaskAttemptID taskAttemptID)
+    {
+      numMapTasksKilled++;
+    }
+
+    @Override
+    public synchronized void killedReduce(TaskAttemptID taskAttemptID)
+    {
+      numReduceTasksKilled++;
+    }
+
+    @Override
+    public synchronized void addTrackers(int trackers)
+    {
+      numTrackers += trackers;
+    }
+
+    @Override
+    public synchronized void decTrackers(int trackers)
+    {
+      numTrackers -= trackers;
+    }
+
+    @Override
+    public synchronized void addBlackListedTrackers(int trackers)
+    {
+      numTrackersBlackListed += trackers;
+    }
+
+    @Override
+    public synchronized void decBlackListedTrackers(int trackers)
+    {
+      numTrackersBlackListed -= trackers;
+    }
+
+    @Override
+    public synchronized void setDecommissionedTrackers(int trackers)
+    {
+      numTrackersDecommissioned = trackers;
+    }
+  }
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java Fri Oct 23 09:54:22 2009
@@ -44,8 +44,9 @@
   class FakeJobInProgress extends JobInProgress {
    
     public FakeJobInProgress(JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf, null);
+        FakeTaskTrackerManager taskTrackerManager, 
+        JobTracker jt) throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf, jt);
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP, 
           jobConf.getUser(), 
@@ -233,7 +234,8 @@
     // will be inited first and that will hang
     
     for (int i = 0; i < NUM_JOBS; i++) {
-      jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager);
+      jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager, 
+      UtilsForTests.getJobTracker());
       jobs[i].getStatus().setRunState(JobStatus.PREP);
       taskTrackerManager.submitJob(jobs[i]);
     }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java Fri Oct 23 09:54:22 2009
@@ -34,7 +34,8 @@
     jc.setNumMapTasks(maps);
     jc.setNumReduceTasks(reduces);
     
-    JobInProgress jip = new JobInProgress(jid, jc, null);
+    JobInProgress jip = new JobInProgress(jid, jc, 
+      UtilsForTests.getJobTracker());
     //unfortunately, we can't set job input size from here.
     ResourceEstimator re = new ResourceEstimator(jip);
     
@@ -66,7 +67,8 @@
     jc.setNumMapTasks(maps);
     jc.setNumReduceTasks(reduces);
     
-    JobInProgress jip = new JobInProgress(jid, jc, null) {
+    JobInProgress jip = new JobInProgress(jid, jc, 
+      UtilsForTests.getJobTracker()) {
       long getInputLength() {
         return singleMapInputSize*desiredMaps();
       }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=828979&r1=828978&r2=828979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Fri Oct 23 09:54:22 2009
@@ -47,6 +47,7 @@
 import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 import org.apache.commons.logging.Log;
 
@@ -701,4 +702,16 @@
     fos.close();
   }
 
+  static JobTracker getJobTracker() {
+    JobConf conf = new JobConf();
+    conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+    conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
+    JobTracker jt;
+    try {
+      jt = new JobTracker(conf);
+      return jt;
+    } catch (Exception e) {
+      throw new RuntimeException("Could not start jt", e);
+    }
+  }
 }



Mime
View raw message