hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1157290 [3/4] - in /hadoop/common/trunk/mapreduce: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/counters/ src/java/org/apache/hadoop/mapreduce/jobhistory/ src/java/org/apa...
Date Fri, 12 Aug 2011 23:25:52 GMT
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java Fri Aug 12 23:25:51 2011
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-
 import javax.crypto.SecretKey;
 
 import org.apache.commons.logging.Log;
@@ -54,6 +53,7 @@ import org.apache.hadoop.io.serializer.D
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.JobStatus;
@@ -819,37 +819,41 @@ abstract public class Task implements Wr
    * system and only creates the counters when they are needed.
    */
   class FileSystemStatisticUpdater {
-    private long prevReadBytes = 0;
-    private long prevWriteBytes = 0;
     private FileSystem.Statistics stats;
-    private Counters.Counter readCounter = null;
-    private Counters.Counter writeCounter = null;
-    private String[] counterNames;
+    private Counters.Counter readBytesCounter, writeBytesCounter,
+        readOpsCounter, largeReadOpsCounter, writeOpsCounter;
     
-    FileSystemStatisticUpdater(String uriScheme, FileSystem.Statistics stats) {
+    FileSystemStatisticUpdater(FileSystem.Statistics stats) {
       this.stats = stats;
-      this.counterNames = getFileSystemCounterNames(uriScheme);
     }
 
     void updateCounters() {
-      long newReadBytes = stats.getBytesRead();
-      long newWriteBytes = stats.getBytesWritten();
-      if (prevReadBytes != newReadBytes) {
-        if (readCounter == null) {
-          readCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP, 
-              counterNames[0]);
-        }
-        readCounter.increment(newReadBytes - prevReadBytes);
-        prevReadBytes = newReadBytes;
-      }
-      if (prevWriteBytes != newWriteBytes) {
-        if (writeCounter == null) {
-          writeCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP, 
-              counterNames[1]);
-        }
-        writeCounter.increment(newWriteBytes - prevWriteBytes);
-        prevWriteBytes = newWriteBytes;
+      String scheme = stats.getScheme();
+      if (readBytesCounter == null) {
+        readBytesCounter = counters.findCounter(scheme,
+            FileSystemCounter.BYTES_READ);
+      }
+      readBytesCounter.setValue(stats.getBytesRead());
+      if (writeBytesCounter == null) {
+        writeBytesCounter = counters.findCounter(scheme,
+            FileSystemCounter.BYTES_WRITTEN);
+      }
+      writeBytesCounter.setValue(stats.getBytesWritten());
+      if (readOpsCounter == null) {
+        readOpsCounter = counters.findCounter(scheme,
+            FileSystemCounter.READ_OPS);
+      }
+      readOpsCounter.setValue(stats.getReadOps());
+      if (largeReadOpsCounter == null) {
+        largeReadOpsCounter = counters.findCounter(scheme,
+            FileSystemCounter.LARGE_READ_OPS);
+      }
+      largeReadOpsCounter.setValue(stats.getLargeReadOps());
+      if (writeOpsCounter == null) {
+        writeOpsCounter = counters.findCounter(scheme,
+            FileSystemCounter.WRITE_OPS);
       }
+      writeOpsCounter.setValue(stats.getWriteOps());
     }
   }
   
