hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r511075 - in /lucene/hadoop/trunk: ./ src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/metrics/ src/test/org/apache/hadoop/fs/ src/test/org/apache/hadoop/mapred/ src/test/org/apache/hado...
Date Fri, 23 Feb 2007 20:13:02 GMT
Author: cutting
Date: Fri Feb 23 12:13:00 2007
New Revision: 511075

URL: http://svn.apache.org/viewvc?view=rev&rev=511075
Log:
HADOOP-492.  Add counters.  Contributed by David Bowen.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java
    lucene/hadoop/trunk/src/webapps/job/taskstats.jsp
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java
    lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
    lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp
    lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Feb 23 12:13:00 2007
@@ -122,6 +122,13 @@
 36. HADOOP-1029.  Fix streaming's input format to correctly seek to
     the start of splits.  (Arun C Murthy via cutting)
 
+37. HADOOP-492.  Add per-job and per-task counters.  These are
+    incremented via the Reporter interface and available through the
+    web ui and the JobClient API.  The mapreduce framework maintains a
+    few basic counters, and applications may add their own.  Counters
+    are also passed to the metrics system.
+    (David Bowen via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java Fri Feb 23 12:13:00 2007
@@ -47,6 +47,9 @@
  */
 public class WordCount {
   
+  // These are just for testing counters
+  private enum Counter { WORDS, VALUES }
+  
   /**
    * Counts the words in each line.
    * For each line of input, break the line into words and emit them as
@@ -65,6 +68,7 @@
       while (itr.hasMoreTokens()) {
         word.set(itr.nextToken());
         output.collect(word, one);
+        reporter.incrCounter(Counter.WORDS, 1);
       }
     }
   }
@@ -80,6 +84,7 @@
       int sum = 0;
       while (values.hasNext()) {
         sum += ((IntWritable) values.next()).get();
+        reporter.incrCounter(Counter.VALUES, 1);
       }
       output.collect(key, new IntWritable(sum));
     }

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java?view=auto&rev=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java Fri Feb 23 12:13:00 2007
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A set of named counters.
+ */
+public class Counters implements Writable {
+  
+  private Map<String,Long> counters = new TreeMap<String,Long>();
+
+  /**
+   * Returns the names of all counters.
+   * @return Set of counter names.
+   */
+  public synchronized Set<String> getCounterNames() {
+    return counters.keySet();
+  }
+  
+  /**
+   * Returns the value of the named counter, or 0 if counter doesn't exist.
+   * @param name name of a counter
+   * @return value of the counter
+   */
+  public synchronized long getCounter(String name) {
+    Long result = counters.get(name);
+    return (result == null ? 0L : result);
+  }
+  
+  /**
+   * Increments the named counter by the specified amount, creating it if
+   * it didn't already exist.
+   * @param name name of a counter
+   * @param amount amount by which counter is to be incremented
+   */
+  public synchronized void incrCounter(String name, long amount) {
+    counters.put(name, amount + getCounter(name));
+  }
+  
+  /**
+   * Increments multiple counters by their amounts in another Counters 
+   * instance.
+   * @param other the other Counters instance
+   */
+  public synchronized void incrAllCounters(Counters other) {
+    for (String name : other.getCounterNames()) {
+      incrCounter(name, other.getCounter(name));
+    }
+  }
+  
+  /**
+   * Returns the number of counters.
+   */
+  public synchronized int size() {
+	  return counters.size();
+  }
+  
+  // Writable
+  
+  public synchronized void write(DataOutput out) throws IOException {
+    out.writeInt(counters.size());
+    for (String name : counters.keySet()) {
+      UTF8.writeString(out, name);
+      out.writeLong(counters.get(name));
+    }
+  }
+  
+  public synchronized void readFields(DataInput in) throws IOException {
+    int n = in.readInt();
+    while (n-- > 0) {
+      String name = UTF8.readString(in);
+      long value = in.readLong();
+      counters.put(name, value);
+    }
+  }
+  
+  /**
+   * Logs the current counter values.
+   * @param log The log to use.
+   */
+  public void log(Log log) {
+    log.info("Counters: " + getCounterNames().size());
+    for (String counterName : getCounterNames()) {
+      log.info("  " + counterName + "=" + getCounter(counterName));
+    }
+  }
+  
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Fri Feb 23 12:13:00 2007
@@ -25,8 +25,10 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.logging.*;
-import org.apache.hadoop.fs.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -58,7 +60,9 @@
     }
 
     public void progress(String taskid, float progress, String state,
-                         TaskStatus.Phase phase) throws IOException {
+                         TaskStatus.Phase phase, Counters counters) 
+      throws IOException 
+    {
       StringBuffer buf = new StringBuffer("Task ");
       buf.append(taskid);
       buf.append(" making progress to ");
@@ -69,6 +73,7 @@
       }
       LOG.info(buf.toString());
       // ignore phase
+      // ignore counters
     }
 
     public void reportDiagnosticInfo(String taskid, String trace) throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Fri Feb 23 12:13:00 2007
@@ -175,6 +175,10 @@
                 "map() completion: " + status.mapProgress() + "\n" + 
                 "reduce() completion: " + status.reduceProgress();
         }
+        
+        public Counters getCounters() {
+          return status.getCounters();
+        }
     }
 
     JobSubmissionProtocol jobSubmitClient;
@@ -597,6 +601,7 @@
           throw new IOException("Job failed!");
         }
         LOG.info("Job complete: " + jobId);
+        running.getCounters().log(LOG);
         error = false;
       } finally {
         if (error && (running != null)) {
@@ -604,6 +609,7 @@
         }
         jc.close();
       }
+      
     }
 
     private static void displayTaskLogs(String taskId, String baseUrl)

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Feb 23 12:13:00 2007
@@ -17,18 +17,27 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableUtils;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
-import org.apache.hadoop.mapred.JobHistory.Values ; 
-import java.io.*;
-import java.net.*;
-import java.util.*;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
 
 ///////////////////////////////////////////////////////
 // JobInProgress maintains all the info for keeping
