hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r755497 - in /hadoop/core/branches/branch-0.20: ./ src/mapred/org/apache/hadoop/mapred/
Date Wed, 18 Mar 2009 07:09:30 GMT
Author: cdouglas
Date: Wed Mar 18 07:09:29 2009
New Revision: 755497

URL: http://svn.apache.org/viewvc?rev=755497&view=rev
Log:
HADOOP-5514. Fix JobTracker metrics and add metrics for waiting, failed tasks.

Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=755497&r1=755496&r2=755497&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Wed Mar 18 07:09:29 2009
@@ -748,6 +748,9 @@
     HADOOP-5463. Balancer throws "Not a host:port pair" unless port is
     specified in fs.default.name. (Stuart White via hairong)
 
+    HADOOP-5514. Fix JobTracker metrics and add metrics for waiting, failed
+    tasks. (cdouglas)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=755497&r1=755496&r2=755497&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Wed Mar 18 07:09:29 2009
@@ -415,6 +415,8 @@
                 (numMapTasks + numReduceTasks) +
                 " exceeds the configured limit " + maxTasks);
     }
+    jobtracker.getInstrumentation().addWaiting(
+        getJobID(), numMapTasks + numReduceTasks);
 
     maps = new TaskInProgress[numMapTasks];
     for(int i=0; i < numMapTasks; ++i) {
@@ -734,8 +736,7 @@
   // Status update methods
   ////////////////////////////////////////////////////
   public synchronized void updateTaskStatus(TaskInProgress tip, 
-                                            TaskStatus status,
-                                            JobTrackerInstrumentation metrics) {
+                                            TaskStatus status) {
 
     double oldProgress = tip.getProgress();   // save old progress
     boolean wasRunning = tip.isRunning();
@@ -833,7 +834,7 @@
         
         // Tell the job to fail the relevant task
         failedTask(tip, taskid, status, ttStatus,
-                   wasRunning, wasComplete, metrics);
+                   wasRunning, wasComplete);
 
         // Did the task failure lead to tip failure?
         TaskCompletionEvent.Status taskCompletionStatus = 
@@ -864,7 +865,7 @@
         this.taskCompletionEvents.add(taskEvent);
         taskCompletionEventTracker++;
         if (state == TaskStatus.State.SUCCEEDED) {
-          completedTask(tip, status, metrics);
+          completedTask(tip, status);
         }
       }
     }
@@ -1267,6 +1268,7 @@
     if (!isScheduled) {
       tip.addRunningTask(id, tts.getTrackerName());
     }
+    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
 
     // keeping the earlier ordering intact
     String name;
@@ -1285,12 +1287,14 @@
       splits = tip.getSplitNodes();
       if (tip.getActiveTasks().size() > 1)
         speculativeMapTasks++;
+      metrics.launchMap(id);
     } else {
       ++runningReduceTasks;
       name = Values.REDUCE.name();
       counter = Counter.TOTAL_LAUNCHED_REDUCES;
       if (tip.getActiveTasks().size() > 1)
         speculativeReduceTasks++;
+      metrics.launchReduce(id);
     }
     // Note that the logs are for the scheduled tasks only. Tasks that join on 
     // restart has already their logs in place.
@@ -1959,11 +1963,11 @@
    * A taskid assigned to this JobInProgress has reported in successfully.
    */
   public synchronized boolean completedTask(TaskInProgress tip, 
-                                         TaskStatus status,
-                                         JobTrackerInstrumentation metrics) 
+                                            TaskStatus status)
   {
     TaskAttemptID taskid = status.getTaskID();
     int oldNumAttempts = tip.getActiveTasks().size();
+    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
         
     // Sanity check: is the TIP already complete? 
     // It _is_ safe to not decrement running{Map|Reduce}Tasks and
@@ -2047,7 +2051,7 @@
         terminateJob(JobStatus.KILLED);
       }
       else {
-        jobComplete(metrics);
+        jobComplete();
       }
       // The job has been killed/failed/successful
       // JobTracker should cleanup this task
@@ -2085,10 +2089,9 @@
   /**
    * The job is done since all it's component tasks are either
    * successful or have failed.
-   * 
-   * @param metrics job-tracker metrics
    */