@@ -864,7 +868,7 @@ abstract public class Task implements Wr
       String uriScheme = stat.getScheme();
       FileSystemStatisticUpdater updater = statisticUpdaters.get(uriScheme);
       if(updater==null) {//new FileSystem has been found in the cache
-        updater = new FileSystemStatisticUpdater(uriScheme, stat);
+        updater = new FileSystemStatisticUpdater(stat);
         statisticUpdaters.put(uriScheme, updater);
       }
       updater.updateCounters();      

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java Fri Aug 12 23:25:51 2011
@@ -103,9 +103,9 @@ class TaskInProgress {
   private boolean jobCleanup = false; 
   private boolean jobSetup = false;
 
-  private static Enum CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
-  private static Enum VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
-  private static Enum PHYSICAL_BYTES_KEY = TaskCounter.PHYSICAL_MEMORY_BYTES;
+  static final Enum<?> CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
+  static final Enum<?> VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
+  static final Enum<?> PHYSICAL_BYTES_KEY = TaskCounter.PHYSICAL_MEMORY_BYTES;
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -222,13 +222,10 @@ class TaskInProgress {
   }
 
   private void updateProgressSplits(TaskStatus taskStatus) {
-    if (!taskStatus.getIncludeCounters()) {
-      return;
-    }
-
     double newProgress = taskStatus.getProgress();
 
     Counters counters = taskStatus.getCounters();
+    if (counters == null) return;
 
     TaskAttemptID statusAttemptID = taskStatus.getTaskID();
     ProgressSplitsBlock splitsBlock = getSplits(statusAttemptID);
@@ -1040,7 +1037,7 @@ class TaskInProgress {
           if (status.getProgress() >= bestProgress) {
             bestProgress = status.getProgress();
             bestState = status.getStateString();
-            if (status.getIncludeCounters()) {
+            if (status.getIncludeAllCounters()) {
               bestCounters = status.getCounters();
             } else {
               bestCounters = this.counters;

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskStatus.java Fri Aug 12 23:25:51 2011
@@ -66,7 +66,7 @@ public abstract class TaskStatus impleme
     
   private volatile Phase phase = Phase.STARTING; 
   private Counters counters;
-  private boolean includeCounters;
+  private boolean includeAllCounters;
   private SortedRanges.Range nextRecordRange = new SortedRanges.Range();
   
   // max task-status string size
@@ -100,7 +100,7 @@ public abstract class TaskStatus impleme
     this.taskTracker = taskTracker;
     this.phase = phase;
     this.counters = counters;
-    this.includeCounters = true;
+    this.includeAllCounters = true;
   }
   
   public TaskAttemptID getTaskID() { return taskid; }
@@ -311,12 +311,13 @@ public abstract class TaskStatus impleme
       this.runState == TaskStatus.State.KILLED_UNCLEAN));
   }
   
-  public boolean getIncludeCounters() {
-    return includeCounters; 
+  public boolean getIncludeAllCounters() {
+    return includeAllCounters;
   }
   
-  public void setIncludeCounters(boolean send) {
-    includeCounters = send;
+  public void setIncludeAllCounters(boolean send) {
+    includeAllCounters = send;
+    counters.setWriteAllCounters(send);
   }
   
   /**
@@ -465,11 +466,9 @@ public abstract class TaskStatus impleme
     WritableUtils.writeEnum(out, phase);
     out.writeLong(startTime);
     out.writeLong(finishTime);
-    out.writeBoolean(includeCounters);
+    out.writeBoolean(includeAllCounters);
     out.writeLong(outputSize);
-    if (includeCounters) {
-      counters.write(out);
-    }
+    counters.write(out);
     nextRecordRange.write(out);
   }
 
@@ -484,11 +483,9 @@ public abstract class TaskStatus impleme
     this.startTime = in.readLong(); 
     this.finishTime = in.readLong(); 
     counters = new Counters();
-    this.includeCounters = in.readBoolean();
+    this.includeAllCounters = in.readBoolean();
     this.outputSize = in.readLong();
-    if (includeCounters) {
-      counters.readFields(in);
-    }
+    counters.readFields(in);
     nextRecordRange.readFields(in);
   }
   

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Aug 12 23:25:51 2011
@@ -1617,13 +1617,13 @@ public class TaskTracker 
    */
   HeartbeatResponse transmitHeartBeat(long now) throws IOException {
     // Send Counters in the status once every COUNTER_UPDATE_INTERVAL
-    boolean sendCounters;
+    boolean sendAllCounters;
     if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
-      sendCounters = true;
+      sendAllCounters = true;
       previousUpdate = now;
     }
     else {
-      sendCounters = false;
+      sendAllCounters = false;
     }
 
     // 
@@ -1636,7 +1636,7 @@ public class TaskTracker 
         status = new TaskTrackerStatus(taskTrackerName, localHostname, 
                                        httpPort, 
                                        cloneAndResetRunningTaskStatuses(
-                                         sendCounters), 
+                                         sendAllCounters),
                                        failures, 
                                        maxMapSlots,
                                        maxReduceSlots); 
@@ -3521,10 +3521,10 @@ public class TaskTracker 
     List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
     for(TaskInProgress tip: runningTasks.values()) {
       TaskStatus status = tip.getStatus();
-      status.setIncludeCounters(sendCounters);
+      status.setIncludeAllCounters(sendCounters);
       // send counters for finished or failed tasks and commit pending tasks
       if (status.getRunState() != TaskStatus.State.RUNNING) {
-        status.setIncludeCounters(true);
+        status.setIncludeAllCounters(true);
       }
       result.add((TaskStatus)status.clone());
       status.clearStatus();

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Counter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Counter.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Counter.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Counter.java Fri Aug 12 23:25:51 2011
@@ -18,137 +18,58 @@
 
 package org.apache.hadoop.mapreduce;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 
 /**
  * A named counter that tracks the progress of a map/reduce job.
- * 
- * <p><code>Counters</code> represent global counters, defined either by the 
+ *
+ * <p><code>Counters</code> represent global counters, defined either by the
  * Map-Reduce framework or applications. Each <code>Counter</code> is named by
  * an {@link Enum} and has a long for the value.</p>
- * 
+ *
  * <p><code>Counters</code> are bunched into Groups, each comprising of
- * counters from a particular <code>Enum</code> class. 
+ * counters from a particular <code>Enum</code> class.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class Counter implements Writable {
+public interface Counter extends Writable {
 
-  private String name;
-  private String displayName;
-  private long value = 0;
-    
-  protected Counter() { 
-  }
-
-  protected Counter(String name, String displayName) {
-    this.name = name;
-    this.displayName = displayName;
-  }
-  
-  /** Create a counter.
-   * @param name the name within the group's enum.
-   * @param displayName a name to be displayed.
-   * @param value the counter value.
-   */
-  public Counter(String name, String displayName, long value) {
-    this.name = name;
-    this.displayName = displayName;
-    this.value = value;
-  }
-  
+  /**
+   * Set the display name of the counter
+   * @param displayName of the counter
+   * @deprecated (and no-op by default)
+   */
   @Deprecated
-  protected synchronized void setDisplayName(String displayName) {
-    this.displayName = displayName;
-  }
-    
-  /**
-   * Read the binary representation of the counter
-   */
-  @Override
-  public synchronized void readFields(DataInput in) throws IOException {
-    name = Text.readString(in);
-    if (in.readBoolean()) {
-      displayName = Text.readString(in);
-    } else {
-      displayName = name;
-    }
-    value = WritableUtils.readVLong(in);
-  }
-    
-  /**
-   * Write the binary representation of the counter
-   */
-  @Override
-  public synchronized void write(DataOutput out) throws IOException {
-    Text.writeString(out, name);
-    boolean distinctDisplayName = ! name.equals(displayName);
-    out.writeBoolean(distinctDisplayName);
-    if (distinctDisplayName) {
-      Text.writeString(out, displayName);
-    }
-    WritableUtils.writeVLong(out, value);
-  }
-
-  public synchronized String getName() {
-    return name;
-  }
+  void setDisplayName(String displayName);
 
   /**
-   * Get the name of the counter.
+   * @return the name of the counter
+   */
+  String getName();
+
+  /**
+   * Get the display name of the counter.
    * @return the user facing name of the counter
    */
-  public synchronized String getDisplayName() {
-    return displayName;
-  }
-    
+  String getDisplayName();
+
   /**
    * What is the current value of this counter?
    * @return the current value
    */
-  public synchronized long getValue() {
-    return value;
-  }
+  long getValue();
 
   /**
    * Set this counter by the given value
    * @param value the value to set
    */
-  public synchronized void setValue(long value) {
-    this.value = value;
-  }
+  void setValue(long value);
 
   /**
    * Increment this counter by the given value
    * @param incr the value to increase this counter by
    */
-  public synchronized void increment(long incr) {
-    value += incr;
-  }
-
-  @Override
-  public synchronized boolean equals(Object genericRight) {
-    if (genericRight instanceof Counter) {
-      synchronized (genericRight) {
-        Counter right = (Counter) genericRight;
-        return name.equals(right.name) && 
-               displayName.equals(right.displayName) &&
-               value == right.value;
-      }
-    }
-    return false;
-  }
-  
-  @Override
-  public synchronized int hashCode() {
-    return name.hashCode() + displayName.hashCode();
-  }
+  void increment(long incr);
 }

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/CounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/CounterGroup.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/CounterGroup.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/CounterGroup.java Fri Aug 12 23:25:51 2011
@@ -18,19 +18,9 @@
 
 package org.apache.hadoop.mapreduce;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.MissingResourceException;
-import java.util.ResourceBundle;
-import java.util.TreeMap;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
 
 /**
  * A group of {@link Counter}s that logically belong together. Typically,
@@ -38,156 +28,6 @@ import org.apache.hadoop.io.WritableUtil
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class CounterGroup implements Writable, Iterable<Counter> {
-  private String name;
-  private String displayName;
-  private TreeMap<String, Counter> counters = new TreeMap<String, Counter>();
-  // Optional ResourceBundle for localization of group and counter names.
-  private ResourceBundle bundle = null;    
-  
-  /**
-   * Returns the specified resource bundle, or throws an exception.
-   * @throws MissingResourceException if the bundle isn't found
-   */
-  private static ResourceBundle getResourceBundle(String enumClassName) {
-    String bundleName = enumClassName.replace('$','_');
-    return ResourceBundle.getBundle(bundleName);
-  }
-
-  protected CounterGroup(String name) {
-    this.name = name;
-    try {
-      bundle = getResourceBundle(name);
-    }
-    catch (MissingResourceException neverMind) {
-    }
-    displayName = localize("CounterGroupName", name);
-  }
-  
-  /** Create a CounterGroup.
-   * @param name the name of the group's enum.
-   * @param displayName a name to be displayed for the group.
-   */
-  public CounterGroup(String name, String displayName) {
-    this.name = name;
-    this.displayName = displayName;
-  }
- 
-  /**
-   * Get the internal name of the group
-   * @return the internal name
-   */
-  public synchronized String getName() {
-    return name;
-  }
-  
-  /**
-   * Get the display name of the group.
-   * @return the human readable name
-   */
-  public synchronized String getDisplayName() {
-    return displayName;
-  }
-
-  /** Add a counter to this group. */
-  public synchronized void addCounter(Counter counter) {
-    counters.put(counter.getName(), counter);
-  }
-
-  /**
-   * Find a counter in a group.
-   * @param counterName the name of the counter
-   * @param displayName the display name of the counter
-   * @return the counter that was found or added
-   */
-  public Counter findCounter(String counterName, String displayName) {
-    Counter result = counters.get(counterName);
-    if (result == null) {
-      result = new Counter(counterName, displayName);
-      counters.put(counterName, result);
-    }
-    return result;
-  }
-
-  public synchronized Counter findCounter(String counterName) {
-    Counter result = counters.get(counterName);
-    if (result == null) {
-      String displayName = localize(counterName, counterName);
-      result = new Counter(counterName, displayName);
-      counters.put(counterName, result);
-    }
-    return result;
-  }
-  
-  public synchronized Iterator<Counter> iterator() {
-    return counters.values().iterator();
-  }
-
-  public synchronized void write(DataOutput out) throws IOException {
-    Text.writeString(out, displayName);
-    WritableUtils.writeVInt(out, counters.size());
-    for(Counter counter: counters.values()) {
-      counter.write(out);
-    }
-  }
-  
-  public synchronized void readFields(DataInput in) throws IOException {
-    displayName = Text.readString(in);
-    counters.clear();
-    int size = WritableUtils.readVInt(in);
-    for(int i=0; i < size; i++) {
-      Counter counter = new Counter();
-      counter.readFields(in);
-      counters.put(counter.getName(), counter);
-    }
-  }
-
-  /**
-   * Looks up key in the ResourceBundle and returns the corresponding value.
-   * If the bundle or the key doesn't exist, returns the default value.
-   */
-  private String localize(String key, String defaultValue) {
-    String result = defaultValue;
-    if (bundle != null) {
-      try {
-        result = bundle.getString(key);
-      }
-      catch (MissingResourceException mre) {
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Returns the number of counters in this group.
-   */
-  public synchronized int size() {
-    return counters.size();
-  }
-
-  public synchronized boolean equals(Object genericRight) {
-    if (genericRight instanceof CounterGroup) {
-      Iterator<Counter> right = ((CounterGroup) genericRight).counters.
-                                       values().iterator();
-      Iterator<Counter> left = counters.values().iterator();
-      while (left.hasNext()) {
-        if (!right.hasNext() || !left.next().equals(right.next())) {
-          return false;
-        }
-      }
-      return !right.hasNext();
-    }
-    return false;
-  }
-
-  public synchronized int hashCode() {
-    return counters.hashCode();
-  }
-  
-  public synchronized void incrAllCounters(CounterGroup rightGroup) {
-    for(Counter right: rightGroup.counters.values()) {
-      Counter left = findCounter(right.getName(), right.getDisplayName());
-      left.increment(right.getValue());
-    }
-  }
+public interface CounterGroup extends CounterGroupBase<Counter> {
+  // essentially a typedef so user doesn't have to use generic syntax
 }

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Counters.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Counters.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Counters.java Fri Aug 12 23:25:51 2011
@@ -17,200 +17,121 @@
  */
 package org.apache.hadoop.mapreduce;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.mapreduce.counters.Limits;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.apache.hadoop.mapreduce.counters.AbstractCounterGroup;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
+import org.apache.hadoop.mapreduce.counters.AbstractCounters;
+import org.apache.hadoop.mapreduce.counters.CounterGroupFactory;
+import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
+
+/**
+ * <p><code>Counters</code> holds per job/task counters, defined either by the
+ * Map-Reduce framework or applications. Each <code>Counter</code> can be of
+ * any {@link Enum} type.</p>
+ *
+ * <p><code>Counters</code> are bunched into {@link CounterGroup}s, each
+ * comprising counters from a particular <code>Enum</code> class.
+ */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class Counters implements Writable,Iterable<CounterGroup> {
-  /**
-   * A cache from enum values to the associated counter. Dramatically speeds up
-   * typical usage.
-   */
-  private Map<Enum<?>, Counter> cache = new IdentityHashMap<Enum<?>, Counter>();
+public class Counters extends AbstractCounters<Counter, CounterGroup> {
 
-  private TreeMap<String, CounterGroup> groups = 
-      new TreeMap<String, CounterGroup>();
-  
-  public Counters() {
-  }
-  
-  /**
-   * Utility method to  create a Counters object from the 
-   * org.apache.hadoop.mapred counters
-   * @param counters
-   */
-  public Counters(org.apache.hadoop.mapred.Counters counters) {
-    for(org.apache.hadoop.mapred.Counters.Group group: counters) {
-      String name = group.getName();
-      CounterGroup newGroup = new CounterGroup(name, group.getDisplayName());
-      groups.put(name, newGroup);
-      for(Counter counter: group) {
-        newGroup.addCounter(counter);
-      }
+  // Mix framework group implementation into CounterGroup interface
+  private static class FrameworkGroupImpl<T extends Enum<T>>
+      extends FrameworkCounterGroup<T, Counter> implements CounterGroup {
+
+    FrameworkGroupImpl(Class<T> cls) {
+      super(cls);
     }
-  }
 
-  /** Add a group. */
-  public void addGroup(CounterGroup group) {
-    groups.put(group.getName(), group);
+    @Override
+    protected FrameworkCounter newCounter(T key) {
+      return new FrameworkCounter(key);
+    }
   }
 
-  public Counter findCounter(String groupName, String counterName) {
-    CounterGroup grp = getGroup(groupName);
-    return grp.findCounter(counterName);
-  }
+  // Mix generic group implementation into CounterGroup interface
+  // and provide some mandatory group factory methods.
+  private static class GenericGroup extends AbstractCounterGroup<Counter>
+      implements CounterGroup {
 
-  /**
-   * Find the counter for the given enum. The same enum will always return the
-   * same counter.
-   * @param key the counter key
-   * @return the matching counter object
-   */
-  public synchronized Counter findCounter(Enum<?> key) {
-    Counter counter = cache.get(key);
-    if (counter == null) {
-      counter = findCounter(key.getDeclaringClass().getName(), key.toString());
-      cache.put(key, counter);
+    GenericGroup(String name, String displayName, Limits limits) {
+      super(name, displayName, limits);
     }
-    return counter;    
-  }
 
-  /**
-   * Returns the names of all counter classes.
-   * @return Set of counter names.
-   */
-  public synchronized Collection<String> getGroupNames() {
-    return groups.keySet();
-  }
+    @Override
+    protected Counter newCounter(String name, String displayName, long value) {
+      return new GenericCounter(name, displayName, value);
+    }
 
-  @Override
-  public Iterator<CounterGroup> iterator() {
-    return groups.values().iterator();
+    @Override
+    protected Counter newCounter() {
+      return new GenericCounter();
+    }
   }
 
-  /**
-   * Returns the named counter group, or an empty group if there is none
-   * with the specified name.
-   */
-  public synchronized CounterGroup getGroup(String groupName) {
-    CounterGroup grp = groups.get(groupName);
-    if (grp == null) {
-      grp = new CounterGroup(groupName);
-      groups.put(groupName, grp);
+  // Mix file system group implementation into the CounterGroup interface
+  private static class FileSystemGroup extends FileSystemCounterGroup<Counter>
+      implements CounterGroup {
+
+    @Override
+    protected Counter newCounter(String scheme, FileSystemCounter key) {
+      return new FSCounter(scheme, key);
     }
-    return grp;
   }
 
   /**
-   * Returns the total number of counters, by summing the number of counters
-   * in each group.
+   * Provide factory methods for counter group factory implementation.
+   * See also the GroupFactory in
+   *  {@link org.apache.hadoop.mapred.Counters mapred.Counters}
    */
-  public synchronized  int countCounters() {
-    int result = 0;
-    for (CounterGroup group : this) {
-      result += group.size();
+  private static class GroupFactory
+      extends CounterGroupFactory<Counter, CounterGroup> {
+
+    @Override
+    protected <T extends Enum<T>>
+    FrameworkGroupFactory<CounterGroup>
+        newFrameworkGroupFactory(final Class<T> cls) {
+      return new FrameworkGroupFactory<CounterGroup>() {
+        @Override public CounterGroup newGroup(String name) {
+          return new FrameworkGroupImpl<T>(cls); // impl in this package
+        }
+      };
     }
-    return result;
-  }
 
-  /**
-   * Write the set of groups.
-   * The external format is:
-   *     #groups (groupName group)*
-   *
-   * i.e. the number of groups followed by 0 or more groups, where each 
-   * group is of the form:
-   *
-   *     groupDisplayName #counters (false | true counter)*
-   *
-   * where each counter is of the form:
-   *
-   *     name (false | true displayName) value
-   */
-  @Override
-  public synchronized void write(DataOutput out) throws IOException {
-    out.writeInt(groups.size());
-    for (org.apache.hadoop.mapreduce.CounterGroup group: groups.values()) {
-      Text.writeString(out, group.getName());
-      group.write(out);
+    @Override
+    protected CounterGroup newGenericGroup(String name, String displayName,
+                                           Limits limits) {
+      return new GenericGroup(name, displayName, limits);
     }
-  }
-  
-  /**
-   * Read a set of groups.
-   */
-  @Override
-  public synchronized void readFields(DataInput in) throws IOException {
-    int numClasses = in.readInt();
-    groups.clear();
-    while (numClasses-- > 0) {
-      String groupName = Text.readString(in);
-      CounterGroup group = new CounterGroup(groupName);
-      group.readFields(in);
-      groups.put(groupName, group);
+
+    @Override
+    protected CounterGroup newFileSystemGroup() {
+      return new FileSystemGroup();
     }
   }
 
+  private static final GroupFactory groupFactory = new GroupFactory();
+
   /**
-   * Return textual representation of the counter values.
+   * Default constructor
    */
-  public synchronized String toString() {
-    StringBuilder sb = new StringBuilder("Counters: " + countCounters());
-    for (CounterGroup group: this) {
-      sb.append("\n\t" + group.getDisplayName());
-      for (Counter counter: group) {
-        sb.append("\n\t\t" + counter.getDisplayName() + "=" + 
-                  counter.getValue());
-      }
-    }
-    return sb.toString();
+  public Counters() {
+    super(groupFactory);
   }
 
   /**
-   * Increments multiple counters by their amounts in another Counters 
-   * instance.
-   * @param other the other Counters instance
+   * Construct the Counters object from another counters object
+   * @param <C> the type of counter
+   * @param <G> the type of counter group
+   * @param counters the old counters object
    */
-  public synchronized void incrAllCounters(Counters other) {
-    for(Map.Entry<String, CounterGroup> rightEntry: other.groups.entrySet()) {
-      CounterGroup left = groups.get(rightEntry.getKey());
-      CounterGroup right = rightEntry.getValue();
-      if (left == null) {
-        left = new CounterGroup(right.getName(), right.getDisplayName());
-        groups.put(rightEntry.getKey(), left);
-      }
-      left.incrAllCounters(right);
-    }
-  }
-
-  public boolean equals(Object genericRight) {
-    if (genericRight instanceof Counters) {
-      Iterator<CounterGroup> right = ((Counters) genericRight).groups.
-                                       values().iterator();
-      Iterator<CounterGroup> left = groups.values().iterator();
-      while (left.hasNext()) {
-        if (!right.hasNext() || !left.next().equals(right.next())) {
-          return false;
-        }
-      }
-      return !right.hasNext();
-    }
-    return false;
-  }
-  
-  public int hashCode() {
-    return groups.hashCode();
+  public <C extends Counter, G extends CounterGroupBase<C>>
+  Counters(AbstractCounters<C, G> counters) {
+    super(counters, groupFactory);
   }
 }

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.java Fri Aug 12 23:25:51 2011
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public enum FileSystemCounter {
+  BYTES_READ,
+  BYTES_WRITTEN,
+  READ_OPS,
+  LARGE_READ_OPS,
+  WRITE_OPS,
+}

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.properties?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.properties (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.properties Fri Aug 12 23:25:51 2011
@@ -0,0 +1,21 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# ResourceBundle properties file for file system counters
+
+CounterGroupName=     File System Counters
+
+BYTES_READ.name=      Number of bytes read
+BYTES_WRITTEN.name=   Number of bytes written
+READ_OPS.name=        Number of read operations
+LARGE_READ_OPS.name=  Number of large read operations
+WRITE_OPS.name=       Number of write operations

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/JobCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/JobCounter.properties?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/JobCounter.properties (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/JobCounter.properties Fri Aug 12 23:25:51 2011
@@ -21,5 +21,7 @@ TOTAL_LAUNCHED_REDUCES.name=       Launc
 OTHER_LOCAL_MAPS.name=             Other local map tasks
 DATA_LOCAL_MAPS.name=              Data-local map tasks
 RACK_LOCAL_MAPS.name=              Rack-local map tasks
+SLOTS_MILLIS_MAPS.name=            Total time spent by all maps in occupied slots (ms)
+SLOTS_MILLIS_REDUCES.name=         Total time spent by all reduces in occupied slots (ms)
 FALLOW_SLOTS_MILLIS_MAPS.name=     Total time spent by all maps waiting after reserving slots (ms)
 FALLOW_SLOTS_MILLIS_REDUCES.name=  Total time spent by all reduces waiting after reserving slots (ms)

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Aug 12 23:25:51 2011
@@ -275,4 +275,16 @@ public interface MRJobConfig {
     "mapreduce.job.submithostname";
   public static final String JOB_SUBMITHOSTADDR =
     "mapreduce.job.submithostaddress";
+
+  public static final String COUNTERS_MAX_KEY = "mapreduce.job.counters.max";
+  public static final int COUNTERS_MAX_DEFAULT = 120;
+
+  public static final String COUNTER_GROUP_NAME_MAX_KEY = "mapreduce.job.counters.group.name.max";
+  public static final int COUNTER_GROUP_NAME_MAX_DEFAULT = 128;
+
+  public static final String COUNTER_NAME_MAX_KEY = "mapreduce.job.counters.counter.name.max";
+  public static final int COUNTER_NAME_MAX_DEFAULT = 64;
+
+  public static final String COUNTER_GROUPS_MAX_KEY = "mapreduce.job.counters.groups.max";
+  public static final int COUNTER_GROUPS_MAX_DEFAULT = 50;
 }

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties Fri Aug 12 23:25:51 2011
@@ -27,9 +27,13 @@ REDUCE_INPUT_RECORDS.name=     Reduce in
 REDUCE_OUTPUT_RECORDS.name=    Reduce output records
 REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
 REDUCE_SKIPPED_GROUPS.name=    Reduce skipped groups
+SPLIT_RAW_BYTES.name=          Input split bytes
 SPILLED_RECORDS.name=          Spilled Records
 SHUFFLED_MAPS.name=            Shuffled Maps 
 FAILED_SHUFFLE.name=           Failed Shuffles
 MERGED_MAP_OUTPUTS.name=       Merged Map outputs
 GC_TIME_MILLIS.name=           GC time elapsed (ms)
 COMMITTED_HEAP_BYTES.name=     Total committed heap usage (bytes)
+CPU_MILLISECONDS.name=         CPU time spent (ms)
+PHYSICAL_MEMORY_BYTES.name=    Physical memory (bytes) snapshot
+VIRTUAL_MEMORY_BYTES.name=     Virtual memory (bytes) snapshot

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounter.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounter.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounter.java Fri Aug 12 23:25:51 2011
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * An abstract counter class to provide common implementation of
+ * the counter interface in both mapred and mapreduce packages.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractCounter implements Counter {
+
+  @Override @Deprecated
+  public void setDisplayName(String name) {}
+
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (genericRight instanceof Counter) {
+      synchronized (genericRight) {
+        Counter right = (Counter) genericRight;
+        return getName().equals(right.getName()) &&
+               getDisplayName().equals(right.getDisplayName()) &&
+               getValue() == right.getValue();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    return Objects.hashCode(getName(), getDisplayName(), getValue());
+  }
+}

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java Fri Aug 12 23:25:51 2011
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.util.ResourceBundles;
+
+/**
+ * An abstract class to provide common implementation of the
+ * generic counter group in both mapred and mapreduce packages.
+ *
+ * @param <T> type of the counter for the group
+ */
+@InterfaceAudience.Private
+public abstract class AbstractCounterGroup<T extends Counter>
+    implements CounterGroupBase<T> {
+
+  private final String name;
+  private String displayName;
+  private final Map<String, T> counters = Maps.newTreeMap();
+  private final Limits limits;
+
+  public AbstractCounterGroup(String name, String displayName,
+                              Limits limits) {
+    this.name = name;
+    this.displayName = displayName;
+    this.limits = limits;
+  }
+
+  @Override
+  public synchronized String getName() {
+    return name;
+  }
+
+  @Override
+  public synchronized String getDisplayName() {
+    return displayName;
+  }
+
+  @Override
+  public synchronized void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  @Override
+  public synchronized void addCounter(T counter) {
+    counters.put(counter.getName(), counter);
+    limits.incrCounters();
+  }
+
+  @Override
+  public synchronized T addCounter(String counterName, String displayName,
+                                   long value) {
+    String saveName = limits.filterCounterName(counterName);
+    T counter = findCounterImpl(saveName, false);
+    if (counter == null) {
+      return addCounterImpl(saveName, displayName, value);
+    }
+    counter.setValue(value);
+    return counter;
+  }
+
+  private T addCounterImpl(String name, String displayName, long value) {
+    T counter = newCounter(name, displayName, value);
+    addCounter(counter);
+    return counter;
+  }
+
+  @Override
+  public T findCounter(String counterName, String displayName) {
+    String saveName = limits.filterCounterName(counterName);
+    T counter = findCounterImpl(saveName, false);
+    if (counter == null) {
+      return addCounterImpl(saveName, displayName, 0);
+    }
+    return counter;
+  }
+
+  @Override
+  public synchronized T findCounter(String counterName, boolean create) {
+    return findCounterImpl(limits.filterCounterName(counterName), create);
+  }
+
+  private T findCounterImpl(String counterName, boolean create) {
+    T counter = counters.get(counterName);
+    if (counter == null && create) {
+      String localized =
+          ResourceBundles.getCounterName(getName(), counterName, counterName);
+      return addCounterImpl(counterName, localized, 0);
+    }
+    return counter;
+  }
+
+  @Override
+  public T findCounter(String counterName) {
+    return findCounter(counterName, true);
+  }
+
+  /**
+   * Abstract factory method to create a new counter of type T
+   * @param counterName of the counter
+   * @param displayName of the counter
+   * @param value of the counter
+   * @return a new counter
+   */
+  protected abstract T newCounter(String counterName, String displayName,
+                                  long value);
+
+  /**
+   * Abstract factory method to create a new counter of type T
+   * @return a new counter object
+   */
+  protected abstract T newCounter();
+
+  @Override
+  public synchronized Iterator<T> iterator() {
+    return counters.values().iterator();
+  }
+
+  /**
+   * GenericGroup ::= displayName #counter counter*
+   */
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    Text.writeString(out, displayName);
+    WritableUtils.writeVInt(out, counters.size());
+    for(Counter counter: counters.values()) {
+      counter.write(out);
+    }
+  }
+
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    displayName = Text.readString(in);
+    counters.clear();
+    int size = WritableUtils.readVInt(in);
+    for (int i = 0; i < size; i++) {
+      T counter = newCounter();
+      counter.readFields(in);
+      counters.put(counter.getName(), counter);
+      limits.incrCounters();
+    }
+  }
+
+  @Override
+  public synchronized int size() {
+    return counters.size();
+  }
+
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (genericRight instanceof CounterGroupBase<?>) {
+      @SuppressWarnings("unchecked")
+      CounterGroupBase<T> right = (CounterGroupBase<T>) genericRight;
+      return Iterators.elementsEqual(iterator(), right.iterator());
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    return counters.hashCode();
+  }
+
+  @Override
+  public void incrAllCounters(CounterGroupBase<T> rightGroup) {
+    try {
+      for (Counter right : rightGroup) {
+        Counter left = findCounter(right.getName(), right.getDisplayName());
+        left.increment(right.getValue());
+      }
+    } catch (LimitExceededException e) {
+      counters.clear();
+      throw e;
+    }
+  }
+}

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java Fri Aug 12 23:25:51 2011
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.*;
+
+/**
+ * An abstract class to provide common implementation for the Counters
+ * container in both mapred and mapreduce packages.
+ *
+ * @param <C> type of counter inside the counters
+ * @param <G> type of group inside the counters
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class AbstractCounters<C extends Counter,
+                                       G extends CounterGroupBase<C>>
+    implements Writable, Iterable<G> {
+
+  protected static final Log LOG = LogFactory.getLog("mapreduce.Counters");
+
+  /**
+   * A cache from enum values to the associated counter.
+   */
+  private Map<Enum<?>, C> cache = Maps.newIdentityHashMap();
+  private Map<String, G> fgroups = Maps.newTreeMap(); // framework & fs groups
+  private Map<String, G> groups = Maps.newTreeMap();  // other groups
+  private final CounterGroupFactory<C, G> groupFactory;
+
+  // For framework counter serialization without strings
+  enum GroupType { FRAMEWORK, FILESYSTEM };
+
+  // Writes only framework and fs counters if false.
+  private boolean writeAllCounters = true;
+
+  private static final Map<String, String> legacyMap = Maps.newHashMap();
+  static {
+    legacyMap.put("org.apache.hadoop.mapred.Task$Counter",
+                  TaskCounter.class.getName());
+    legacyMap.put("org.apache.hadoop.mapred.JobInProgress$Counter",
+                  JobCounter.class.getName());
+  }
+
+  private final Limits limits = new Limits();
+
+  @InterfaceAudience.Private
+  public AbstractCounters(CounterGroupFactory<C, G> gf) {
+    groupFactory = gf;
+  }
+
+  /**
+   * Construct from another counters object.
+   * @param <C1> type of the other counter
+   * @param <G1> type of the other counter group
+   * @param counters the counters object to copy
+   * @param groupFactory the factory for new groups
+   */
+  @InterfaceAudience.Private
+  public <C1 extends Counter, G1 extends CounterGroupBase<C1>>
+  AbstractCounters(AbstractCounters<C1, G1> counters,
+                   CounterGroupFactory<C, G> groupFactory) {
+    this.groupFactory = groupFactory;
+    for(G1 group: counters) {
+      String name = group.getName();
+      G newGroup = groupFactory.newGroup(name, group.getDisplayName(), limits);
+      (isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup);
+      for(Counter counter: group) {
+        newGroup.addCounter(counter.getName(), counter.getDisplayName(),
+                            counter.getValue());
+      }
+    }
+  }
+
+  /** Add a group.
+   * @param group object to add
+   * @return the group
+   */
+  @InterfaceAudience.Private
+  public synchronized G addGroup(G group) {
+    String name = group.getName();
+    if (isFrameworkGroup(name)) {
+      fgroups.put(name, group);
+    } else {
+      limits.checkGroups(groups.size() + 1);
+      groups.put(name, group);
+    }
+    return group;
+  }
+
+  /**
+   * Add a new group
+   * @param name of the group
+   * @param displayName of the group
+   * @return the group
+   */
+  @InterfaceAudience.Private
+  public G addGroup(String name, String displayName) {
+    return addGroup(groupFactory.newGroup(name, displayName, limits));
+  }
+
+  /**
+   * Find a counter, create one if necessary
+   * @param groupName of the counter
+   * @param counterName name of the counter
+   * @return the matching counter
+   */
+  public C findCounter(String groupName, String counterName) {
+    G grp = getGroup(groupName);
+    return grp.findCounter(counterName);
+  }
+
+  /**
+   * Find the counter for the given enum. The same enum will always return the
+   * same counter.
+   * @param key the counter key
+   * @return the matching counter object
+   */
+  public synchronized C findCounter(Enum<?> key) {
+    C counter = cache.get(key);
+    if (counter == null) {
+      counter = findCounter(key.getDeclaringClass().getName(), key.name());
+      cache.put(key, counter);
+    }
+    return counter;
+  }
+
+  /**
+   * Find the file system counter for the given scheme and enum.
+   * @param scheme of the file system
+   * @param key the enum of the counter
+   * @return the file system counter
+   */
+  @InterfaceAudience.Private
+  public synchronized C findCounter(String scheme, FileSystemCounter key) {
+    return ((FileSystemCounterGroup<C>) getGroup(
+        FileSystemCounter.class.getName())).findCounter(scheme, key);
+  }
+
+  /**
+   * Returns the names of all counter groups.
+   * @return Iterable of group names.
+   */
+  public synchronized Iterable<String> getGroupNames() {
+    return Iterables.concat(fgroups.keySet(), groups.keySet());
+  }
+
+  @Override
+  public Iterator<G> iterator() {
+    return Iterators.concat(fgroups.values().iterator(),
+                            groups.values().iterator());
+  }
+
+  /**
+   * Returns the named counter group, or an empty group if there is none
+   * with the specified name.
+   * @param groupName name of the group
+   * @return the group
+   */
+  public synchronized G getGroup(String groupName) {
+    boolean isFGroup = isFrameworkGroup(groupName);
+    G group = isFGroup ? fgroups.get(groupName) : groups.get(groupName);
+    if (group == null) {
+      group = groupFactory.newGroup(filterGroupName(groupName), limits);
+      if (isFGroup) {
+        fgroups.put(groupName, group);
+      } else {
+        limits.checkGroups(groups.size() + 1);
+        groups.put(groupName, group);
+      }
+    }
+    return group;
+  }
+
+  private String filterGroupName(String oldName) {
+    String newName = legacyMap.get(oldName);
+    if (newName == null) {
+      return limits.filterGroupName(oldName);
+    }
+    LOG.warn("Group "+ oldName +" is deprecated. Use "+ newName +" instead");
+    return newName;
+  }
+
+  /**
+   * Returns the total number of counters, by summing the number of counters
+   * in each group.
+   * @return the total number of counters
+   */
+  public synchronized int countCounters() {
+    int result = 0;
+    for (G group : this) {
+      result += group.size();
+    }
+    return result;
+  }
+
+  /**
+   * Write the set of groups.
+   * Counters ::= version #fgroups (groupId, group)* #groups (group)*
+   */
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, groupFactory.version());
+    WritableUtils.writeVInt(out, fgroups.size());  // framework groups first
+    for (G group : fgroups.values()) {
+      if (group instanceof FrameworkCounterGroup<?, ?>) {
+        WritableUtils.writeVInt(out, GroupType.FRAMEWORK.ordinal());
+        WritableUtils.writeVInt(out, getFrameworkGroupId(group.getName()));
+        group.write(out);
+      } else if (group instanceof FileSystemCounterGroup<?>) {
+        WritableUtils.writeVInt(out, GroupType.FILESYSTEM.ordinal());
+        group.write(out);
+      }
+    }
+    if (writeAllCounters) {
+      WritableUtils.writeVInt(out, groups.size());
+      for (G group : groups.values()) {
+        Text.writeString(out, group.getName());
+        group.write(out);
+      }
+    } else {
+      WritableUtils.writeVInt(out, 0);
+    }
+  }
+
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    int version = WritableUtils.readVInt(in);
+    if (version != groupFactory.version()) {
+      throw new IOException("Counters version mismatch, expected "+
+          groupFactory.version() +" got "+ version);
+    }
+    int numFGroups = WritableUtils.readVInt(in);
+    fgroups.clear();
+    GroupType[] groupTypes = GroupType.values();
+    while (numFGroups-- > 0) {
+      GroupType groupType = groupTypes[WritableUtils.readVInt(in)];
+      G group;
+      switch (groupType) {
+        case FILESYSTEM: // with nothing
+          group = groupFactory.newFileSystemGroup();
+          break;
+        case FRAMEWORK:  // with group id
+          group = groupFactory.newFrameworkGroup(WritableUtils.readVInt(in));
+          break;
+        default: // Silence dumb compiler, as it would've thrown earlier
+          throw new IOException("Unexpected counter group type: "+ groupType);
+      }
+      group.readFields(in);
+      fgroups.put(group.getName(), group);
+    }
+    int numGroups = WritableUtils.readVInt(in);
+    while (numGroups-- > 0) {
+      limits.checkGroups(groups.size() + 1);
+      G group = groupFactory.newGenericGroup(Text.readString(in), null, limits);
+      group.readFields(in);
+      groups.put(group.getName(), group);
+    }
+  }
+
+  /**
+   * Return textual representation of the counter values.
+   * @return the string
+   */
+  @Override
+  public synchronized String toString() {
+    StringBuilder sb = new StringBuilder("Counters: " + countCounters());
+    for (G group: this) {
+      sb.append("\n\t").append(group.getDisplayName());
+      for (Counter counter: group) {
+        sb.append("\n\t\t").append(counter.getDisplayName()).append("=")
+          .append(counter.getValue());
+      }
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Increments multiple counters by their amounts in another Counters
+   * instance.
+   * @param other the other Counters instance
+   */
+  public synchronized void incrAllCounters(AbstractCounters<C, G> other) {
+    for(G right : other) {
+      G left = groups.get(right.getName());
+      if (left == null) {
+        limits.checkGroups(groups.size() + 1);
+        left = groupFactory.newGroup(right.getName(), right.getDisplayName(),
+                                     limits);
+        groups.put(right.getName(), left);
+      }
+      left.incrAllCounters(right);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public boolean equals(Object genericRight) {
+    if (genericRight instanceof AbstractCounters<?, ?>) {
+      return Iterators.elementsEqual(iterator(),
+          ((AbstractCounters<C, G>)genericRight).iterator());
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return groups.hashCode();
+  }
+
+  /**
+   * Set the "writeAllCounters" option to true or false
+   * @param send  if true all counters would be serialized, otherwise only
+   *              framework counters would be serialized in
+   *              {@link #write(DataOutput)}
+   */
+  @InterfaceAudience.Private
+  public void setWriteAllCounters(boolean send) {
+    writeAllCounters = send;
+  }
+
+  /**
+   * Get the "writeAllCounters" option
+   * @return true if all counters would be serialized
+   */
+  @InterfaceAudience.Private
+  public boolean getWriteAllCounters() {
+    return writeAllCounters;
+  }
+
+  @InterfaceAudience.Private
+  public Limits limits() {
+    return limits;
+  }
+}

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java Fri Aug 12 23:25:51 2011
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * The common counter group interface.
+ *
+ * @param <T> type of the counter for the group
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface CounterGroupBase<T extends Counter>
+    extends Writable, Iterable<T> {
+
+  /**
+   * Get the internal name of the group
+   * @return the internal name
+   */
+  String getName();
+
+  /**
+   * Get the display name of the group.
+   * @return the human readable name
+   */
+  String getDisplayName();
+
+  /**
+   * Set the display name of the group
+   * @param displayName of the group
+   */
+  void setDisplayName(String displayName);
+
+  /** Add a counter to this group.
+   * @param counter to add
+   */
+  void addCounter(T counter);
+
+  /**
+   * Add a counter to this group
+   * @param name  of the counter
+   * @param displayName of the counter
+   * @param value of the counter
+   * @return the counter
+   */
+  T addCounter(String name, String displayName, long value);
+
+  /**
+   * Find a counter in the group.
+   * @param counterName the name of the counter
+   * @param displayName the display name of the counter
+   * @return the counter that was found or added
+   */
+  T findCounter(String counterName, String displayName);
+
+  /**
+   * Find a counter in the group
+   * @param counterName the name of the counter
+   * @param create if true, create the counter when it is not found
+   * @return the counter that was found or added or null if create is false
+   */
+  T findCounter(String counterName, boolean create);
+
+  /**
+   * Find a counter in the group.
+   * @param counterName the name of the counter
+   * @return the counter that was found or added
+   */
+  T findCounter(String counterName);
+
+  /**
+   * @return the number of counters in this group.
+   */
+  int size();
+
+  /**
+   * Increment all counters by a group of counters
+   * @param rightGroup  the group to be added to this group
+   */
+  void incrAllCounters(CounterGroupBase<T> rightGroup);
+}

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupFactory.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupFactory.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupFactory.java Fri Aug 12 23:25:51 2011
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.util.ResourceBundles;
+
+/**
+ * An abstract class to provide common implementation of the
+ * group factory in both mapred and mapreduce packages.
+ *
+ * @param <C> type of the counter
+ * @param <G> type of the group
+ */
+@InterfaceAudience.Private
+public abstract class CounterGroupFactory<C extends Counter,
+                                          G extends CounterGroupBase<C>> {
+
+  public interface FrameworkGroupFactory<F> {
+    F newGroup(String name);
+  }
+
+  // Integer mapping (for serialization) for framework groups
+  private static final Map<String, Integer> s2i = Maps.newHashMap();
+  private static final List<String> i2s = Lists.newArrayList();
+  private static final int VERSION = 1;
+  private static final String FS_GROUP_NAME = FileSystemCounter.class.getName();
+
+  private final Map<String, FrameworkGroupFactory<G>> fmap = Maps.newHashMap();
+  {
+    // Add builtin counter classes here, and update the version when changed.
+    addFrameworkGroup(TaskCounter.class);
+    addFrameworkGroup(JobCounter.class);
+  }
+
+  // Initialize the framework counter group mapping
+  private synchronized <T extends Enum<T>>
+  void addFrameworkGroup(final Class<T> cls) {
+    updateFrameworkGroupMapping(cls);
+    fmap.put(cls.getName(), newFrameworkGroupFactory(cls));
+  }
+
+  // Update static mappings (s2i, i2s) of framework groups
+  private static synchronized void updateFrameworkGroupMapping(Class<?> cls) {
+    String name = cls.getName();
+    Integer i = s2i.get(name);
+    if (i != null) return;
+    i2s.add(name);
+    s2i.put(name, i2s.size() - 1);
+  }
+
+  /**
+   * Required override to return a new framework group factory
+   * @param <T> type of the counter enum class
+   * @param cls the counter enum class
+   * @return a new framework group factory
+   */
+  protected abstract <T extends Enum<T>>
+  FrameworkGroupFactory<G> newFrameworkGroupFactory(Class<T> cls);
+
+  /**
+   * Create a new counter group
+   * @param name of the group
+   * @param limits the counters limits policy object
+   * @return a new counter group
+   */
+  public G newGroup(String name, Limits limits) {
+    return newGroup(name, ResourceBundles.getCounterGroupName(name, name),
+                    limits);
+  }
+
+  /**
+   * Create a new counter group
+   * @param name of the group
+   * @param displayName of the group
+   * @param limits the counters limits policy object
+   * @return a new counter group
+   */
+  public G newGroup(String name, String displayName, Limits limits) {
+    FrameworkGroupFactory<G> gf = fmap.get(name);
+    if (gf != null) return gf.newGroup(name);
+    if (name.equals(FS_GROUP_NAME)) {
+      return newFileSystemGroup();
+    }
+    return newGenericGroup(name, displayName, limits);
+  }
+
+  /**
+   * Create a new framework group
+   * @param id of the group
+   * @return a new framework group
+   */
+  public G newFrameworkGroup(int id) {
+    String name;
+    synchronized(CounterGroupFactory.class) {
+      if (id < 0 || id >= i2s.size()) throwBadFrameGroupIdException(id);
+      name = i2s.get(id); // should not throw here.
+    }
+    FrameworkGroupFactory<G> gf = fmap.get(name);
+    if (gf == null) throwBadFrameGroupIdException(id);
+    return gf.newGroup(name);
+  }
+
+  /**
+   * Get the id of a framework group
+   * @param name of the group
+   * @return the framework group id
+   */
+  public static synchronized int getFrameworkGroupId(String name) {
+    Integer i = s2i.get(name);
+    if (i == null) throwBadFrameworkGroupNameException(name);
+    return i;
+  }
+
+  /**
+   * @return the counter factory version
+   */
+  public int version() {
+    return VERSION;
+  }
+
+  /**
+   * Check whether a group name is a name of a framework group (including
+   * the filesystem group).
+   *
+   * @param name  to check
+   * @return true for framework group names
+   */
+  public static synchronized boolean isFrameworkGroup(String name) {
+    return s2i.get(name) != null || name.equals(FS_GROUP_NAME);
+  }
+
+  private static void throwBadFrameGroupIdException(int id) {
+    throw new IllegalArgumentException("bad framework group id: "+ id);
+  }
+
+  private static void throwBadFrameworkGroupNameException(String name) {
+    throw new IllegalArgumentException("bad framework group name: "+ name);
+  }
+
+  /**
+   * Abstract factory method to create a generic (vs framework) counter group
+   * @param name  of the group
+   * @param displayName of the group
+   * @param limits limits of the counters
+   * @return a new generic counter group
+   */
+  protected abstract G newGenericGroup(String name, String displayName,
+                                       Limits limits);
+
+  /**
+   * Abstract factory method to create a file system counter group
+   * @return a new file system counter group
+   */
+  protected abstract G newFileSystemGroup();
+}

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java Fri Aug 12 23:25:51 2011
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import static com.google.common.base.Preconditions.*;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.util.ResourceBundles;
+
+/**
+ * An abstract class to provide common implementation of the filesystem
+ * counter group in both mapred and mapreduce packages.
+ *
+ * @param <C> the type of the Counter for the group
+ */
+@InterfaceAudience.Private
+public abstract class FileSystemCounterGroup<C extends Counter>
+    implements CounterGroupBase<C> {
+
+  static final int MAX_NUM_SCHEMES = 100; // intern/sanity check
+  static final ConcurrentMap<String, String> schemes = Maps.newConcurrentMap();
+
+  // C[] would need Array.newInstance which requires a Class<C> reference.
+  // A few local casts are probably worth not having to carry it around.
+  private final Map<String, Object[]> map = Maps.newTreeMap();
+  private String displayName;
+
+  private static final Joiner NAME_JOINER = Joiner.on('_');
+  private static final Joiner DISP_JOINER = Joiner.on(": ");
+
+  @InterfaceAudience.Private
+  public class FSCounter extends AbstractCounter {
+    final String scheme;
+    final FileSystemCounter key;
+    private long value;
+
+    public FSCounter(String scheme, FileSystemCounter ref) {
+      this.scheme = scheme;
+      key = ref;
+    }
+
+    @Override
+    public String getName() {
+      return NAME_JOINER.join(scheme, key.name());
+    }
+
+    @Override
+    public String getDisplayName() {
+      return DISP_JOINER.join(scheme, localizeCounterName(key.name()));
+    }
+
+    protected String localizeCounterName(String counterName) {
+      return ResourceBundles.getCounterName(FileSystemCounter.class.getName(),
+                                            counterName, counterName);
+    }
+
+    @Override
+    public long getValue() {
+      return value;
+    }
+
+    @Override
+    public void setValue(long value) {
+      this.value = value;
+    }
+
+    @Override
+    public void increment(long incr) {
+      value += incr;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      assert false : "shouldn't be called";
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      assert false : "shouldn't be called";
+    }
+  }
+
+  @Override
+  public String getName() {
+    return FileSystemCounter.class.getName();
+  }
+
+  @Override
+  public String getDisplayName() {
+    if (displayName == null) {
+      displayName = ResourceBundles.getCounterGroupName(getName(),
+          "File System Counters");
+    }
+    return displayName;
+  }
+
+  @Override
+  public void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  @Override
+  public void addCounter(C counter) {
+    C ours;
+    if (counter instanceof FileSystemCounterGroup<?>.FSCounter) {
+      @SuppressWarnings("unchecked")
+      FSCounter c = (FSCounter) counter;
+      ours = findCounter(c.scheme, c.key);
+    }
+    else {
+      ours = findCounter(counter.getName());
+    }
+    ours.setValue(counter.getValue());
+  }
+
+  @Override
+  public C addCounter(String name, String displayName, long value) {
+    C counter = findCounter(name);
+    counter.setValue(value);
+    return counter;
+  }
+
+  // Parse generic counter name into [scheme, key]
+  private String[] parseCounterName(String counterName) {
+    int schemeEnd = counterName.indexOf('_');
+    if (schemeEnd < 0) {
+      throw new IllegalArgumentException("bad fs counter name");
+    }
+    return new String[]{counterName.substring(0, schemeEnd),
+                        counterName.substring(schemeEnd + 1)};
+  }
+
+  @Override
+  public C findCounter(String counterName, String displayName) {
+    return findCounter(counterName);
+  }
+
+  @Override
+  public C findCounter(String counterName, boolean create) {
+    try {
+      String[] pair = parseCounterName(counterName);
+      return findCounter(pair[0], FileSystemCounter.valueOf(pair[1]));
+    }
+    catch (Exception e) {
+      if (create) throw new IllegalArgumentException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public C findCounter(String counterName) {
+    return findCounter(counterName, true);
+  }
+
+  @SuppressWarnings("unchecked")
+  public synchronized C findCounter(String scheme, FileSystemCounter key) {
+    final String canonicalScheme = checkScheme(scheme);
+    Object[] counters = map.get(canonicalScheme);
+    int ord = key.ordinal();
+    if (counters == null) {
+      counters = new Object[FileSystemCounter.values().length];
+      map.put(canonicalScheme, counters);
+      counters[ord] = newCounter(canonicalScheme, key);
+    }
+    else if (counters[ord] == null) {
+      counters[ord] = newCounter(canonicalScheme, key);
+    }
+    return (C) counters[ord];
+  }
+
+  private String checkScheme(String scheme) {
+    String fixed = scheme.toUpperCase(Locale.US);
+    String interned = schemes.putIfAbsent(fixed, fixed);
+    if (schemes.size() > MAX_NUM_SCHEMES) {
+      // mistakes or abuses
+      throw new IllegalArgumentException("too many schemes? "+ schemes.size() +
+                                         " when process scheme: "+ scheme);
+    }
+    return interned == null ? fixed : interned;
+  }
+
+  /**
+   * Abstract factory method to create a file system counter
+   * @param scheme of the file system
+   * @param key the enum of the file system counter
+   * @return a new file system counter
+   */
+  protected abstract C newCounter(String scheme, FileSystemCounter key);
+
+  @Override
+  public int size() {
+    int n = 0;
+    for (Object[] counters : map.values()) {
+      n += numSetCounters(counters);
+    }
+    return n;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void incrAllCounters(CounterGroupBase<C> other) {
+    if (checkNotNull(other, "other group")
+        instanceof FileSystemCounterGroup<?>) {
+      for (Counter counter : other) {
+        FSCounter c = (FSCounter) counter;
+        findCounter(c.scheme, c.key) .increment(counter.getValue());
+      }
+    }
+  }
+
+  /**
+   * FileSystemGroup ::= #scheme (scheme #counter (key value)*)*
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, map.size()); // #scheme
+    for (Map.Entry<String, Object[]> entry : map.entrySet()) {
+      WritableUtils.writeString(out, entry.getKey()); // scheme
+      // #counter for the above scheme
+      WritableUtils.writeVInt(out, numSetCounters(entry.getValue()));
+      for (Object counter : entry.getValue()) {
+        if (counter == null) continue;
+        @SuppressWarnings("unchecked")
+        FSCounter c = (FSCounter) counter;
+        WritableUtils.writeVInt(out, c.key.ordinal());  // key
+        WritableUtils.writeVLong(out, c.getValue());    // value
+      }
+    }
+  }
+
+  private int numSetCounters(Object[] counters) {
+    int n = 0;
+    for (Object counter : counters) if (counter != null) ++n;
+    return n;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numSchemes = WritableUtils.readVInt(in);    // #scheme
+    FileSystemCounter[] enums = FileSystemCounter.values();
+    for (int i = 0; i < numSchemes; ++i) {
+      String scheme = WritableUtils.readString(in); // scheme
+      int numCounters = WritableUtils.readVInt(in); // #counter
+      for (int j = 0; j < numCounters; ++j) {
+        findCounter(scheme, enums[WritableUtils.readVInt(in)])  // key
+            .setValue(WritableUtils.readVLong(in)); // value
+      }
+    }
+  }
+
+  @Override
+  public Iterator<C> iterator() {
+    return new AbstractIterator<C>() {
+      Iterator<Object[]> it = map.values().iterator();
+      Object[] counters = it.hasNext() ? it.next() : null;
+      int i = 0;
+      @Override
+      protected C computeNext() {
+        while (counters != null) {
+          while (i < counters.length) {
+            @SuppressWarnings("unchecked")
+            C counter = (C) counters[i++];
+            if (counter != null) return counter;
+          }
+          i = 0;
+          counters = it.hasNext() ? it.next() : null;
+        }
+        return endOfData();
+      }
+    };
+  }
+
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (genericRight instanceof CounterGroupBase<?>) {
+      @SuppressWarnings("unchecked")
+      CounterGroupBase<C> right = (CounterGroupBase<C>) genericRight;
+      return Iterators.elementsEqual(iterator(), right.iterator());
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    // needs to be deep, as counters is an array
+    int hash = FileSystemCounter.class.hashCode();
+    for (Object[] counters : map.values()) {
+      if (counters != null) hash ^= Arrays.hashCode(counters);
+    }
+    return hash;
+  }
+}



Mime
View raw message