@@ -74,6 +83,9 @@
 
     private LocalFileSystem localFs;
     private String uniqueString;
+    
+    private Counters counters = new Counters();
+    private MetricsRecord jobMetrics;
   
     /**
      * Create a JobInProgress with the given job file, plus a handle
@@ -113,6 +125,10 @@
         JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), 
             System.currentTimeMillis(), jobFile); 
         
+        MetricsContext metricsContext = MetricsUtil.getContext("mapred");
+        this.jobMetrics = metricsContext.createRecord("job");
+        this.jobMetrics.setTag("user", conf.getUser());
+        this.jobMetrics.setTag("jobName", conf.getJobName());
     }
 
     /**
@@ -338,6 +354,28 @@
                            (progressDelta / reduces.length)));
           }
         }
+        
+        //
+        // Update counters by summing over all tasks in progress
+        //
+        Counters newCounters = new Counters();
+        for (TaskInProgress mapTask : maps) {
+          newCounters.incrAllCounters(mapTask.getCounters());
+        }
+        for (TaskInProgress reduceTask : reduces) {
+          newCounters.incrAllCounters(reduceTask.getCounters());
+        }
+        this.status.setCounters(newCounters);
+        
+        //
+        // Send counter data to the metrics package.
+        //
+        for (String counter : newCounters.getCounterNames()) {
+          long value = newCounters.getCounter(counter);
+          jobMetrics.setTag("counter", counter);
+          jobMetrics.setMetric("value", (float) value);
+          jobMetrics.update();
+        }
     }   
 
     /////////////////////////////////////////////////////
@@ -787,7 +825,8 @@
                                           TaskStatus.State.FAILED,
                                           reason,
                                           reason,
-                                          trackerName, phase);
+                                          trackerName, phase,
+                                          tip.getCounters());
        updateTaskStatus(tip, status, metrics);
        JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(), 
            tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
@@ -831,8 +870,8 @@
     }
 
     /**
-      * Return the TaskInProgress that matches the tipid.
-      */
+     * Return the TaskInProgress that matches the tipid.
+     */
     public TaskInProgress getTaskInProgress(String tipid){
       for (int i = 0; i < maps.length; i++) {
         if (tipid.equals(maps[i].getTIPId())){

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java Fri Feb 23 12:13:00 2007
@@ -17,9 +17,14 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.io.*;
-
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
 
 /**************************************************
  * Describes the current status of a job.  This is
@@ -49,6 +54,7 @@
     private int runState;
     private long startTime;
     private String user;
+    private Counters counters = new Counters();
     /**
      */
     public JobStatus() {
@@ -133,6 +139,16 @@
       * @return the username of the job
       */
     public String getUsername() { return this.user;};
+    
+    /**
+     * @param counters Counters for the job.
+     */
+    void setCounters(Counters counters) { this.counters = counters; }
+    /**
+     * @return the counters for the job
+     */
+    public Counters getCounters() { return counters; }
+    
     ///////////////////////////////////////
     // Writable
     ///////////////////////////////////////
@@ -143,6 +159,7 @@
         out.writeInt(runState);
         out.writeLong(startTime);
         UTF8.writeString(out, user);
+        counters.write(out);
     }
     public void readFields(DataInput in) throws IOException {
         this.jobid = UTF8.readString(in);
@@ -151,5 +168,6 @@
         this.runState = in.readInt();
         this.startTime = in.readLong();
         this.user = UTF8.readString(in);
+        counters.readFields(in);
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Feb 23 12:13:00 2007
@@ -18,22 +18,36 @@
 package org.apache.hadoop.mapred;
 
 
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.*;
-
-import java.io.*;
-import java.net.*;
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.text.NumberFormat;
-import java.util.*;
-
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.StringUtils;
 
 /*******************************************************
  * JobTracker is the central location for submitting and 
@@ -1485,18 +1499,36 @@
     }
     
     /** Get all the TaskStatuses from the tipid. */
-    TaskStatus[] getTaskStatuses(String jobid, String tipid){
-	JobInProgress job = (JobInProgress) jobs.get(jobid);
-	if (job == null){
-	    return new TaskStatus[0];
-	}
-	TaskInProgress tip = (TaskInProgress) job.getTaskInProgress(tipid);
-	if (tip == null){
-	    return new TaskStatus[0];
-	}
-	return tip.getTaskStatuses();
+    TaskStatus[] getTaskStatuses(String jobid, String tipid) {
+      TaskInProgress tip = getTip(jobid, tipid);
+      return (tip == null ? new TaskStatus[0] 
+             : tip.getTaskStatuses());
     }
 
+    /** Returns the TaskStatus for a particular taskid. */
+    TaskStatus getTaskStatus(String jobid, String tipid, String taskid) {
+      TaskInProgress tip = getTip(jobid, tipid);
+      return (tip == null ? null 
+             : tip.getTaskStatus(taskid));
+    }
+    
+    /**
+     * Returns the counters for the specified task in progress.
+     */
+    Counters getTipCounters(String jobid, String tipid) {
+      TaskInProgress tip = getTip(jobid, tipid);
+      return (tip == null ? null : tip.getCounters());
+    }
+    
+    /**
+     * Returns specified TaskInProgress, or null.
+     */
+    private TaskInProgress getTip(String jobid, String tipid) {
+      JobInProgress job = (JobInProgress) jobs.get(jobid);
+      return (job == null ? null 
+             : (TaskInProgress) job.getTaskInProgress(tipid));
+    }
+    
     /**
      * Get tracker name for a given task id.
      * @param taskId the name of the task

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Feb 23 12:13:00 2007
@@ -18,13 +18,16 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.*;
-import java.util.*;
-
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
 
@@ -58,6 +61,10 @@
     private JobProfile profile;
     private Path localFile;
     private FileSystem localFs;
+    
+    // Contains the counters summed over all the tasks which
+    // have successfully completed
+    private Counters counters = new Counters();
 
     public long getProtocolVersion(String protocol, long clientVersion) {
       return TaskUmbilicalProtocol.versionID;
@@ -116,6 +123,7 @@
           map.run(localConf, this);
           myMetrics.completeMap();
           map_tasks -= 1;
+          updateCounters(map);
         }
 
         // move map output to reduce input
@@ -144,6 +152,7 @@
           reduce.run(localConf, this);
           myMetrics.completeReduce();
           reduce_tasks -= 1;
+          updateCounters(reduce);
         }
         this.mapoutputFile.removeAll(reduceId);
         
@@ -162,7 +171,7 @@
         }
       }
     }
-
+    
     private String newId() {
       return Integer.toString(Math.abs(random.nextInt()),36);
     }
@@ -172,7 +181,7 @@
     public Task getTask(String taskid) { return null; }
 
     public void progress(String taskId, float progress, String state, 
-                         TaskStatus.Phase phase) {
+                         TaskStatus.Phase phase, Counters taskStats) {
       LOG.info(state);
       float taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
@@ -183,6 +192,30 @@
       }
       
       // ignore phase
+      updateStatusCounters(taskStats);
+    }
+    
+    /**
+     * Updates counters corresponding to completed tasks.
+     * @param task A map or reduce task which has just been 
+     * successfully completed
+     */ 
+    private void updateCounters(Task task) {
+      counters.incrAllCounters(task.getCounters());
+      status.setCounters(counters);
+    }
+
+    /**
+     * Sets status counters to the sum of (1) the counters from
+     * all completed tasks, and (2) the counters from a particular
+     * task in progress.
+     * @param taskCounters Counters from a task that is in progress
+     */
+    private void updateStatusCounters(Counters taskCounters) {
+      Counters newStats = new Counters();
+      newStats.incrAllCounters(counters);
+      newStats.incrAllCounters(taskCounters);
+      status.setCounters(newStats);
     }
 
     public void reportDiagnosticInfo(String taskid, String trace) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Feb 23 12:13:00 2007
@@ -18,34 +18,41 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.*;
-import java.util.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.SequenceFile.Sorter;
-import org.apache.hadoop.io.SequenceFile.Sorter.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.metrics.MetricsRecord;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Sorter;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.Updater;
 
 
 /** A Map task. */
 class MapTask extends Task {
-  
-  private MapTaskMetrics myMetrics = null;
 
   private BytesWritable split = new BytesWritable();
   private String splitClass;
@@ -58,28 +65,11 @@
     setPhase(TaskStatus.Phase.MAP); 
   }
   
-  private class MapTaskMetrics {
-    private MetricsRecord mapInputMetrics = null;
-    private MetricsRecord mapOutputMetrics = null;
-    
-    MapTaskMetrics(String user) {
-        MetricsContext context = MetricsUtil.getContext("mapred");
-        mapInputMetrics = MetricsUtil.createRecord(context, "mapInput", "user", user);
-        mapOutputMetrics = MetricsUtil.createRecord(context, "mapOutput", "user", user);
-    }
-    
-    synchronized void mapInput(int numBytes) {
-        mapInputMetrics.incrMetric("map_input_records", 1);
-        mapInputMetrics.incrMetric("map_input_bytes", numBytes);
-        mapInputMetrics.update();
-    }
-    
-    synchronized void mapOutput(int numBytes) {
-        mapOutputMetrics.incrMetric("map_output_records", 1);
-        mapOutputMetrics.incrMetric("map_output_bytes", numBytes);
-        mapOutputMetrics.update();
-    }
-    
+  private enum Counter { 
+    INPUT_RECORDS, 
+    INPUT_BYTES, 
+    OUTPUT_RECORDS,
+    OUTPUT_BYTES
   }
 
   public MapTask() {}
@@ -121,16 +111,12 @@
     super.readFields(in);
     splitClass = Text.readString(in);
     split.readFields(in);
-    if (myMetrics == null) {
-        myMetrics = new MapTaskMetrics("unknown");
-    }
   }
 
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
 
-    myMetrics = new MapTaskMetrics(job.getUser());
-    Reporter reporter = getReporter(umbilical, getProgress());
+    final Reporter reporter = getReporter(umbilical);
 
     MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
     
@@ -175,7 +161,8 @@
           setProgress(getProgress());
           long beforePos = getPos();
           boolean ret = rawIn.next(key, value);
-          myMetrics.mapInput((int)(getPos() - beforePos));
+          reporter.incrCounter(Counter.INPUT_RECORDS, 1);
+          reporter.incrCounter(Counter.INPUT_BYTES, (getPos() - beforePos));
           return ret;
         }
         public long getPos() throws IOException { return rawIn.getPos(); }
@@ -337,7 +324,9 @@
         int partNumber = partitioner.getPartition(key, value, partitions);
         sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
 
-        myMetrics.mapOutput((int)(keyValBuffer.getLength() - keyOffset));
+        reporter.incrCounter(Counter.OUTPUT_RECORDS, 1);
+        reporter.incrCounter(Counter.OUTPUT_BYTES,
+                             (keyValBuffer.getLength() - keyOffset));
 
         //now check whether we need to spill to disk
         long totalMem = 0;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Feb 23 12:13:00 2007
@@ -18,19 +18,31 @@
 
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.util.*;
-
-import java.io.*;
-import java.util.*;
-import java.text.*;
-
-import org.apache.hadoop.metrics.ContextFactory;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /** A Reduce task. */
 class ReduceTask extends Task {
@@ -43,27 +55,7 @@
        });
   }
 
-  private class ReduceTaskMetrics {
-    private final MetricsRecord inputMetrics, outputMetrics;
-    
-    ReduceTaskMetrics(String user) {
-        MetricsContext context = MetricsUtil.getContext("mapred");
-        inputMetrics = MetricsUtil.createRecord(context, "reduceInput", "user", user);
-        outputMetrics = MetricsUtil.createRecord(context, "reduceOutput", "user", user);
-    }
-    
-    synchronized void reduceInput() {
-        inputMetrics.incrMetric("reduce_input_records", 1);
-        inputMetrics.update();
-    }
-    
-    synchronized void reduceOutput() {
-        outputMetrics.incrMetric("reduce_output_records", 1);
-        outputMetrics.update();
-    }
-  }
-  
-  private ReduceTaskMetrics myMetrics = null;
+  private enum Counter { INPUT_RECORDS, OUTPUT_RECORDS }
   
   private int numMaps;
   private boolean sortComplete;
@@ -115,9 +107,6 @@
     super.readFields(in);
 
     numMaps = in.readInt();
-    if (myMetrics == null) {
-        myMetrics = new ReduceTaskMetrics("unknown");
-    }
   }
 
   /** Iterates values while keys match in sorted input. */
@@ -224,7 +213,6 @@
 
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
-    myMetrics = new ReduceTaskMetrics(job.getUser());
     Class valueClass = job.getMapOutputValueClass();
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                   job.getReducerClass(), job);
@@ -289,7 +277,7 @@
     sortPhase.complete();                         // sort is complete
     setPhase(TaskStatus.Phase.REDUCE); 
 