-  private void jobComplete(JobTrackerInstrumentation metrics) {
+  private void jobComplete() {
+    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
     //
     // All tasks are complete, then the job is done!
     //
@@ -2186,12 +2189,12 @@
     while (!mapCleanupTasks.isEmpty()) {
       taskid = mapCleanupTasks.remove(0);
       tip = maps[taskid.getTaskID().getId()];
-      updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+      updateTaskStatus(tip, tip.getTaskStatus(taskid));
     }
     while (!reduceCleanupTasks.isEmpty()) {
       taskid = reduceCleanupTasks.remove(0);
       tip = reduces[taskid.getTaskID().getId()];
-      updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+      updateTaskStatus(tip, tip.getTaskStatus(taskid));
     }
   }
 
@@ -2239,8 +2242,8 @@
   private void failedTask(TaskInProgress tip, TaskAttemptID taskid, 
                           TaskStatus status, 
                           TaskTrackerStatus taskTrackerStatus,
-                          boolean wasRunning, boolean wasComplete,
-                          JobTrackerInstrumentation metrics) {
+                          boolean wasRunning, boolean wasComplete) {
+    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
     // check if the TIP is already failed
     boolean wasFailed = tip.isFailed();
 
@@ -2258,6 +2261,7 @@
         launchedSetup = false;
       } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
+        metrics.failedMap(taskid);
         // remove from the running queue and put it in the non-running cache
         // if the tip is not complete i.e if the tip still needs to be run
         if (!isComplete) {
@@ -2266,6 +2270,7 @@
         }
       } else {
         runningReduceTasks -= 1;
+        metrics.failedReduce(taskid);
         // remove from the running queue and put in the failed queue if the tip
         // is not complete
         if (!isComplete) {
@@ -2417,7 +2422,7 @@
    */
   public void failedTask(TaskInProgress tip, TaskAttemptID taskid, String reason, 
                          TaskStatus.Phase phase, TaskStatus.State state, 
-                         String trackerName, JobTrackerInstrumentation metrics) {
+                         String trackerName) {
     TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), 
                                                     taskid,
                                                     0.0f,
@@ -2434,7 +2439,7 @@
     status.setStartTime(startTime);
     status.setFinishTime(System.currentTimeMillis());
     boolean wasComplete = tip.isComplete();
-    updateTaskStatus(tip, status, metrics);
+    updateTaskStatus(tip, status);
     boolean isComplete = tip.isComplete();
     if (wasComplete && !isComplete) { // mark a successful tip as failed
       String taskType = getTaskType(tip);
@@ -2451,6 +2456,8 @@
    */
   synchronized void garbageCollect() {
     // Let the JobTracker know that a job is complete
+    jobtracker.getInstrumentation(
+        ).decWaiting(getJobID(), pendingMaps() + pendingReduces());
     jobtracker.storeCompletedJob(this);
     jobtracker.finalizeJob(this);
       
@@ -2556,8 +2563,7 @@
   
   synchronized void fetchFailureNotification(TaskInProgress tip, 
                                              TaskAttemptID mapTaskId, 
-                                             String trackerName, 
-                                             JobTrackerInstrumentation metrics) {
+                                             String trackerName) {
     Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);
     fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
     mapTaskIdToFetchFailuresMap.put(mapTaskId, fetchFailures);
@@ -2577,7 +2583,7 @@
       failedTask(tip, mapTaskId, "Too many fetch-failures",                            
                  (tip.isMapTask() ? TaskStatus.Phase.MAP : 
                                     TaskStatus.Phase.REDUCE), 
-                 TaskStatus.State.FAILED, trackerName, metrics);
+                 TaskStatus.State.FAILED, trackerName);
       
       mapTaskIdToFetchFailuresMap.remove(mapTaskId);
     }

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=755497&r1=755496&r2=755497&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed
Mar 18 07:09:29 2009
@@ -253,7 +253,7 @@
                                      tip.isMapTask()? TaskStatus.Phase.MAP:
                                      TaskStatus.Phase.STARTING,
                                      TaskStatus.State.FAILED,
-                                     trackerName, myInstrumentation);
+                                     trackerName);
                   }
                   itr.remove();
                 } else {
@@ -931,7 +931,7 @@
         // This will add the tip failed event in the new log
         tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(), 
                                 status.getPhase(), status.getRunState(), 
-                                status.getTaskTracker(), myInstrumentation);
+                                status.getTaskTracker());
       }
     }
     
