hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r1131280 - in /hadoop/common/branches/branch-0.20-security: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Fri, 03 Jun 2011 22:51:26 GMT
Author: cdouglas
Date: Fri Jun  3 22:51:26 2011
New Revision: 1131280

URL: http://svn.apache.org/viewvc?rev=1131280&view=rev
Log:
MAPREDUCE-2558. Add JobTracker metrics for scheduling queues.
Contributed by Jeffrey Naisbitt

Added:
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Queue.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueManager.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Jun  3 22:51:26 2011
@@ -44,6 +44,9 @@ Release 0.20.205.0 - unreleased
     MAPREDUCE-2555. Avoid sprious logging from completedtasks. (Thomas Graves
     via cdouglas)
 
+    MAPREDUCE-2558. Add JobTracker metrics for scheduling queues. (Jeffrey
+    Naisbitt via cdouglas)
+
 Release 0.20.204.0 - unreleased
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
(original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Fri Jun  3 22:51:26 2011
@@ -432,7 +432,8 @@ public class TestCapacityScheduler exten
       queues.clear();
       for (String qName : newQueues) {
         try {
-          queues.put(qName, new Queue(qName, acls, Queue.QueueState.RUNNING));
+          queues.put(qName, new Queue(qName, acls, Queue.QueueState.RUNNING,
+                     QueueMetrics.create(qName, new Configuration())));
         } catch (Throwable t) {
           throw new RuntimeException("Unable to initialize queue " + qName, t);
         }

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Fri Jun  3 22:51:26 2011
@@ -91,6 +91,7 @@ public class JobInProgress {
   JobStatus status;
   String jobFile = null;
   Path localJobFile = null;
+  final QueueMetrics queueMetrics;
 
   TaskInProgress maps[] = new TaskInProgress[0];
   TaskInProgress reduces[] = new TaskInProgress[0];
@@ -339,14 +340,17 @@ public class JobInProgress {
     this.resourceEstimator = new ResourceEstimator(this);
     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
     this.status.setUsername(conf.getUser());
+    String queueName = conf.getQueueName();
     this.profile = new JobProfile(conf.getUser(), jobid, "", "",
-                                  conf.getJobName(), conf.getQueueName());
+                                  conf.getJobName(), queueName);
     this.memoryPerMap = conf.getMemoryForMapTask();
     this.memoryPerReduce = conf.getMemoryForReduceTask();
     this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
 
+    this.queueMetrics = this.jobtracker.getQueueManager().getQueue(queueName).getMetrics();
+
     // Check task limits
     checkTaskLimits();
 
@@ -371,6 +375,7 @@ public class JobInProgress {
       this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
       this.status.setUsername(jobInfo.getUser().toString());
       this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
+      // Add the queue-level metric below (after the profile has been initialized)
       this.startTime = jobtracker.getClock().getTime();
       status.setStartTime(startTime);
       this.localFs = jobtracker.getLocalFileSystem();
@@ -418,9 +423,12 @@ public class JobInProgress {
       
       this.priority = conf.getJobPriority();
       this.status.setJobPriority(this.priority);
+      String queueName = conf.getQueueName();
       this.profile = new JobProfile(user, jobId, 
-          jobFile, url, conf.getJobName(),
-          conf.getQueueName());
+          jobFile, url, conf.getJobName(), queueName);
+
+      this.queueMetrics = this.jobtracker.getQueueManager().getQueue(queueName).getMetrics();
+      this.queueMetrics.addPrepJob(conf, jobId);
 
       this.submitHostName = conf.getJobSubmitHostName();
       this.submitHostAddress = conf.getJobSubmitHostAddress();
@@ -474,7 +482,15 @@ public class JobInProgress {
       FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
     }
   }
-    
+
+  /**
+   * Get the QueueMetrics object associated with this job
+   * @return QueueMetrics
+   */
+  public QueueMetrics getQueueMetrics() {
+    return this.queueMetrics;
+  }
+
   private void checkTaskLimits() throws IOException {
     // if the number of tasks is larger than a configured value
     // then fail the job.
@@ -682,6 +698,8 @@ public class JobInProgress {
 
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
+    this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
+    this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);
 
     maps = new TaskInProgress[numMapTasks];
     for(int i=0; i < numMapTasks; ++i) {
@@ -1682,6 +1700,7 @@ public class JobInProgress {
       if (tip.getActiveTasks().size() > 1)
         speculativeMapTasks++;
       metrics.launchMap(id);
+      this.queueMetrics.launchMap(id);
     } else {
       ++runningReduceTasks;
       name = Values.REDUCE.name();
@@ -1689,6 +1708,7 @@ public class JobInProgress {
       if (tip.getActiveTasks().size() > 1)
         speculativeReduceTasks++;
       metrics.launchReduce(id);
+      this.queueMetrics.launchReduce(id);
     }
     // Note that the logs are for the scheduled tasks only. Tasks that join on 
     // restart has already their logs in place.
@@ -1839,9 +1859,11 @@ public class JobInProgress {
     map.put(taskTracker, info);
     if (type == TaskType.MAP) {
       jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
+      this.queueMetrics.addReservedMapSlots(reservedSlots);
     }
     else {
       jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
+      this.queueMetrics.addReservedReduceSlots(reservedSlots);
     }
     jobtracker.incrementReservations(type, reservedSlots);
   }
@@ -1871,10 +1893,12 @@ public class JobInProgress {
     map.remove(taskTracker);
     if (type == TaskType.MAP) {
       jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots());
+      this.queueMetrics.decReservedMapSlots(info.getNumSlots());
     }
     else {
       jobtracker.getInstrumentation().decReservedReduceSlots(
         info.getNumSlots());
+      this.queueMetrics.decReservedReduceSlots(info.getNumSlots());
     }
     jobtracker.decrementReservations(type, info.getNumSlots());
   }
@@ -2583,6 +2607,7 @@ public class JobInProgress {
       }
       finishedMapTasks += 1;
       metrics.completeMap(taskid);
+      this.queueMetrics.completeMap(taskid);
       // remove the completed map from the resp running caches
       retireMap(tip);
       if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
@@ -2598,6 +2623,7 @@ public class JobInProgress {
       }
       finishedReduceTasks += 1;
       metrics.completeReduce(taskid);
+      this.queueMetrics.completeReduce(taskid);
       // remove the completed reduces from the running reducers set
       retireReduce(tip);
       if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
@@ -2642,14 +2668,18 @@ public class JobInProgress {
     //update the metrics
     if (oldState == JobStatus.PREP) {
       this.jobtracker.getInstrumentation().decPrepJob(conf, jobId);
+      this.queueMetrics.decPrepJob(conf, jobId);
     } else if (oldState == JobStatus.RUNNING) {
       this.jobtracker.getInstrumentation().decRunningJob(conf, jobId);
+      this.queueMetrics.decRunningJob(conf, jobId);
     }
     
     if (newState == JobStatus.PREP) {
       this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
+      this.queueMetrics.addPrepJob(conf, jobId);
     } else if (newState == JobStatus.RUNNING) {
       this.jobtracker.getInstrumentation().addRunningJob(conf, jobId);
+      this.queueMetrics.addRunningJob(conf, jobId);
     }
     
   }
@@ -2704,6 +2734,7 @@ public class JobInProgress {
       garbageCollect();
       
       metrics.completeJob(this.conf, this.status.getJobID());
+      this.queueMetrics.completeJob(this.conf, this.status.getJobID());
     }
   }
   
@@ -2744,9 +2775,11 @@ public class JobInProgress {
       if (jobTerminationState == JobStatus.FAILED) {
         jobtracker.getInstrumentation().failedJob(
             this.conf, this.status.getJobID());
+        this.queueMetrics.failedJob(this.conf, this.status.getJobID());
       } else {
         jobtracker.getInstrumentation().killedJob(
             this.conf, this.status.getJobID());
+        this.queueMetrics.killedJob(this.conf, this.status.getJobID());
       }
     }
   }
@@ -2897,9 +2930,11 @@ public class JobInProgress {
         if (tip.isMapTask() && !metricsDone) {
           runningMapTasks -= 1;
           metrics.failedMap(taskid);
+          this.queueMetrics.failedMap(taskid);
         } else if (!metricsDone) {
           runningReduceTasks -= 1;
           metrics.failedReduce(taskid);
+          this.queueMetrics.failedReduce(taskid);
         }
       }
       
@@ -3142,6 +3177,8 @@ public class JobInProgress {
       // Let the JobTracker know that a job is complete
       jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
       jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
+      this.queueMetrics.decWaitingMaps(getJobID(), pendingMaps());
+      this.queueMetrics.decWaitingReduces(getJobID(), pendingReduces());
       jobtracker.storeCompletedJob(this);
       jobtracker.finalizeJob(this);
 

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
(original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Fri Jun  3 22:51:26 2011
@@ -4033,6 +4033,8 @@ public class JobTracker implements MRCon
       }
     }
     myInstrumentation.submitJob(job.getJobConf(), jobId);
+    job.getQueueMetrics().submitJob(job.getJobConf(), jobId);
+
     LOG.info("Job " + jobId + " added successfully for user '" 
              + job.getJobConf().getUser() + "' to queue '" 
              + job.getJobConf().getQueueName() + "'");

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
(original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
Fri Jun  3 22:51:26 2011
@@ -61,6 +61,7 @@ class LocalJobRunner implements JobSubmi
   private final TaskController taskController = new DefaultTaskController();
 
   private JobTrackerInstrumentation myMetrics = null;
+  private QueueMetrics queueMetrics = null;
 
   private static final String jobDir =  "localRunner/";
   
@@ -207,8 +208,10 @@ class LocalJobRunner implements JobSubmi
             map.setConf(localConf);
             map_tasks += 1;
             myMetrics.launchMap(mapId);
+            queueMetrics.launchMap(mapId);
             map.run(localConf, this);
             myMetrics.completeMap(mapId);
+            queueMetrics.completeMap(mapId);
             map_tasks -= 1;
             updateCounters(map);
           } else {
@@ -253,8 +256,10 @@ class LocalJobRunner implements JobSubmi
               reduce.setConf(localConf);
               reduce_tasks += 1;
               myMetrics.launchReduce(reduce.getTaskID());
+              queueMetrics.launchReduce(reduce.getTaskID());
               reduce.run(localConf, this);
               myMetrics.completeReduce(reduce.getTaskID());
+              queueMetrics.completeReduce(reduce.getTaskID());
               reduce_tasks -= 1;
               updateCounters(reduce);
             } else {
@@ -413,6 +418,7 @@ class LocalJobRunner implements JobSubmi
     this.fs = FileSystem.getLocal(conf);
     this.conf = conf;
     myMetrics = JobTrackerInstrumentation.create(null, new JobConf(conf));
+    queueMetrics = QueueMetrics.create(conf.getQueueName(), new JobConf(conf));
     taskController.setConf(conf);
   }
 

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Queue.java?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Queue.java
(original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Queue.java
Fri Jun  3 22:51:26 2011
@@ -37,6 +37,7 @@ class Queue {
   private String name;
   private Map<String,AccessControlList> acls;
   private QueueState state = QueueState.RUNNING;
+  private QueueMetrics queueMetrics;
 
   /**
    * An Object that can be used by schedulers to fill in
@@ -69,10 +70,12 @@ class Queue {
    * @param acls ACLs for the queue
    * @param state state of the queue
    */
-  Queue(String name, Map<String, AccessControlList> acls, QueueState state) {
+  Queue(String name, Map<String, AccessControlList> acls, QueueState state,
+        QueueMetrics metrics) {
 	  this.name = name;
 	  this.acls = acls;
 	  this.state = state;
+	  this.queueMetrics = metrics;
   }
 
   /**
@@ -149,4 +152,12 @@ class Queue {
   void setSchedulingInfo(Object schedulingInfo) {
     this.schedulingInfo = schedulingInfo;
   }
+
+  /**
+   * Return the QueueMetrics object for this queue
+   * @return QueueMetrics
+   */
+  public QueueMetrics getMetrics() {
+    return this.queueMetrics;
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueManager.java?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueManager.java
(original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueManager.java
Fri Jun  3 22:51:26 2011
@@ -118,7 +118,7 @@ class QueueManager {
         LOG.error("The queue, " + name + " does not have a configured ACL list");
       }
       queues.put(name, new Queue(name, getQueueAcls(name, conf),
-          getQueueState(name, conf)));
+          getQueueState(name, conf), QueueMetrics.create(name, conf)));
     }
     
     return queues;
@@ -136,7 +136,17 @@ class QueueManager {
   public synchronized Set<String> getQueues() {
     return queues.keySet();
   }
-  
+
+  /**
+   * Return a specific queue configured in the system.
+   * 
+   * @param queueName Name of the queue requested
+   * @return Queue object corresponding to queueName
+   */
+  public synchronized Queue getQueue(String queueName) {
+    return queues.get(queueName);
+  }
+
   /**
    * Return true if the given user is part of the ACL for the given
    * {@link QueueACL} name for the given queue.

Added: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java?rev=1131280&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
(added)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
Fri Jun  3 22:51:26 2011
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MetricMutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.source.JvmMetricsSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
+/**
+ *
+ */
+@SuppressWarnings("deprecation")
+class QueueMetrics implements MetricsSource {
+  private static final Log LOG =
+    LogFactory.getLog(QueueMetrics.class);
+
+  final MetricsRegistry registry = new MetricsRegistry("Queue");
+  final MetricMutableCounterInt mapsLaunched =
+      registry.newCounter("maps_launched", "", 0);
+  final MetricMutableCounterInt mapsCompleted =
+      registry.newCounter("maps_completed", "", 0);
+  final MetricMutableCounterInt mapsFailed =
+      registry.newCounter("maps_failed", "", 0);
+  final MetricMutableCounterInt redsLaunched =
+      registry.newCounter("reduces_launched", "", 0);
+  final MetricMutableCounterInt redsCompleted =
+      registry.newCounter("reduces_completed", "", 0);
+  final MetricMutableCounterInt redsFailed =
+      registry.newCounter("reduces_failed", "", 0);
+  final MetricMutableCounterInt jobsSubmitted =
+      registry.newCounter("jobs_submitted", "", 0);
+  final MetricMutableCounterInt jobsCompleted =
+      registry.newCounter("jobs_completed", "", 0);
+  final MetricMutableGaugeInt waitingMaps =
+      registry.newGauge("waiting_maps", "", 0);
+  final MetricMutableGaugeInt waitingReds =
+      registry.newGauge("waiting_reduces", "", 0);
+  final MetricMutableGaugeInt reservedMapSlots =
+      registry.newGauge("reserved_map_slots", "", 0);
+  final MetricMutableGaugeInt reservedRedSlots =
+      registry.newGauge("reserved_reduce_slots", "", 0);
+  final MetricMutableCounterInt jobsFailed =
+      registry.newCounter("jobs_failed", "", 0);
+  final MetricMutableCounterInt jobsKilled =
+      registry.newCounter("jobs_killed", "", 0);
+  final MetricMutableGaugeInt jobsPreparing =
+      registry.newGauge("jobs_preparing", "", 0);
+  final MetricMutableGaugeInt jobsRunning =
+      registry.newGauge("jobs_running", "", 0);
+  final MetricMutableCounterInt mapsKilled =
+      registry.newCounter("maps_killed", "", 0);
+  final MetricMutableCounterInt redsKilled =
+      registry.newCounter("reduces_killed", "", 0);
+
+  final String sessionId;
+  private String queueName;
+
+  public QueueMetrics(String queueName, Configuration conf) {
+    this.queueName = queueName;
+    sessionId = conf.get("session.id", "");
+    registry.setContext("mapred").tag("sessionId", "", sessionId);
+    registry.tag("Queue", "Metrics by queue", queueName);
+  }
+
+  public String getQueueName() {
+    return this.queueName;
+  }
+
+  public void getMetrics(MetricsBuilder builder, boolean all) {
+    registry.snapshot(builder.addRecord(registry.name()), all);
+  }
+
+  public void launchMap(TaskAttemptID taskAttemptID) {
+    mapsLaunched.incr();
+    decWaitingMaps(taskAttemptID.getJobID(), 1);
+  }
+
+  public void completeMap(TaskAttemptID taskAttemptID) {
+    mapsCompleted.incr();
+  }
+
+  public void failedMap(TaskAttemptID taskAttemptID) {
+    mapsFailed.incr();
+    addWaitingMaps(taskAttemptID.getJobID(), 1);
+  }
+
+  public void launchReduce(TaskAttemptID taskAttemptID) {
+    redsLaunched.incr();
+    decWaitingReduces(taskAttemptID.getJobID(), 1);
+  }
+
+  public void completeReduce(TaskAttemptID taskAttemptID) {
+    redsCompleted.incr();
+  }
+
+  public void failedReduce(TaskAttemptID taskAttemptID) {
+    redsFailed.incr();
+    addWaitingReduces(taskAttemptID.getJobID(), 1);
+  }
+
+  public void submitJob(JobConf conf, JobID id) {
+    jobsSubmitted.incr();
+  }
+
+  public void completeJob(JobConf conf, JobID id) {
+    jobsCompleted.incr();
+  }
+
+  public void addWaitingMaps(JobID id, int task) {
+    waitingMaps.incr(task);
+  }
+
+  public void decWaitingMaps(JobID id, int task) {
+    waitingMaps.decr(task);
+  }
+
+  public void addWaitingReduces(JobID id, int task) {
+    waitingReds.incr(task);
+  }
+
+  public void decWaitingReduces(JobID id, int task){
+    waitingReds.decr(task);
+  }
+
+  public void addReservedMapSlots(int slots) {
+    reservedMapSlots.incr(slots);;
+  }
+
+  public void decReservedMapSlots(int slots) {
+    reservedMapSlots.decr(slots);
+  }
+
+  public void addReservedReduceSlots(int slots) {
+    reservedRedSlots.incr(slots);
+  }
+
+  public void decReservedReduceSlots(int slots) {
+    reservedRedSlots.decr(slots);
+  }
+
+  public void failedJob(JobConf conf, JobID id) {
+    jobsFailed.incr();
+  }
+
+  public void killedJob(JobConf conf, JobID id) {
+    jobsKilled.incr();
+  }
+
+  public void addPrepJob(JobConf conf, JobID id) {
+    jobsPreparing.incr();
+  }
+
+  public void decPrepJob(JobConf conf, JobID id) {
+    jobsPreparing.decr();
+  }
+
+  public void addRunningJob(JobConf conf, JobID id) {
+    jobsRunning.incr();
+  }
+
+  public void decRunningJob(JobConf conf, JobID id) {
+    jobsRunning.decr();
+  }
+
+  public void killedMap(TaskAttemptID taskAttemptID) {
+    mapsKilled.incr();
+  }
+
+  public void killedReduce(TaskAttemptID taskAttemptID) {
+    redsKilled.incr();
+  }
+
+  static QueueMetrics create(String queueName, Configuration conf) {
+    return create(queueName, conf, DefaultMetricsSystem.INSTANCE);
+  }
+
+  static QueueMetrics create(String queueName, Configuration conf,
+                                     MetricsSystem ms) {
+    return ms.register("QueueMetrics,q=" + queueName, "Queue metrics",
+                       new QueueMetrics(queueName, conf));
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java?rev=1131280&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java
(added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java
Fri Jun  3 22:51:26 2011
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.apache.jasper.tagplugins.jstl.core.When;
+import org.mockito.Mockito;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("deprecation")
+public class TestQueueMetrics extends TestCase {
+
+  QueueMetrics metrics = Mockito.mock(QueueMetrics.class);
+  static int jobIdCounter = 0;
+  static final String jtIdentifier = "queue_jt";
+
+  private static JobID getJobId() {
+    return new JobID(TestQueueMetrics.jtIdentifier, jobIdCounter++);
+  }
+
+  public void testDefaultSingleQueueMetrics() {
+    String queueName = "single";
+    TaskAttemptID taskAttemptID = Mockito.mock(TaskAttemptID.class);
+    when(taskAttemptID.getJobID()).thenReturn(TestQueueMetrics.getJobId());
+
+    QueueMetrics metrics = QueueMetrics.create(queueName, new Configuration());
+
+    assertEquals(metrics.getQueueName(), "single");
+    metrics.launchMap(taskAttemptID);
+    checkMaps(metrics, 1, 0, 0, 0, -1, 0);
+    metrics.addWaitingMaps(taskAttemptID.getJobID(), 5);
+    metrics.launchMap(taskAttemptID);
+    checkMaps(metrics, 2, 0, 0, 0, 3, 0);
+    checkReduces(metrics, 0, 0, 0, 0, 0, 0);
+
+    metrics.completeMap(taskAttemptID);
+    metrics.failedMap(taskAttemptID);
+    checkMaps(metrics, 2, 1, 1, 0, 4, 0);
+    checkReduces(metrics, 0, 0, 0, 0, 0, 0);
+
+    metrics.launchReduce(taskAttemptID);
+    metrics.completeReduce(taskAttemptID);
+    metrics.failedReduce(taskAttemptID);
+    checkMaps(metrics, 2, 1, 1, 0, 4, 0);
+    checkReduces(metrics, 1, 1, 1, 0, 0, 0);
+
+    metrics.addWaitingMaps(null, 20);
+    metrics.decWaitingMaps(null, 10);
+    metrics.addWaitingReduces(null, 20);
+    metrics.decWaitingReduces(null, 10);
+    checkMaps(metrics, 2, 1, 1, 0, 14, 0);
+    checkReduces(metrics, 1, 1, 1, 0, 10, 0);
+
+    metrics.addReservedMapSlots(10);
+    metrics.addReservedReduceSlots(10);
+    checkMaps(metrics, 2, 1, 1, 0, 14, 10);
+    checkReduces(metrics, 1, 1, 1, 0, 10, 10);
+    metrics.decReservedReduceSlots(5);
+    metrics.decReservedMapSlots(5);
+    checkMaps(metrics, 2, 1, 1, 0, 14, 5);
+    checkReduces(metrics, 1, 1, 1, 0, 10, 5);
+
+    metrics.killedMap(taskAttemptID);
+    metrics.killedReduce(taskAttemptID);
+    checkMaps(metrics, 2, 1, 1, 1, 14, 5);
+    checkReduces(metrics, 1, 1, 1, 1, 10, 5);
+    checkJobs(metrics, 0, 0, 0, 0, 0, 0);  
+
+    metrics.submitJob(null, null);
+    metrics.completeJob(null, null);
+    metrics.failedJob(null, null);
+    metrics.killedJob(null, null);
+    checkJobs(metrics, 1, 1, 1, 1, 0, 0);
+
+    metrics.addPrepJob(null, null);
+    metrics.addRunningJob(null, null);
+    metrics.addPrepJob(null, null);
+    metrics.addRunningJob(null, null);
+    checkJobs(metrics, 1, 1, 1, 1, 2, 2);
+    metrics.decPrepJob(null, null);
+    metrics.decRunningJob(null, null);
+    checkJobs(metrics, 1, 1, 1, 1, 1, 1);
+    checkMaps(metrics, 2, 1, 1, 1, 14, 5);
+    checkReduces(metrics, 1, 1, 1, 1, 10, 5);
+  }
+
+  public static void checkMaps(QueueMetrics metrics,
+      int maps_launched, int maps_completed, int maps_failed, int maps_killed,
+      int waiting_maps, int reserved_map_slots) {
+    assertCounter("maps_launched", maps_launched, metrics);
+    assertCounter("maps_completed", maps_completed, metrics);
+    assertCounter("maps_failed", maps_failed, metrics);
+    assertCounter("maps_killed", maps_killed, metrics);
+    assertGauge("waiting_maps", waiting_maps, metrics);
+    assertGauge("reserved_map_slots", reserved_map_slots, metrics);
+  }
+
+  public static void checkReduces(QueueMetrics metrics,
+      int reduces_launched, int reduces_completed, int reduces_failed,
+      int reduces_killed, int waiting_reduces, int reserved_reduce_slots) {
+    assertCounter("reduces_launched", reduces_launched, metrics);
+    assertCounter("reduces_completed", reduces_completed, metrics);
+    assertCounter("reduces_failed", reduces_failed, metrics);
+    assertCounter("reduces_killed", reduces_killed, metrics);
+    assertGauge("waiting_reduces", waiting_reduces, metrics);
+    assertGauge("reserved_reduce_slots", reserved_reduce_slots, metrics);
+  }
+
+  public static void checkJobs(QueueMetrics metrics, int jobs_submitted, int jobs_completed,
+      int jobs_failed, int jobs_killed, int jobs_preparing, int jobs_running) {
+    assertCounter("jobs_submitted", jobs_submitted, metrics);
+    assertCounter("jobs_completed", jobs_completed, metrics);
+    assertCounter("jobs_failed", jobs_failed, metrics);
+    assertCounter("jobs_killed", jobs_killed, metrics);
+    assertGauge("jobs_preparing", jobs_preparing, metrics);
+    assertGauge("jobs_running", jobs_running, metrics);    
+  }
+}



Mime
View raw message