-    Reporter reporter = getReporter(umbilical, getProgress());
+    final Reporter reporter = getReporter(umbilical);
     
     // make output collector
     String finalName = getOutputName(getPartition());
@@ -308,7 +296,7 @@
         public void collect(WritableComparable key, Writable value)
           throws IOException {
           out.write(key, value);
-          myMetrics.reduceOutput();
+          reporter.incrCounter(Counter.OUTPUT_RECORDS, 1);
           reportProgress(umbilical);
         }
       };
@@ -321,7 +309,7 @@
                                   keyClass, valClass, umbilical, job);
       values.informReduceProgress();
       while (values.more()) {
-        myMetrics.reduceInput();
+        reporter.incrCounter(Counter.INPUT_RECORDS, 1);
         reducer.reduce(values.getKey(), values, collector, reporter);
         values.nextKey();
         values.informReduceProgress();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Feb 23 12:13:00 2007
@@ -405,8 +405,9 @@
     this.lastPollTime = 0;
 
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-    this.shuffleMetrics = MetricsUtil.createRecord(
-      metricsContext, "shuffleInput", "user", conf.getUser());
+    this.shuffleMetrics = 
+        MetricsUtil.createRecord(metricsContext, "shuffleInput");
+    this.shuffleMetrics.setTag("user", conf.getUser());
   }
 
   /** Assemble all of the map output files */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Fri Feb 23 12:13:00 2007
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+
 import org.apache.hadoop.util.Progressable;
 
 /** Passed to application code to permit alteration of status. */