@@ -1045,7 +1045,7 @@
       taskStatus.setCounters(counter);
       
       // II. Replay the status
-      job.updateTaskStatus(tip, taskStatus, myInstrumentation);
+      job.updateTaskStatus(tip, taskStatus);
       
       // III. Prevent the task from expiry
       expireLaunchingTasks.removeTask(attemptId);
@@ -1082,7 +1082,7 @@
       taskStatus.setDiagnosticInfo(diagInfo); // diag info
 
       // II. Update the task status
-     job.updateTaskStatus(tip, taskStatus, myInstrumentation);
+     job.updateTaskStatus(tip, taskStatus);
 
      // III. Prevent the task from expiry
      expireLaunchingTasks.removeTask(attemptId);
@@ -1231,7 +1231,7 @@
     }
   }
 
-  private JobTrackerInstrumentation myInstrumentation = null;
+  private final JobTrackerInstrumentation myInstrumentation;
     
   /////////////////////////////////////////////////////////////////
   // The real JobTracker
@@ -1457,18 +1457,21 @@
     
     trackerIdentifier = getDateFormat().format(new Date());
 
-    Class<? extends JobTrackerInstrumentation> metricsInst = getInstrumentationClass(jobConf);
+    // Initialize instrumentation
+    JobTrackerInstrumentation tmp;
+    Class<? extends JobTrackerInstrumentation> metricsInst =
+      getInstrumentationClass(jobConf);
     try {
       java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
         metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
-      this.myInstrumentation = c.newInstance(this, jobConf);
+      tmp = c.newInstance(this, jobConf);
     } catch(Exception e) {
       //Reflection can throw lots of exceptions -- handle them all by 
       //falling back on the default.
       LOG.error("failed to initialize job tracker metrics", e);
-      this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
+      tmp = new JobTrackerMetricsInst(this, jobConf);
     }
- 
+    myInstrumentation = tmp;
     
     // The rpc/web-server ports can be ephemeral ports... 
     // ... ensure we have the correct info
@@ -1611,6 +1614,10 @@
         t, JobTrackerInstrumentation.class);
   }
 
+  JobTrackerInstrumentation getInstrumentation() {
+    return myInstrumentation;
+  }
+
   public static InetSocketAddress getAddress(Configuration conf) {
     String jobTrackerStr =
       conf.get("mapred.job.tracker", "localhost:8012");
@@ -1736,12 +1743,6 @@
     // taskid --> TIP
     taskidToTIPMap.put(taskid, tip);
     
-    // Note this launch
-    if (taskid.isMap()) {
-      myInstrumentation.launchMap(taskid);
-    } else {
-      myInstrumentation.launchReduce(taskid);
-    }
   }
     
   void removeTaskEntry(TaskAttemptID taskid) {
@@ -3249,7 +3250,7 @@
         
         // Update the job and inform the listeners if necessary
         JobStatus prevStatus = (JobStatus)job.getStatus().clone();
-        job.updateTaskStatus(tip, report, myInstrumentation);
+        job.updateTaskStatus(tip, report);
         JobStatus newStatus = (JobStatus)job.getStatus().clone();
         
         // Update the listeners if an incomplete job completes
@@ -3278,8 +3279,7 @@
             }
             failedFetchMap.getJob().fetchFailureNotification(failedFetchMap, 
                                                              mapTaskId, 
-                                                             failedFetchTrackerName, 
-                                                             myInstrumentation);
+                                                             failedFetchTrackerName);
           }
         }
       }
@@ -3329,7 +3329,7 @@
                                TaskStatus.Phase.MAP : 
                                TaskStatus.Phase.REDUCE), 
                             killState,