@@ -30,9 +31,10 @@
   public static final Reporter NULL = new Reporter() {
     public void setStatus(String s) {
     }
-
     public void progress() throws IOException {
     }
+    public void incrCounter(Enum key, long amount) {
+    }
   };
 
   /**
@@ -41,5 +43,14 @@
    * @param status
    *          a brief description of the current status
    */
-  void setStatus(String status) throws IOException;
+  public abstract void setStatus(String status) throws IOException;
+  
+  /**
+   * Increments the counter identified by the key, which can be of
+   * any enum type, by the specified amount.
+   * @param key A value of any enum type
+   * @param amount A non-negative amount by which the counter is to 
+   * be incremented
+   */
+  public abstract void incrCounter(Enum key, long amount);
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Fri Feb 23 12:13:00 2007
@@ -80,4 +80,5 @@
     public TaskCompletionEvent[] getTaskCompletionEvents(
         int startFrom) throws IOException;
     
+    public Counters getCounters();
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Feb 23 12:13:00 2007
@@ -18,13 +18,17 @@
 
 package org.apache.hadoop.mapred;
 
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.*;
-
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.StringUtils;
 
 /** Base class for tasks. */
 abstract class Task implements Writable, Configurable {
@@ -40,7 +44,8 @@
   private String jobId;                           // unique jobid
   private String tipId ;
   private int partition;                          // id within job
-  private TaskStatus.Phase phase ;                         // current phase of the task 
+  private TaskStatus.Phase phase ;                // current phase of the task 
+  
 
   ////////////////////////////////////////////
   // Constructors
@@ -64,6 +69,7 @@
   public String getJobFile() { return jobFile; }
   public String getTaskId() { return taskId; }
   public String getTipId(){ return tipId ; }
+  public Counters getCounters() { return counters; }
   
   /**
    * Get the job name for this task.
@@ -142,26 +148,36 @@
   /** The number of milliseconds between progress reports. */
   public static final int PROGRESS_INTERVAL = 1000;
 
-  private volatile Progress taskProgress = new Progress();
+  private transient Progress taskProgress = new Progress();
   private transient long nextProgressTime =
     System.currentTimeMillis() + PROGRESS_INTERVAL;
 
+  private transient Counters counters = new Counters();
+  
   public abstract boolean isMapTask();
 
   public Progress getProgress() { return taskProgress; }
 
-  protected Reporter getReporter(final TaskUmbilicalProtocol umbilical,
-                              final Progress progress) throws IOException {
+  protected Reporter getReporter(final TaskUmbilicalProtocol umbilical) 
+    throws IOException 
+  {
     return new Reporter() {
         public void setStatus(String status) throws IOException {
           synchronized (this) {
-            progress.setStatus(status);
+            taskProgress.setStatus(status);
             progress();
           }
         }
         public void progress() throws IOException {
             reportProgress(umbilical);
         }
+        public void incrCounter(Enum key, long amount) {
+            Counters counters = getCounters();
+            if (counters != null) {
+              String name = key.getDeclaringClass().getName()+"#"+key.toString();
+              counters.incrCounter(name, amount);
+            }
+          }
       };
   }
 
@@ -169,21 +185,15 @@
     taskProgress.set(progress);
   }
 
-  public void reportProgress(TaskUmbilicalProtocol umbilical, float progress)
-    throws IOException {
-    setProgress(progress);
-    reportProgress(umbilical);
-  }
-
   public void reportProgress(TaskUmbilicalProtocol umbilical) {
     long now = System.currentTimeMillis();
-    if (now > nextProgressTime)  {
-      synchronized (this) {
+    synchronized (this) {
+      if (now > nextProgressTime)  {
         nextProgressTime = now + PROGRESS_INTERVAL;
         float progress = taskProgress.get();
         String status = taskProgress.toString();
         try {
-          umbilical.progress(getTaskId(), progress, status, phase);
+          umbilical.progress(getTaskId(), progress, status, phase, counters);
         } catch (IOException ie) {
           LOG.warn(StringUtils.stringifyException(ie));
         }
@@ -199,7 +209,7 @@
         if (needProgress) {
           // send a final status report
           umbilical.progress(getTaskId(), taskProgress.get(), 
-                             taskProgress.toString(), phase);
+                             taskProgress.toString(), phase, counters);
           needProgress = false;
         }
         umbilical.done(getTaskId());

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Fri Feb 23 12:13:00 2007
@@ -17,12 +17,19 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.commons.logging.*;
-import org.apache.hadoop.io.BytesWritable;
 
-import java.text.NumberFormat;
 import java.io.IOException;
-import java.util.*;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BytesWritable;
 
 
 ////////////////////////////////////////////////////////
@@ -89,10 +96,13 @@
     /**
      * Map from taskId -> TaskStatus
      */
-    private TreeMap taskStatuses = new TreeMap();
+    private TreeMap<String,TaskStatus> taskStatuses = 
+      new TreeMap<String,TaskStatus>();
 
     private TreeSet machinesWhereFailed = new TreeSet();
     private TreeSet tasksReportedClosed = new TreeSet();
+    
+    private Counters counters = new Counters();
 
     /**
      * Constructor for MapTask
@@ -240,6 +250,13 @@
     public double getProgress() {
         return progress;
     }
+    
+    /**
+     * Get the task's counters
+     */
+    public Counters getCounters() {
+      return counters;
+    }
     /**
      * Returns whether a component task-thread should be 
      * closed because the containing JobInProgress has completed.
@@ -278,7 +295,7 @@
       TaskReport report = new TaskReport
       (getTIPId(), (float)progress, state,
           (String[])diagnostics.toArray(new String[diagnostics.size()]),
-          execStartTime, execFinishTime);
+          execStartTime, execFinishTime, counters);
       
       return report ;
     }
@@ -361,7 +378,7 @@
         // Note the failure and its location
         //
         LOG.info("Task '" + taskid + "' has been lost.");
-        TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+        TaskStatus status = taskStatuses.get(taskid);
         if (status != null) {
             status.setRunState(TaskStatus.State.FAILED);
             // tasktracker went down and failed time was not reported. 
@@ -392,7 +409,7 @@
      */
     void completedTask(String taskid) {
         LOG.info("Task '" + taskid + "' has completed.");
-        TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+        TaskStatus status = taskStatuses.get(taskid);
         status.setRunState(TaskStatus.State.SUCCEEDED);
         activeTasks.remove(taskid);
     }
@@ -421,9 +438,17 @@
      * Get the Status of the tasks managed by this TIP
      */
     public TaskStatus[] getTaskStatuses() {
-	return (TaskStatus[])taskStatuses.values().toArray(new TaskStatus[taskStatuses.size()]);
+	    return taskStatuses.values().toArray(new TaskStatus[taskStatuses.size()]);
     }
 
+    /**
+     * Get the status of the specified task
+     * @param taskid
+     * @return
+     */
+    public TaskStatus getTaskStatus(String taskid) {
+      return taskStatuses.get(taskid);
+    }
      /**
      * The TIP's been ordered kill()ed.
      */
@@ -460,22 +485,26 @@
         } else {
             double bestProgress = 0;
             String bestState = "";
+            Counters bestCounters = new Counters();
             for (Iterator it = taskStatuses.keySet().iterator(); it.hasNext(); ) {
                 String taskid = (String) it.next();
-                TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+                TaskStatus status = taskStatuses.get(taskid);
                 if (status.getRunState() == TaskStatus.State.SUCCEEDED) {
                     bestProgress = 1;
                     bestState = status.getStateString();
+                    bestCounters = status.getCounters();
                     break;
                 } else if (status.getRunState() == TaskStatus.State.RUNNING) {
                   if (status.getProgress() >= bestProgress) {
                     bestProgress = status.getProgress();
                     bestState = status.getStateString();
+                    bestCounters = status.getCounters();
                   }
                 }
             }
             this.progress = bestProgress;
             this.state = bestState;
+            this.counters = bestCounters;
         }
     }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java Fri Feb 23 12:13:00 2007
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.io.*;
-
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 
 /** A report on the state of a task. */
 public class TaskReport implements Writable {
@@ -29,17 +33,20 @@
   private String[] diagnostics;
   private long startTime ; 
   private long finishTime; 
+  private Counters counters;
 
   public TaskReport() {}
 
   TaskReport(String taskid, float progress, String state,
-             String[] diagnostics, long startTime, long finishTime) {
+             String[] diagnostics, long startTime, long finishTime,
+             Counters counters) {
     this.taskid = taskid;
     this.progress = progress;
     this.state = state;
     this.diagnostics = diagnostics;
     this.startTime = startTime ; 
     this.finishTime = finishTime ;
+    this.counters = counters;
   }
     
   /** The id of the task. */
@@ -50,6 +57,9 @@
   public String getState() { return state; }
   /** A list of error messages. */
   public String[] getDiagnostics() { return diagnostics; }
+  /** A table of counters. */
+  public Counters getCounters() { return counters; }
+  
   /**
    * Get finish time of task. 
    * @return 0, if finish time was not set else returns finish time.
@@ -90,6 +100,7 @@
     out.writeLong(startTime);
     out.writeLong(finishTime);
     WritableUtils.writeStringArray(out, diagnostics);
+    counters.write(out);
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -100,5 +111,7 @@
     this.finishTime = in.readLong() ;
     
     diagnostics = WritableUtils.readStringArray(in);
+    counters = new Counters();
+    counters.readFields(in);
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Fri Feb 23 12:13:00 2007
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.io.*;
-
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 /**************************************************
  * Describes the current status of a task.  This is
  * not intended to be a comprehensive piece of data.
@@ -49,13 +53,14 @@
     private long sortFinishTime ; 
     
     private Phase phase = Phase.STARTING; 
+    private Counters counters;
 
     public TaskStatus() {}
 
     public TaskStatus(String taskid, boolean isMap, float progress,
                       State runState, String diagnosticInfo,
                       String stateString, String taskTracker,
-                      Phase phase) {
+                      Phase phase, Counters counters) {
         this.taskid = taskid;
         this.isMap = isMap;
         this.progress = progress;
@@ -64,6 +69,7 @@
         this.stateString = stateString;
         this.taskTracker = taskTracker;
         this.phase = phase ;
+        this.counters = counters;
     }
     
     public String getTaskId() { return taskid; }
@@ -176,6 +182,20 @@
     void setPhase(Phase p){
       this.phase = p ; 
     }
+    /**
+     * Get task's counters.
+     */
+    public Counters getCounters() {
+      return counters;
+    }
+    /**
+     * Set the task's counters.
+     * @param counters
+     */
+    public void setCounters(Counters counters) {
+      this.counters = counters;
+    }
+    
     //////////////////////////////////////////////
     // Writable
     //////////////////////////////////////////////
@@ -193,6 +213,7 @@
           out.writeLong(shuffleFinishTime);
           out.writeLong(sortFinishTime);
         }
+        counters.write(out);
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -209,6 +230,8 @@
           shuffleFinishTime = in.readLong(); 
           sortFinishTime = in.readLong(); 
         }
+        counters = new Counters();
+        counters.readFields(in);
      }
 }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Feb 23 12:13:00 2007
@@ -17,18 +17,22 @@
  */
  package org.apache.hadoop.mapred;
 
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.MetricsException;
-import org.apache.hadoop.util.*;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
@@ -38,10 +42,27 @@
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import org.apache.hadoop.metrics.MetricsContext;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 /*******************************************************
  * TaskTracker is a process that starts and tracks MR Tasks
@@ -939,7 +960,8 @@
                 diagnosticInfo.toString(), 
                 "initializing",  
                  getName(), task.isMapTask()? TaskStatus.Phase.MAP:
-                   TaskStatus.Phase.SHUFFLE); 
+                   TaskStatus.Phase.SHUFFLE,
+                 task.getCounters()); 
             keepJobFiles = false;
             taskTimeout = (10 * 60 * 1000);
         }
@@ -1022,7 +1044,9 @@
          * The task is reporting its progress
          */
         public synchronized void reportProgress(float p, String state, 
-                                                TaskStatus.Phase newPhase) {
+                                                TaskStatus.Phase newPhase,
+                                                Counters counters) 
+        {
             LOG.info(task.getTaskId()+" "+p+"% "+state);
             this.progress = p;
             this.runstate = TaskStatus.State.RUNNING;
@@ -1038,6 +1062,7 @@
               this.taskStatus.setPhase(newPhase);
             }
             this.taskStatus.setStateString(state);