-                            trackerName, myInstrumentation);
+                            trackerName);
             jobsWithFailures.add(job);
           }
         } else {

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=755497&r1=755496&r2=755497&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
(original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
Wed Mar 18 07:09:29 2009
@@ -31,16 +31,27 @@
   public void completeMap(TaskAttemptID taskAttemptID)
   { }
 
+  public void failedMap(TaskAttemptID taskAttemptID)
+  { }
+
   public void launchReduce(TaskAttemptID taskAttemptID)
   { }
 
   public void completeReduce(TaskAttemptID taskAttemptID)
-  {  }
+  { }
   
+  public void failedReduce(TaskAttemptID taskAttemptID)
+  { }
+
   public void submitJob(JobConf conf, JobID id) 
   { }
     
   public void completeJob(JobConf conf, JobID id) 
   { }
 
+  public void addWaiting(JobID id, int tasks)
+  { }
+
+  public void decWaiting(JobID id, int tasks)
+  { }
 }

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=755497&r1=755496&r2=755497&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
(original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
Wed Mar 18 07:09:29 2009
@@ -22,15 +22,21 @@
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
 
 class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater {
-  private MetricsRecord metricsRecord = null;
-  int numMapTasksLaunched = 0;
-  int numMapTasksCompleted = 0;
-  int numReduceTasksLaunched = 0;
-  int numReduceTasksCompleted = 0;
+  private final MetricsRecord metricsRecord;
+
+  private int numMapTasksLaunched = 0;
+  private int numMapTasksCompleted = 0;
+  private int numMapTasksFailed = 0;
+  private int numReduceTasksLaunched = 0;
+  private int numReduceTasksCompleted = 0;
+  private int numReduceTasksFailed = 0;
   private int numJobsSubmitted = 0;
   private int numJobsCompleted = 0;
+  private int numWaitingTasks = 0;
     
   public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
     super(tracker, conf);
@@ -52,15 +58,21 @@
     synchronized (this) {
       metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
       metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
+      metricsRecord.incrMetric("maps_failed", numMapTasksFailed);
       metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
       metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
+      metricsRecord.incrMetric("reduces_failed", numReduceTasksFailed);
       metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
       metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
-            
+      metricsRecord.incrMetric("waiting_tasks", numWaitingTasks);
+
       numMapTasksLaunched = 0;
       numMapTasksCompleted = 0;
+      numMapTasksFailed = 0;
       numReduceTasksLaunched = 0;
       numReduceTasksCompleted = 0;
+      numReduceTasksFailed = 0;
+      numWaitingTasks = 0;
       numJobsSubmitted = 0;
       numJobsCompleted = 0;
     }
@@ -76,6 +88,7 @@
   @Override
   public synchronized void launchMap(TaskAttemptID taskAttemptID) {
     ++numMapTasksLaunched;
+    decWaiting(taskAttemptID.getJobID(), 1);
   }
 
   @Override
@@ -84,8 +97,15 @@
   }
 
   @Override
+  public synchronized void failedMap(TaskAttemptID taskAttemptID) {
+    ++numMapTasksFailed;
+    addWaiting(taskAttemptID.getJobID(), 1);
+  }
+
+  @Override
   public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
     ++numReduceTasksLaunched;
+    decWaiting(taskAttemptID.getJobID(), 1);
   }
 
   @Override
@@ -94,6 +114,12 @@
   }
 
   @Override
+  public synchronized void failedReduce(TaskAttemptID taskAttemptID) {
+    ++numReduceTasksFailed;
+    addWaiting(taskAttemptID.getJobID(), 1);
+  }
+
+  @Override
   public synchronized void submitJob(JobConf conf, JobID id) {
     ++numJobsSubmitted;
   }
@@ -102,4 +128,14 @@
   public synchronized void completeJob(JobConf conf, JobID id) {
     ++numJobsCompleted;
   }
+
+  @Override
+  public synchronized void addWaiting(JobID id, int tasks) {
+    numWaitingTasks += tasks;
+  }
+
+  @Override
+  public synchronized void decWaiting(JobID id, int tasks) {
+    numWaitingTasks -= tasks;
+  }
 }



Mime
View raw message