+            this.taskStatus.setCounters(counters);
         }
 
         /**
@@ -1272,11 +1297,12 @@
      */
     public synchronized void progress(String taskid, float progress, 
                                       String state, 
-                                      TaskStatus.Phase phase
+                                      TaskStatus.Phase phase,
+                                      Counters counters
                                       ) throws IOException {
         TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
         if (tip != null) {
-          tip.reportProgress(progress, state, phase);
+          tip.reportProgress(progress, state, phase, counters);
         } else {
           LOG.warn("Progress from unknown child task: "+taskid+". Ignored.");
         }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Feb 23 12:13:00 2007
@@ -38,9 +38,10 @@
    * @param progress value between zero and one
    * @param state description of task's current state
    * @param phase current phase of the task.
+   * @param counters the counters for this task.
    */
   void progress(String taskid, float progress, String state, 
-                TaskStatus.Phase phase)
+                TaskStatus.Phase phase, Counters counters)
     throws IOException;
 
   /** Report error messages back to parent.  Calls should be sparing, since all

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java Fri Feb 23 12:13:00 2007
@@ -25,8 +25,6 @@
 
 /**
  * Utility class to simplify creation and reporting of hadoop metrics.
- * This class makes the simplifying assumption that each metrics record has
- * exactly one tag, which defaults to being the hostName.
  *
  * For examples of usage, see {@link org.apache.hadoop.dfs.DataNode}.
  * @see org.apache.hadoop.metrics.MetricsRecord
@@ -62,26 +60,6 @@
         }
         return metricsContext;
     }
-    
-    /**
-     * Utility method to create and return a new metrics record instance 
-     * within the given name and the specified tag.
-     *
-     * @param context the context
-     * @param recordName the name of the record
-     * @param tagName name of the tag field of metrics record
-     * @param tagValue value of the tag field
-     * @return newly created metrics record
-     */
-    public static MetricsRecord createRecord(MetricsContext context, 
-                                             String recordName,
-                                             String tagName, 
-                                             String tagValue) 
-    {
-        MetricsRecord metricsRecord = context.createRecord(recordName);
-        metricsRecord.setTag(tagName, tagValue);
-        return metricsRecord;
-    }
 
     /**
      * Utility method to create and return new metrics record instance within the
@@ -92,9 +70,11 @@
      * @return newly created metrics record
      */
     public static MetricsRecord createRecord(MetricsContext context, 
-                                            String recordName) 
+                                             String recordName) 
     {
-        return createRecord(context, recordName, "hostName", getHostName());
+      MetricsRecord metricsRecord = context.createRecord(recordName);
+      metricsRecord.setTag("hostName", getHostName());
+      return metricsRecord;        
     }
     
     /**

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java Fri Feb 23 12:13:00 2007
@@ -383,10 +383,7 @@
     else
       return;
     for( int i=0; i < nrFiles; i++)
-      ioer.doIO(new Reporter() {
-          public void setStatus(String status) throws IOException {}
-          public void progress() throws IOException {}
-        },
+      ioer.doIO(Reporter.NULL,
                 BASE_FILE_NAME+Integer.toString(i), 
                 MEGA*fileSize );
   }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java Fri Feb 23 12:13:00 2007
@@ -290,10 +290,7 @@
     else
       return;
     for( int i=0; i < nrFiles; i++)
-      ioer.doIO(new Reporter() {
-          public void setStatus(String status) throws IOException {}
-          public void progress() throws IOException {}
-        },
+      ioer.doIO(Reporter.NULL,
                 BASE_FILE_NAME+Integer.toString(i), 
                 MEGA*fileSize );
   }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java Fri Feb 23 12:13:00 2007
@@ -38,10 +38,7 @@
   private static final Path inDir = new Path(System.getProperty("test.build.data",".") + "/mapred");
   private static final Path inFile = new Path(inDir, "test.seq");
   private static final Random random = new Random(1);
-  private static final Reporter reporter = new Reporter() {
-      public void setStatus(String status) throws IOException {}
-      public void progress() throws IOException {}
-  };
+  private static final Reporter reporter = Reporter.NULL;
   
   static {
       job.setInputPath(inDir);

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java Fri Feb 23 12:13:00 2007
@@ -40,10 +40,7 @@
     Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
     Path file = new Path(dir, "test.seq");
     
-    Reporter reporter = new Reporter() {
-        public void setStatus(String status) throws IOException {}
-        public void progress() throws IOException {}
-      };
+    Reporter reporter = Reporter.NULL;
     
     int seed = new Random().nextInt();
     //LOG.info("seed = "+seed);

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Fri Feb 23 12:13:00 2007
@@ -51,10 +51,8 @@
     JobConf job = new JobConf();
     Path file = new Path(workDir, "test.txt");
 
-    Reporter reporter = new Reporter() {
-        public void setStatus(String status) throws IOException {}
-        public void progress() throws IOException {}
-      };
+    // A reporter that does nothing
+    Reporter reporter = Reporter.NULL;
     
     int seed = new Random().nextInt();
     LOG.info("seed = "+seed);
@@ -178,11 +176,7 @@
     stm.close();
   }
   
-  private static class VoidReporter implements Reporter {
-    public void progress() {}
-    public void setStatus(String msg) {}
-  }
-  private static final Reporter voidReporter = new VoidReporter();
+  private static final Reporter voidReporter = Reporter.NULL;
   
   private static List<Text> readSplit(InputFormat format, 
                                       InputSplit split, 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java Fri Feb 23 12:13:00 2007
@@ -46,10 +46,8 @@
     Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
     Path file = new Path(dir, "test.seq");
     
-    Reporter reporter = new Reporter() {
-        public void setStatus(String status) throws IOException {}
-        public void progress() throws IOException {}
-      };
+    // A reporter that does nothing
+    Reporter reporter = Reporter.NULL;
     
     int seed = new Random().nextInt();
     //LOG.info("seed = "+seed);

Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Fri Feb 23 12:13:00 2007
@@ -98,6 +98,16 @@
     printTaskSummary(out, jobId, "reduce", status.reduceProgress(),
                      job.getReduceTasks());
     out.print("</table>\n");
+    
+    Counters counters = status.getCounters();
+    out.println("<p/>");
+    out.println("<table border=2 cellpadding=\"5\" cellspacing=\"2\">");
+    out.println("<tr><th>Counter</th><th>Value</th></tr>");
+    for (String counter : counters.getCounterNames()) {
+      out.print("<tr><td>" + counter + "</td><td>" + counters.getCounter(counter) +
+                "</td></tr>\n");
+    }
+    out.print("</table>\n");
   }
 %>
 

Modified: lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp Fri Feb 23 12:13:00 2007
@@ -55,14 +55,15 @@
     out.print("<h2>Tasks</h2>");
     out.print("<center>");
     out.print("<table border=2 cellpadding=\"5\" cellspacing=\"2\">");
-    out.print("<tr><td align=\"center\">Task</td><td>Complete</td><td>Status</td><td>Start Time</td><td>Finish Time</td><td>Errors</td></tr>");
+    out.print("<tr><td align=\"center\">Task</td><td>Complete</td><td>Status</td>" +
+              "<td>Start Time</td><td>Finish Time</td><td>Errors</td><td>Counters</td></tr>");
     if (end_index > report_len){
         end_index = report_len;
     }
     for (int i = start_index ; i < end_index; i++) {
           TaskReport report = reports[i];
           out.print("<tr><td><a href=\"taskdetails.jsp?jobid=" + jobid + 
-                    "&taskid=" + report.getTaskId() + "\">"  + 
+                    "&tipid=" + report.getTaskId() + "\">"  + 
                     report.getTaskId() + "</a></td>");
          out.print("<td>" + StringUtils.formatPercent(report.getProgress(),2) + 
                    "</td>");
@@ -71,10 +72,16 @@
          out.println("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, 
              report.getFinishTime(), report.getStartTime()) + "</td>");
          String[] diagnostics = report.getDiagnostics();
+         out.print("<td><pre>");
          for (int j = 0; j < diagnostics.length ; j++) {
-                out.print("<td><pre>" + diagnostics[j] + "</pre></td>");
+             out.println(diagnostics[j]);
          }
-         out.print("</tr>\n");
+         out.println("</pre><br/></td>");
+         out.println("<td>" + 
+             "<a href=\"/taskstats.jsp?jobid=" + jobid + 
+             "&tipid=" + report.getTaskId() +
+             "\">" + report.getCounters().size() +
+             "</a></td></tr>");
     }
     out.print("</table>");
     out.print("</center>");

Modified: lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp?view=diff&rev=511075&r1=511074&r2=511075
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Fri Feb 23 12:13:00 2007
@@ -15,7 +15,7 @@
   String jobid = request.getParameter("jobid");
   JobTracker tracker = JobTracker.getTracker();
   JobInProgress job = (JobInProgress) tracker.getJob(jobid);
-  String tipid = request.getParameter("taskid");
+  String tipid = request.getParameter("tipid");
   TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(jobid, tipid): null;
 %>
 
@@ -44,7 +44,7 @@
   <%
 	}
   %>
-<td>Finish Time</td><td>Errors</td><td>Task Logs</td></tr>
+<td>Finish Time</td><td>Errors</td><td>Task Logs</td><td>Counters</td></tr>
   <%
     for (int i = 0; i < ts.length; i++) {
       TaskStatus status = ts[i];
@@ -101,7 +101,11 @@
         out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
         out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
       }
-      out.println("</td></tr>\n");
+      out.println("</td><td>" + 
+                 "<a href=\"/taskstats.jsp?jobid=" + jobid + 
+                 "&tipid=" + tipid + "&taskid=" + status.getTaskId() + 
+                 "\">" + status.getCounters().size() +
+                 "</a></td></tr>");
     }
   }
   %>

Added: lucene/hadoop/trunk/src/webapps/job/taskstats.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskstats.jsp?view=auto&rev=511075
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/taskstats.jsp (added)
+++ lucene/hadoop/trunk/src/webapps/job/taskstats.jsp Fri Feb 23 12:13:00 2007
@@ -0,0 +1,98 @@
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="javax.servlet.*"
+  import="javax.servlet.http.*"
+  import="java.io.*"
+  import="java.lang.String"
+  import="java.util.*"
+  import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.util.*"
+  import="java.text.SimpleDateFormat"  
+  import="org.apache.hadoop.util.*"
+%>
+<%--
+  Lists counter names and values for the task selected by the "jobid" and
+  "tipid" request parameters, or for one specific "taskid" when it is given.
+--%>
+<%!
+  /**
+   * Escapes the characters that are significant in HTML text and in quoted
+   * attribute values, so request parameters and counter names can be written
+   * into the page without being interpreted as markup.
+   * A null value is rendered as "null", which is what a JSP expression prints
+   * for a null reference.
+   */
+  private static String escapeHtml(String value) {
+    if (value == null) {
+      return "null";
+    }
+    StringBuilder escaped = new StringBuilder(value.length());
+    for (int i = 0; i < value.length(); i++) {
+      char c = value.charAt(i);
+      switch (c) {
+        case '&': escaped.append("&amp;"); break;
+        case '<': escaped.append("&lt;"); break;
+        case '>': escaped.append("&gt;"); break;
+        case '"': escaped.append("&quot;"); break;
+        case '\'': escaped.append("&#39;"); break;
+        default: escaped.append(c);
+      }
+    }
+    return escaped.toString();
+  }
+%>
+<%
+  String jobid = request.getParameter("jobid");
+  JobTracker tracker = JobTracker.getTracker();
+  JobInProgress job = (JobInProgress) tracker.getJob(jobid);
+  String tipid = request.getParameter("tipid");
+  String taskid = request.getParameter("taskid");
+  Counters counters;
+  if (taskid == null) {
+      // No "taskid" parameter: look the counters up by jobid and tipid alone.
+      counters = tracker.getTipCounters(jobid, tipid);
+      taskid = tipid; // for page title etc
+  }
+  else {
+      TaskStatus taskStatus = tracker.getTaskStatus(jobid, tipid, taskid);
+      // If no status comes back, leave counters null so the page shows its
+      // "No counter information" message instead of failing.
+      counters = (taskStatus != null) ? taskStatus.getCounters() : null;
+  }
+%>
+
+<html>
+<title>Counters for <%=escapeHtml(taskid)%></title>
+<body>
+<h1>Counters for <%=escapeHtml(taskid)%></h1>
+
+<hr>
+
+<%
+	if( counters == null ) {
+%>
+		<h3>No counter information found for this task</h3>
+<%
+	}else{    
+%>
+      <table border=2 cellpadding="5" cellspacing="2">
+          <tr><td align="center">Counter</td><td>Value</td></tr> 
+		  <%
+		    for (String counter : counters.getCounterNames()) {
+		      long value = counters.getCounter(counter);
+		      %>
+		      <tr><td><%=escapeHtml(counter)%></td><td><%=value%></td></tr>
+              <%
+		    }
+		  %>
+      </table>
+<%
+    }
+%>
+
+<hr>
+<a href="/jobdetails.jsp?jobid=<%=escapeHtml(jobid)%>">Go back to the job</a><br>
+<a href="/jobtracker.jsp">Go back to JobTracker</a><br>
+<a href="http://lucene.apache.org/hadoop">Hadoop</a>, 2006.<br>
+</body>
+</html>



Mime
View raw message