hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1079197 [1/2] - in /hadoop/mapreduce/branches/yahoo-merge/src: java/ java/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapreduce/ java/org/apache/hadoop/mapreduce/counters/ java/org/apache/hadoop/mapreduce/jobhistory/ java/org/apache/h...
Date Tue, 08 Mar 2011 05:54:44 GMT
Author: omalley
Date: Tue Mar  8 05:54:43 2011
New Revision: 1079197

URL: http://svn.apache.org/viewvc?rev=1079197&view=rev
Log:
commit 2166522e54340d54343b1a8f127a33627bf22f07
Author: Luke Lu <llu@yahoo-inc.com>
Date:   Wed Nov 24 16:18:19 2010 -0800

    MAPREDUCE-901 Refactor counters framework
    
    to support efficient framework counters and user counters limits.
    
    See the MAPREDUCE-901 JIRA issue for patches/reviews.

Added:
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.properties
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounter.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupFactory.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/LimitExceededException.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/Limits.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/package-info.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/CountersStrings.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/ResourceBundles.java
Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Counters.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Counter.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/CounterGroup.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Counters.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestCounters.java

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml Tue Mar  8 05:54:43 2011
@@ -1150,4 +1150,11 @@
 
 <!--  end of node health script variables -->
 
+<property>
+ <name>mapreduce.job.counters.limit</name>
+  <value>120</value>
+  <description>Limit on the number of user counters allowed per job.
+  </description>
+</property>
+
 </configuration>

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Counters.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Counters.java Tue Mar  8 05:54:43 2011
@@ -18,463 +18,348 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.MissingResourceException;
-import java.util.ResourceBundle;
 
-import org.apache.commons.logging.*;
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.counters.AbstractCounterGroup;
+import org.apache.hadoop.mapreduce.counters.AbstractCounters;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+import org.apache.hadoop.mapreduce.counters.CounterGroupFactory;
+import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
+import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.apache.hadoop.mapreduce.counters.Limits;
+import static org.apache.hadoop.mapreduce.util.CountersStrings.*;
 
 /**
  * A set of named counters.
- * 
- * <p><code>Counters</code> represent global counters, defined either by the 
+ *
+ * <p><code>Counters</code> represent global counters, defined either by the
  * Map-Reduce framework or applications. Each <code>Counter</code> can be of
  * any {@link Enum} type.</p>
- * 
+ *
  * <p><code>Counters</code> are bunched into {@link Group}s, each comprising of
- * counters from a particular <code>Enum</code> class. 
+ * counters from a particular <code>Enum</code> class.
  * @deprecated Use {@link org.apache.hadoop.mapreduce.Counters} instead.
  */
 @Deprecated
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class Counters implements Writable, Iterable<Counters.Group> {
-  private static final Log LOG = LogFactory.getLog(Counters.class);
-  private static final char GROUP_OPEN = '{';
-  private static final char GROUP_CLOSE = '}';
-  private static final char COUNTER_OPEN = '[';
-  private static final char COUNTER_CLOSE = ']';
-  private static final char UNIT_OPEN = '(';
-  private static final char UNIT_CLOSE = ')';
-  private static char[] charsToEscape =  {GROUP_OPEN, GROUP_CLOSE, 
-                                          COUNTER_OPEN, COUNTER_CLOSE, 
-                                          UNIT_OPEN, UNIT_CLOSE};
-  
-  //private static Log log = LogFactory.getLog("Counters.class");
-  
+public class Counters
+    extends AbstractCounters<Counters.Counter, Counters.Group> {
+
+  public Counters() {
+    super(groupFactory);
+  }
+
+  public Counters(org.apache.hadoop.mapreduce.Counters newCounters) {
+    super(newCounters, groupFactory);
+  }
+
   /**
    * Downgrade new {@link org.apache.hadoop.mapreduce.Counters} to old Counters
    * @param newCounters new Counters
    * @return old Counters instance corresponding to newCounters
    */
   static Counters downgrade(org.apache.hadoop.mapreduce.Counters newCounters) {
-    Counters oldCounters = new Counters();
-    for (org.apache.hadoop.mapreduce.CounterGroup newGroup: newCounters) {
-      String groupName = newGroup.getName();
-      Group oldGroup = oldCounters.getGroup(groupName);
-      for (org.apache.hadoop.mapreduce.Counter newCounter: newGroup) {
-        Counter oldCounter = oldGroup.getCounterForName(newCounter.getName());
-        oldCounter.setDisplayName(newCounter.getDisplayName());
-        oldCounter.increment(newCounter.getValue());
-      }
-    }
-    return oldCounters;
+    return new Counters(newCounters);
   }
 
   /**
-   * A counter record, comprising its name and value. 
+   * A counter record, comprising its name and value.
    */
-  public static class Counter extends org.apache.hadoop.mapreduce.Counter {
-    
-    Counter() { 
-    }
+  public interface Counter extends org.apache.hadoop.mapreduce.Counter {
 
-    Counter(String name, String displayName, long value) {
-      super(name, displayName);
-      increment(value);
-    }
-    
-    public void setDisplayName(String newName) {
-      super.setDisplayName(newName);
-    }
-    
     /**
      * Returns the compact stringified version of the counter in the format
      * [(actual-name)(display-name)(value)]
+     * @return the stringified result
      */
-    public synchronized String makeEscapedCompactString() {
+    String makeEscapedCompactString();
 
-      // First up, obtain the strings that need escaping. This will help us
-      // determine the buffer length apriori.
-      String escapedName = escape(getName());
-      String escapedDispName = escape(getDisplayName());
-      long currentValue = this.getValue();
-      int length = escapedName.length() + escapedDispName.length() + 4;
-
-      length += 8; // For the following delimiting characters
-      StringBuilder builder = new StringBuilder(length);
-      builder.append(COUNTER_OPEN);
-      
-      // Add the counter name
-      builder.append(UNIT_OPEN);
-      builder.append(escapedName);
-      builder.append(UNIT_CLOSE);
-      
-      // Add the display name
-      builder.append(UNIT_OPEN);
-      builder.append(escapedDispName);
-      builder.append(UNIT_CLOSE);
-      
-      // Add the value
-      builder.append(UNIT_OPEN);
-      builder.append(currentValue);
-      builder.append(UNIT_CLOSE);
-      
-      builder.append(COUNTER_CLOSE);
-      
-      return builder.toString();
-    }
-    
-    // Checks for (content) equality of two (basic) counters
+    /**
+     * Checks for (content) equality of two (basic) counters
+     * @param counter to compare
+     * @return true if content equals
+     * @deprecated
+     */
     @Deprecated
-    synchronized boolean contentEquals(Counter c) {
-      return this.equals(c);
-    }
-    
+    boolean contentEquals(Counter counter);
+
     /**
-     * What is the current value of this counter?
-     * @return the current value
+     * @return the value of the counter
      */
-    public synchronized long getCounter() {
+    long getCounter();
+  }
+
+  static class OldCounterImpl extends GenericCounter implements Counter {
+
+    OldCounterImpl() {
+    }
+
+    OldCounterImpl(String name, String displayName, long value) {
+      super(name, displayName, value);
+    }
+
+    @Override
+    public synchronized String makeEscapedCompactString() {
+      return toEscapedCompactString(this);
+    }
+
+    @Override @Deprecated
+    public boolean contentEquals(Counter counter) {
+      return equals(counter);
+    }
+
+    @Override
+    public long getCounter() {
       return getValue();
     }
-    
   }
-  
+
   /**
-   *  <code>Group</code> of counters, comprising of counters from a particular 
-   *  counter {@link Enum} class.  
+   *  <code>Group</code> of counters, comprising of counters from a particular
+   *  counter {@link Enum} class.
    *
-   *  <p><code>Group</code>handles localization of the class name and the 
+   *  <p><code>Group</code> handles localization of the class name and the
    *  counter names.</p>
    */
-  public static class Group implements Writable, Iterable<Counter> {
-    private String groupName;
-    private String displayName;
-    private Map<String, Counter> subcounters = new HashMap<String, Counter>();
-    
-    // Optional ResourceBundle for localization of group and counter names.
-    private ResourceBundle bundle = null;    
-    
-    Group(String groupName) {
-      try {
-        bundle = getResourceBundle(groupName);
-      }
-      catch (MissingResourceException neverMind) {
-      }
-      this.groupName = groupName;
-      this.displayName = localize("CounterGroupName", groupName);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Creating group " + groupName + " with " +
-                  (bundle == null ? "nothing" : "bundle"));
-      }
-    }
-    
+  public static interface Group extends CounterGroupBase<Counter> {
+
     /**
-     * Returns the specified resource bundle, or throws an exception.
-     * @throws MissingResourceException if the bundle isn't found
+     * @param counterName the name of the counter
+     * @return the value of the specified counter, or 0 if the counter does
+     * not exist.
      */
-    private static ResourceBundle getResourceBundle(String enumClassName) {
-      String bundleName = enumClassName.replace('$','_');
-      return ResourceBundle.getBundle(bundleName);
-    }
-    
+    long getCounter(String counterName);
+
     /**
-     * Returns raw name of the group.  This is the name of the enum class
-     * for this group of counters.
+     * @return the compact stringified version of the group in the format
+     * {(actual-name)(display-name)(value)[][][]} where [] are compact strings
+     * for the counters within.
      */
-    public String getName() {
-      return groupName;
-    }
-    
+    String makeEscapedCompactString();
+
     /**
-     * Returns localized name of the group.  This is the same as getName() by
-     * default, but different if an appropriate ResourceBundle is found.
+     * Get the counter for the given id and create it if it doesn't exist.
+     * @param id the numeric id of the counter within the group
+     * @param name the internal counter name
+     * @return the counter
+     * @deprecated use {@link #findCounter(String)} instead
      */
-    public String getDisplayName() {
-      return displayName;
-    }
-    
+    @Deprecated
+    Counter getCounter(int id, String name);
+
     /**
-     * Set the display name
+     * Get the counter for the given name and create it if it doesn't exist.
+     * @param name the internal counter name
+     * @return the counter
      */
-    public void setDisplayName(String displayName) {
-      this.displayName = displayName;
+    Counter getCounterForName(String name);
+  }
+
+  // All the group impls need this for legacy group interface
+  static long getCounterValue(Group group, String counterName) {
+    Counter counter = group.findCounter(counterName, false);
+    if (counter != null) return counter.getValue();
+    return 0L;
+  }
+
+  // Mix the generic group implementation into the Group interface
+  private static class GenericGroup extends AbstractCounterGroup<Counter>
+                                    implements Group {
+
+    GenericGroup(String name, String displayName, Limits limits) {
+      super(name, displayName, limits);
     }
-    
-    /**
-     * Returns the compact stringified version of the group in the format
-     * {(actual-name)(display-name)(value)[][][]} where [] are compact strings for the
-     * counters within.
-     */
-    public String makeEscapedCompactString() {
-      String[] subcountersArray = new String[subcounters.size()];
 
-      // First up, obtain the strings that need escaping. This will help us
-      // determine the buffer length apriori.
-      String escapedName = escape(getName());
-      String escapedDispName = escape(getDisplayName());
-      int i = 0;
-      int length = escapedName.length() + escapedDispName.length();
-      for (Counter counter : subcounters.values()) {
-        String escapedStr = counter.makeEscapedCompactString();
-        subcountersArray[i++] = escapedStr;
-        length += escapedStr.length();
-      }
+    @Override
+    public long getCounter(String counterName) {
+      return getCounterValue(this, counterName);
+    }
 
-      length += 6; // for all the delimiting characters below
-      StringBuilder builder = new StringBuilder(length);
-      builder.append(GROUP_OPEN); // group start
-      
-      // Add the group name
-      builder.append(UNIT_OPEN);
-      builder.append(escapedName);
-      builder.append(UNIT_CLOSE);
-      
-      // Add the display name
-      builder.append(UNIT_OPEN);
-      builder.append(escapedDispName);
-      builder.append(UNIT_CLOSE);
-      
-      // write the value
-      for(Counter counter: subcounters.values()) {
-        builder.append(counter.makeEscapedCompactString());
-      }
-      
-      builder.append(GROUP_CLOSE); // group end
-      return builder.toString();
+    @Override
+    public String makeEscapedCompactString() {
+      return toEscapedCompactString(this);
     }
 
     @Override
-    public int hashCode() {
-      return subcounters.hashCode();
+    public Counter getCounter(int id, String name) {
+      return findCounter(name);
     }
 
-    /** 
-     * Checks for (content) equality of Groups
-     */
     @Override
-    public synchronized boolean equals(Object obj) {
-      boolean isEqual = false;
-      if (obj != null && obj instanceof Group) {
-        Group g = (Group) obj;
-        if (size() == g.size()) {
-          isEqual = true;
-          for (Map.Entry<String, Counter> entry : subcounters.entrySet()) {
-            String key = entry.getKey();
-            Counter c1 = entry.getValue();
-            Counter c2 = g.getCounterForName(key);
-            if (!c1.contentEquals(c2)) {
-              isEqual = false;
-              break;
-            }
-          }
-        }
-      }
-      return isEqual;
+    public Counter getCounterForName(String name) {
+      return findCounter(name);
     }
-    
-    /**
-     * Returns the value of the specified counter, or 0 if the counter does
-     * not exist.
-     */
-    public synchronized long getCounter(String counterName) {
-      for(Counter counter: subcounters.values()) {
-        if (counter != null && counter.getDisplayName().equals(counterName)) {
-          return counter.getValue();
-        }
-      }
-      return 0L;
+
+    @Override
+    protected Counter newCounter(String counterName, String displayName,
+                                 long value) {
+      return new OldCounterImpl(counterName, displayName, value);
     }
-    
-    /**
-     * Get the counter for the given id and create it if it doesn't exist.
-     * @param id the numeric id of the counter within the group
-     * @param name the internal counter name
-     * @return the counter
-     * @deprecated use {@link #getCounter(String)} instead
-     */
-    @Deprecated
-    public synchronized Counter getCounter(int id, String name) {
-      return getCounterForName(name);
+
+    @Override
+    protected Counter newCounter() {
+      return new OldCounterImpl();
     }
-    
-    /**
-     * Get the counter for the given name and create it if it doesn't exist.
-     * @param name the internal counter name
-     * @return the counter
-     */
-    public synchronized Counter getCounterForName(String name) {
-      Counter result = subcounters.get(name);
-      if (result == null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Adding " + name);
-        }
-        result = new Counter(name, localize(name + ".name", name), 0L);
-        subcounters.put(name, result);
+  }
+
+  // Mix the framework group implementation into the Group interface
+  private static class FrameworkGroupImpl<T extends Enum<T>>
+      extends FrameworkCounterGroup<T, Counter> implements Group {
+
+    // Mix the framework counter implementation into the Counter interface
+    class FrameworkCounterImpl extends FrameworkCounter implements Counter {
+
+      FrameworkCounterImpl(T key) {
+        super(key);
+      }
+
+      @Override
+      public String makeEscapedCompactString() {
+        return toEscapedCompactString(this);
+      }
+
+      @Override
+      public boolean contentEquals(Counter counter) {
+        return equals(counter);
+      }
+
+      @Override
+      public long getCounter() {
+        return getValue();
       }
-      return result;
     }
-    
-    /**
-     * Returns the number of counters in this group.
-     */
-    public synchronized int size() {
-      return subcounters.size();
+
+    FrameworkGroupImpl(Class<T> cls) {
+      super(cls);
     }
-    
-    /**
-     * Looks up key in the ResourceBundle and returns the corresponding value.
-     * If the bundle or the key doesn't exist, returns the default value.
-     */
-    private String localize(String key, String defaultValue) {
-      String result = defaultValue;
-      if (bundle != null) {
-        try {
-          result = bundle.getString(key);
-        }
-        catch (MissingResourceException mre) {
-        }
-      }
-      return result;
+
+    @Override
+    public long getCounter(String counterName) {
+      return getCounterValue(this, counterName);
     }
-    
-    public synchronized void write(DataOutput out) throws IOException {
-      Text.writeString(out, displayName);
-      WritableUtils.writeVInt(out, subcounters.size());
-      for(Counter counter: subcounters.values()) {
-        counter.write(out);
-      }
+
+    @Override
+    public String makeEscapedCompactString() {
+      return toEscapedCompactString(this);
     }
-    
-    public synchronized void readFields(DataInput in) throws IOException {
-      displayName = Text.readString(in);
-      subcounters.clear();
-      int size = WritableUtils.readVInt(in);
-      for(int i=0; i < size; i++) {
-        Counter counter = new Counter();
-        counter.readFields(in);
-        subcounters.put(counter.getName(), counter);
-      }
+
+    @Override @Deprecated
+    public Counter getCounter(int id, String name) {
+      return findCounter(name);
+    }
+
+    @Override
+    public Counter getCounterForName(String name) {
+      return findCounter(name);
     }
 
-    public synchronized Iterator<Counter> iterator() {
-      return new ArrayList<Counter>(subcounters.values()).iterator();
+    @Override
+    protected Counter newCounter(T key) {
+      return new FrameworkCounterImpl(key);
     }
   }
-  
-  // Map from group name (enum class name) to map of int (enum ordinal) to
-  // counter record (name-value pair).
-  private Map<String,Group> counters = new HashMap<String, Group>();
 
-  /**
-   * A cache from enum values to the associated counter. Dramatically speeds up
-   * typical usage.
-   */
-  private Map<Enum, Counter> cache = new IdentityHashMap<Enum, Counter>();
-  
-  /**
-   * Returns the names of all counter classes.
-   * @return Set of counter names.
-   */
-  public synchronized Collection<String> getGroupNames() {
-    return counters.keySet();
-  }
+  // Mix the file system counter group implementation into the Group interface
+  private static class FSGroupImpl extends FileSystemCounterGroup<Counter>
+                                   implements Group {
 
-  public synchronized Iterator<Group> iterator() {
-    return counters.values().iterator();
-  }
+    private class FSCounterImpl extends FSCounter implements Counter {
 
-  /**
-   * Returns the named counter group, or an empty group if there is none
-   * with the specified name.
-   */
-  public synchronized Group getGroup(String groupName) {
-    Group result = counters.get(groupName);
+      FSCounterImpl(String scheme, FileSystemCounter key) {
+        super(scheme, key);
+      }
+
+      @Override
+      public String makeEscapedCompactString() {
+        return toEscapedCompactString(this);
+      }
 
-    if (result == null) {
-      // To provide support for deprecated group names  
-      if (groupName.equals("org.apache.hadoop.mapred.Task$Counter")) {
-        LOG.warn("Group org.apache.hadoop.mapred.Task$Counter is deprecated." +
-                 " Use org.apache.hadoop.mapreduce.TaskCounter instead");
-        return getGroup("org.apache.hadoop.mapreduce.TaskCounter");
-      } 
+      @Override @Deprecated
+      public boolean contentEquals(Counter counter) {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
 
-      if (groupName.equals
-          ("org.apache.hadoop.mapred.JobInProgress$Counter")) {
-        LOG.warn("Group org.apache.hadoop.mapred.JobInProgress$Counter " +
-                 "is deprecated. Use " +
-                 "org.apache.hadoop.mapreduce.JobCounter instead");
-        return getGroup("org.apache.hadoop.mapreduce.JobCounter");
+      @Override
+      public long getCounter() {
+        return getValue();
       }
 
-      result = new Group(groupName);
-      counters.put(groupName, result);
     }
 
-    return result;
-  }
+    @Override
+    protected Counter newCounter(String scheme, FileSystemCounter key) {
+      return new FSCounterImpl(scheme, key);
+    }
 
-  /**
-   * Find the counter for the given enum. The same enum will always return the
-   * same counter.
-   * @param key the counter key
-   * @return the matching counter object
-   */
-  public synchronized Counter findCounter(Enum key) {
-    Counter counter = cache.get(key);
-    if (counter == null) {
-      Group group = getGroup(key.getDeclaringClass().getName());
-      counter = group.getCounterForName(key.toString());
-      cache.put(key, counter);
+    @Override
+    public long getCounter(String counterName) {
+      return getCounterValue(this, counterName);
+    }
+
+    @Override
+    public String makeEscapedCompactString() {
+      return toEscapedCompactString(this);
+    }
+
+    @Override @Deprecated
+    public Counter getCounter(int id, String name) {
+      return findCounter(name);
+    }
+
+    @Override
+    public Counter getCounterForName(String name) {
+      return findCounter(name);
     }
-    return counter;    
+
   }
 
   /**
-   * Find a counter given the group and the name.
-   * @param group the name of the group
-   * @param name the internal name of the counter
-   * @return the counter for that name
+   * Provide factory methods for counter group factory implementation.
+   * See also the GroupFactory in
+   *  {@link org.apache.hadoop.mapreduce.Counters mapreduce.Counters}
    */
-  public synchronized Counter findCounter(String group, String name) {
-    if (name.equals("MAP_INPUT_BYTES")) {
-      group = FileInputFormat.COUNTER_GROUP; 
-      name = FileInputFormat.BYTES_READ; 
-      LOG.warn("Counter name MAP_INPUT_BYTES is deprecated. " +
-               "Use FileInputFormatCounters as group name and " +
-               " BYTES_READ as counter name instead");
+  static class GroupFactory extends CounterGroupFactory<Counter, Group> {
+
+    @Override
+    protected <T extends Enum<T>>
+    FrameworkGroupFactory<Group> newFrameworkGroupFactory(final Class<T> cls) {
+      return new FrameworkGroupFactory<Group>() {
+        @Override public Group newGroup(String name) {
+          return new FrameworkGroupImpl<T>(cls); // impl in this package
+        }
+      };
+    }
+
+    @Override
+    protected Group newGenericGroup(String name, String displayName,
+                                    Limits limits) {
+      return new GenericGroup(name, displayName, limits);
+    }
+
+    @Override
+    protected Group newFileSystemGroup() {
+      return new FSGroupImpl();
     }
-    return getGroup(group).getCounterForName(name);
   }
 
+  private static final GroupFactory groupFactory = new GroupFactory();
+
   /**
    * Find a counter by using strings
    * @param group the name of the group
    * @param id the id of the counter within the group (0 to N-1)
    * @param name the internal name of the counter
    * @return the counter for that name
-   * @deprecated
+   * @deprecated use {@link #findCounter(String, String)} instead
    */
   @Deprecated
-  public synchronized Counter findCounter(String group, int id, String name) {
+  public Counter findCounter(String group, int id, String name) {
     return findCounter(group, name);
   }
 
@@ -484,10 +369,10 @@ public class Counters implements Writabl
    * @param key identifies a counter
    * @param amount amount by which counter is to be incremented
    */
-  public synchronized void incrCounter(Enum key, long amount) {
+  public void incrCounter(Enum<?> key, long amount) {
     findCounter(key).increment(amount);
   }
-  
+
   /**
    * Increments the specified counter by the specified amount, creating it if
    * it didn't already exist.
@@ -495,27 +380,29 @@ public class Counters implements Writabl
    * @param counter the internal name of the counter
    * @param amount amount by which counter is to be incremented
    */
-  public synchronized void incrCounter(String group, String counter, long amount) {
+  public void incrCounter(String group, String counter, long amount) {
     findCounter(group, counter).increment(amount);
   }
-  
+
   /**
    * Returns current value of the specified counter, or 0 if the counter
    * does not exist.
+   * @param key the counter enum to lookup
+   * @return the counter value or 0 if counter not found
    */
-  public synchronized long getCounter(Enum key) {
+  public synchronized long getCounter(Enum<?> key) {
     return findCounter(key).getValue();
   }
-  
+
   /**
-   * Increments multiple counters by their amounts in another Counters 
+   * Increments multiple counters by their amounts in another Counters
    * instance.
    * @param other the other Counters instance
    */
   public synchronized void incrAllCounters(Counters other) {
     for (Group otherGroup: other) {
       Group group = getGroup(otherGroup.getName());
-      group.displayName = otherGroup.displayName;
+      group.setDisplayName(otherGroup.getDisplayName());
       for (Counter otherCounter : otherGroup) {
         Counter counter = group.getCounterForName(otherCounter.getName());
         counter.setDisplayName(otherCounter.getDisplayName());
@@ -525,7 +412,18 @@ public class Counters implements Writabl
   }
 
   /**
+   * @return the total number of counters
+   * @deprecated use {@link #countCounters()} instead
+   */
+  public int size() {
+    return countCounters();
+  }
+
+  /**
    * Convenience method for computing the sum of two sets of counters.
+   * @param a the first counters
+   * @param b the second counters
+   * @return a new summed counters object
    */
   public static Counters sum(Counters a, Counters b) {
     Counters counters = new Counters();
@@ -533,55 +431,7 @@ public class Counters implements Writabl
     counters.incrAllCounters(b);
     return counters;
   }
-  
-  /**
-   * Returns the total number of counters, by summing the number of counters
-   * in each group.
-   */
-  public synchronized  int size() {
-    int result = 0;
-    for (Group group : this) {
-      result += group.size();
-    }
-    return result;
-  }
-  
-  /**
-   * Write the set of groups.
-   * The external format is:
-   *     #groups (groupName group)*
-   *
-   * i.e. the number of groups followed by 0 or more groups, where each 
-   * group is of the form:
-   *
-   *     groupDisplayName #counters (false | true counter)*
-   *
-   * where each counter is of the form:
-   *
-   *     name (false | true displayName) value
-   */
-  public synchronized void write(DataOutput out) throws IOException {
-    out.writeInt(counters.size());
-    for (Group group: counters.values()) {
-      Text.writeString(out, group.getName());
-      group.write(out);
-    }
-  }
-  
-  /**
-   * Read a set of groups.
-   */
-  public synchronized void readFields(DataInput in) throws IOException {
-    int numClasses = in.readInt();
-    counters.clear();
-    while (numClasses-- > 0) {
-      String groupName = Text.readString(in);
-      Group group = new Group(groupName);
-      group.readFields(in);
-      counters.put(groupName, group);
-    }
-  }
-  
+
   /**
    * Logs the current counter values.
    * @param log The log to use.
@@ -591,206 +441,31 @@ public class Counters implements Writabl
     for(Group group: this) {
       log.info("  " + group.getDisplayName());
       for (Counter counter: group) {
-        log.info("    " + counter.getDisplayName() + "=" + 
+        log.info("    " + counter.getDisplayName() + "=" +
                  counter.getCounter());
-      }   
-    }
-  }
-  
-  /**
-   * Return textual representation of the counter values.
-   */
-  public synchronized String toString() {
-    StringBuilder sb = new StringBuilder("Counters: " + size());
-    for (Group group: this) {
-      sb.append("\n\t" + group.getDisplayName());
-      for (Counter counter: group) {
-        sb.append("\n\t\t" + counter.getDisplayName() + "=" + 
-                  counter.getCounter());
       }
     }
-    return sb.toString();
   }
 
   /**
-   * Convert a counters object into a single line that is easy to parse.
-   * @return the string with "name=value" for each counter and separated by ","
-   */
-  public synchronized String makeCompactString() {
-    StringBuffer buffer = new StringBuffer();
-    boolean first = true;
-    for(Group group: this){   
-      for(Counter counter: group) {
-        if (first) {
-          first = false;
-        } else {
-          buffer.append(',');
-        }
-        buffer.append(group.getDisplayName());
-        buffer.append('.');
-        buffer.append(counter.getDisplayName());
-        buffer.append(':');
-        buffer.append(counter.getCounter());
-      }
-    }
-    return buffer.toString();
-  }
-  
-  /**
-   * Represent the counter in a textual format that can be converted back to 
+   * Represent the counter in a textual format that can be converted back to
    * its object form
    * @return the string in the following format
-   * {(groupname)(group-displayname)[(countername)(displayname)(value)][][]}{}{}
+   * {(groupName)(group-displayName)[(counterName)(displayName)(value)][]*}*
    */
-  public synchronized String makeEscapedCompactString() {
-    String[] groupsArray = new String[counters.size()];
-    int i = 0;
-    int length = 0;
-
-    // First up, obtain the escaped string for each group so that we can
-    // determine the buffer length apriori.
-    for (Group group : this) {
-      String escapedString = group.makeEscapedCompactString();
-      groupsArray[i++] = escapedString;
-      length += escapedString.length();
-    }
-
-    // Now construct the buffer
-    StringBuilder builder = new StringBuilder(length);
-    for (String group : groupsArray) {
-      builder.append(group);
-    }
-    return builder.toString();
-  }
-
-  // Extracts a block (data enclosed within delimeters) ignoring escape 
-  // sequences. Throws ParseException if an incomplete block is found else 
-  // returns null.
-  private static String getBlock(String str, char open, char close, 
-                                IntWritable index) throws ParseException {
-    StringBuilder split = new StringBuilder();
-    int next = StringUtils.findNext(str, open, StringUtils.ESCAPE_CHAR, 
-                                    index.get(), split);
-    split.setLength(0); // clear the buffer
-    if (next >= 0) {
-      ++next; // move over '('
-      
-      next = StringUtils.findNext(str, close, StringUtils.ESCAPE_CHAR, 
-                                   next, split);
-      if (next >= 0) {
-        ++next; // move over ')'
-        index.set(next);
-        return split.toString(); // found a block
-      } else {
-        throw new ParseException("Unexpected end of block", next);
-      }
-    }
-    return null; // found nothing
+  public String makeEscapedCompactString() {
+    return toEscapedCompactString(this);
   }
-  
+
   /**
-   * Convert a stringified counter representation into a counter object. Note 
-   * that the counter can be recovered if its stringified using 
-   * {@link #makeEscapedCompactString()}. 
-   * @return a Counter
+   * Convert a stringified (by {@link #makeEscapedCompactString()}) counter
+   * representation into a counter object.
+   * @param compactString to parse
+   * @return a new counters object
+   * @throws ParseException
    */
-  public static Counters fromEscapedCompactString(String compactString) 
-  throws ParseException {
-    Counters counters = new Counters();
-    IntWritable index = new IntWritable(0);
-    
-    // Get the group to work on
-    String groupString = 
-      getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
-    
-    while (groupString != null) {
-      IntWritable groupIndex = new IntWritable(0);
-      
-      // Get the actual name
-      String groupName = 
-        getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex);
-      groupName = unescape(groupName);
-      
-      // Get the display name
-      String groupDisplayName = 
-        getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex);
-      groupDisplayName = unescape(groupDisplayName);
-      
-      // Get the counters
-      Group group = counters.getGroup(groupName);
-      group.setDisplayName(groupDisplayName);
-      
-      String counterString = 
-        getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
-      
-      while (counterString != null) {
-        IntWritable counterIndex = new IntWritable(0);
-        
-        // Get the actual name
-        String counterName = 
-          getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex);
-        counterName = unescape(counterName);
-        
-        // Get the display name
-        String counterDisplayName = 
-          getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex);
-        counterDisplayName = unescape(counterDisplayName);
-        
-        // Get the value
-        long value = 
-          Long.parseLong(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, 
-                                  counterIndex));
-        
-        // Add the counter
-        Counter counter = group.getCounterForName(counterName);
-        counter.setDisplayName(counterDisplayName);
-        counter.increment(value);
-        
-        // Get the next counter
-        counterString = 
-          getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
-      }
-      
-      groupString = getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
-    }
-    return counters;
-  }
-
-  // Escapes all the delimiters for counters i.e {,[,(,),],}
-  private static String escape(String string) {
-    return StringUtils.escapeString(string, StringUtils.ESCAPE_CHAR, 
-                                    charsToEscape);
-  }
-  
-  // Unescapes all the delimiters for counters i.e {,[,(,),],}
-  private static String unescape(String string) {
-    return StringUtils.unEscapeString(string, StringUtils.ESCAPE_CHAR, 
-                                      charsToEscape);
-  }
-
-  @Override 
-  public synchronized int hashCode() {
-    return counters.hashCode();
-  }
-
-  @Override
-  public synchronized boolean equals(Object obj) {
-    boolean isEqual = false;
-    if (obj != null && obj instanceof Counters) {
-      Counters other = (Counters) obj;
-      if (size() == other.size()) {
-        isEqual = true;
-        for (Map.Entry<String, Group> entry : this.counters.entrySet()) {
-          String key = entry.getKey();
-          Group sourceGroup = entry.getValue();
-          Group targetGroup = other.getGroup(key);
-          if (!sourceGroup.equals(targetGroup)) {
-            isEqual = false;
-            break;
-          }
-        }
-      }
-    }
-    return isEqual;
+  public static Counters fromEscapedCompactString(String compactString)
+      throws ParseException {
+    return parseEscapedCompactString(compactString, new Counters());
   }
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Tue Mar  8 05:54:43 2011
@@ -77,8 +77,10 @@ interface InterTrackerProtocol extends V
    * Version 29: Adding user name to the serialized Task for use by TT.
    * Version 30: Adding available memory and CPU usage information on TT to
    *             TaskTrackerStatus for MAPREDUCE-1218
+   * Version 31: Efficient serialization format for Framework counters
+   *             (MAPREDUCE-901)
    */             
-  public static final long versionID = 30L;
+  public static final long versionID = 31L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Mar  8 05:54:43 2011
@@ -52,6 +52,7 @@ import org.apache.hadoop.mapreduce.JobCo
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.counters.LimitExceededException;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
@@ -1364,8 +1365,12 @@ public class JobInProgress {
    */
   private Counters incrementTaskCounters(Counters counters,
                                          TaskInProgress[] tips) {
-    for (TaskInProgress tip : tips) {
-      counters.incrAllCounters(tip.getCounters());
+    try {
+      for (TaskInProgress tip : tips) {
+        counters.incrAllCounters(tip.getCounters());
+      }
+    } catch (LimitExceededException e) {
+      // too many user counters/groups, leaving existing counters intact.
     }
     return counters;
   }
@@ -2822,6 +2827,9 @@ public class JobInProgress {
       retireMap(tip);
       if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
         this.status.setMapProgress(1.0f);
+        if (canLaunchJobCleanupTask()) {
+          checkCountersLimitsOrFail();
+        }
       }
     } else {
       runningReduceTasks -= 1;
@@ -2834,6 +2842,9 @@ public class JobInProgress {
       retireReduce(tip);
       if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
         this.status.setReduceProgress(1.0f);
+        if (canLaunchJobCleanupTask()) {
+          checkCountersLimitsOrFail();
+        }
       }
     }
     decrementSpeculativeCount(wasSpeculating, tip);
@@ -2843,6 +2854,19 @@ public class JobInProgress {
     }
     return true;
   }
+
+  /*
+   * Add up the counters and fail the job if they exceed the limits.
+   * Make sure we do not recalculate the counters after we fail the job.
+   * Currently this is taken care of by terminateJob() since it does not
+   * calculate the counters.
+   */
+  private void checkCountersLimitsOrFail() {
+    Counters counters = getCounters();
+    if (counters.limits().violation() != null) {
+      jobtracker.failJob(this);
+    }
+  }
   
   private void updateTaskTrackerStats(TaskInProgress tip, TaskTrackerStatus ttStatus, 
       Map<String,DataStatistics> trackerStats, DataStatistics overallStats) {

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar  8 05:54:43 2011
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-
 import javax.crypto.SecretKey;
 
 import org.apache.commons.logging.Log;
@@ -53,6 +52,7 @@ import org.apache.hadoop.io.serializer.D
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.JobStatus;
@@ -63,7 +63,6 @@ import org.apache.hadoop.mapreduce.util.
 import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin.*;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -79,16 +78,6 @@ abstract public class Task implements Wr
     LogFactory.getLog(Task.class);
 
   public static String MERGED_OUTPUT_PREFIX = ".merged";
-
-  /**
-   * Counters to measure the usage of the different file systems.
-   * Always return the String array with two elements. First one is the name of  
-   * BYTES_READ counter and second one is of the BYTES_WRITTEN counter.
-   */
-  protected static String[] getFileSystemCounterNames(String uriScheme) {
-    String scheme = uriScheme.toUpperCase();
-    return new String[]{scheme+"_BYTES_READ", scheme+"_BYTES_WRITTEN"};
-  }
   
   /**
    * Name of the FileSystem counters' group
@@ -805,37 +794,41 @@ abstract public class Task implements Wr
    * system and only creates the counters when they are needed.
    */
   class FileSystemStatisticUpdater {
-    private long prevReadBytes = 0;
-    private long prevWriteBytes = 0;
     private FileSystem.Statistics stats;
-    private Counters.Counter readCounter = null;
-    private Counters.Counter writeCounter = null;
-    private String[] counterNames;
+    private Counters.Counter readBytesCounter, writeBytesCounter,
+        readOpsCounter, largeReadOpsCounter, writeOpsCounter;
     
-    FileSystemStatisticUpdater(String uriScheme, FileSystem.Statistics stats) {
+    FileSystemStatisticUpdater(FileSystem.Statistics stats) {
       this.stats = stats;
-      this.counterNames = getFileSystemCounterNames(uriScheme);
     }
 
     void updateCounters() {
-      long newReadBytes = stats.getBytesRead();
-      long newWriteBytes = stats.getBytesWritten();
-      if (prevReadBytes != newReadBytes) {
-        if (readCounter == null) {
-          readCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP, 
-              counterNames[0]);
-        }
-        readCounter.increment(newReadBytes - prevReadBytes);
-        prevReadBytes = newReadBytes;
-      }
-      if (prevWriteBytes != newWriteBytes) {
-        if (writeCounter == null) {
-          writeCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP, 
-              counterNames[1]);
-        }
-        writeCounter.increment(newWriteBytes - prevWriteBytes);
-        prevWriteBytes = newWriteBytes;
+      String scheme = stats.getScheme();
+      if (readBytesCounter == null) {
+        readBytesCounter = counters.findCounter(scheme,
+            FileSystemCounter.BYTES_READ);
+      }
+      readBytesCounter.setValue(stats.getBytesRead());
+      if (writeBytesCounter == null) {
+        writeBytesCounter = counters.findCounter(scheme,
+            FileSystemCounter.BYTES_WRITTEN);
+      }
+      writeBytesCounter.setValue(stats.getBytesWritten());
+      if (readOpsCounter == null) {
+        readOpsCounter = counters.findCounter(scheme,
+            FileSystemCounter.READ_OPS);
+      }
+      readOpsCounter.setValue(stats.getReadOps());
+      if (largeReadOpsCounter == null) {
+        largeReadOpsCounter = counters.findCounter(scheme,
+            FileSystemCounter.LARGE_READ_OPS);
+      }
+      largeReadOpsCounter.setValue(stats.getLargeReadOps());
+      if (writeOpsCounter == null) {
+        writeOpsCounter = counters.findCounter(scheme,
+            FileSystemCounter.WRITE_OPS);
       }
+      writeOpsCounter.setValue(stats.getWriteOps());
     }
   }
   
@@ -850,7 +843,7 @@ abstract public class Task implements Wr
       String uriScheme = stat.getScheme();
       FileSystemStatisticUpdater updater = statisticUpdaters.get(uriScheme);
       if(updater==null) {//new FileSystem has been found in the cache
-        updater = new FileSystemStatisticUpdater(uriScheme, stat);
+        updater = new FileSystemStatisticUpdater(stat);
         statisticUpdaters.put(uriScheme, updater);
       }
       updater.updateCounters();      

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Mar  8 05:54:43 2011
@@ -106,9 +106,9 @@ class TaskInProgress {
   private FailedRanges failedRanges = new FailedRanges();
   private volatile boolean skipping = false;
 
-  private static Enum CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
-  private static Enum VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
-  private static Enum PHYSICAL_BYTES_KEY = TaskCounter.PHYSICAL_MEMORY_BYTES;
+  static final Enum<?> CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
+  static final Enum<?> VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
+  static final Enum<?> PHYSICAL_BYTES_KEY = TaskCounter.PHYSICAL_MEMORY_BYTES;
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -226,13 +226,10 @@ class TaskInProgress {
   }
 
   private void updateProgressSplits(TaskStatus taskStatus) {
-    if (!taskStatus.getIncludeCounters()) {
-      return;
-    }
-
     double newProgress = taskStatus.getProgress();
 
     Counters counters = taskStatus.getCounters();
+    if (counters == null) return;
 
     TaskAttemptID statusAttemptID = taskStatus.getTaskID();
     ProgressSplitsBlock splitsBlock = getSplits(statusAttemptID);
@@ -1079,7 +1076,7 @@ class TaskInProgress {
           if (status.getProgress() >= bestProgress) {
             bestProgress = status.getProgress();
             bestState = status.getStateString();
-            if (status.getIncludeCounters()) {
+            if (status.getIncludeAllCounters()) {
               bestCounters = status.getCounters();
             } else {
               bestCounters = this.counters;

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Mar  8 05:54:43 2011
@@ -71,7 +71,7 @@ public abstract class TaskStatus impleme
     
   private volatile Phase phase = Phase.STARTING; 
   private Counters counters;
-  private boolean includeCounters;
+  private boolean includeAllCounters;
   private SortedRanges.Range nextRecordRange = new SortedRanges.Range();
   
   // max task-status string size
@@ -105,7 +105,7 @@ public abstract class TaskStatus impleme
     this.taskTracker = taskTracker;
     this.phase = phase;
     this.counters = counters;
-    this.includeCounters = true;
+    this.includeAllCounters = true;
   }
   
   public TaskAttemptID getTaskID() { return taskid; }
@@ -317,12 +317,13 @@ public abstract class TaskStatus impleme
       this.runState == TaskStatus.State.KILLED_UNCLEAN));
   }
   
-  public boolean getIncludeCounters() {
-    return includeCounters; 
+  public boolean getIncludeAllCounters() {
+    return includeAllCounters;
   }
   
-  public void setIncludeCounters(boolean send) {
-    includeCounters = send;
+  public void setIncludeAllCounters(boolean send) {
+    includeAllCounters = send;
+    counters.setWriteAllCounters(send);
   }
   
   /**
@@ -471,11 +472,9 @@ public abstract class TaskStatus impleme
     WritableUtils.writeEnum(out, phase);
     out.writeLong(startTime);
     out.writeLong(finishTime);
-    out.writeBoolean(includeCounters);
+    out.writeBoolean(includeAllCounters);
     out.writeLong(outputSize);
-    if (includeCounters) {
-      counters.write(out);
-    }
+    counters.write(out);
     nextRecordRange.write(out);
   }
 
@@ -490,11 +489,9 @@ public abstract class TaskStatus impleme
     this.startTime = in.readLong(); 
     this.finishTime = in.readLong(); 
     counters = new Counters();
-    this.includeCounters = in.readBoolean();
+    this.includeAllCounters = in.readBoolean();
     this.outputSize = in.readLong();
-    if (includeCounters) {
-      counters.readFields(in);
-    }
+    counters.readFields(in);
     nextRecordRange.readFields(in);
   }
   

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar  8 05:54:43 2011
@@ -1578,13 +1578,13 @@ public class TaskTracker 
    */
   HeartbeatResponse transmitHeartBeat(long now) throws IOException {
     // Send Counters in the status once every COUNTER_UPDATE_INTERVAL
-    boolean sendCounters;
+    boolean sendAllCounters;
     if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
-      sendCounters = true;
+      sendAllCounters = true;
       previousUpdate = now;
     }
     else {
-      sendCounters = false;
+      sendAllCounters = false;
     }
 
     // 
@@ -1597,7 +1597,7 @@ public class TaskTracker 
         status = new TaskTrackerStatus(taskTrackerName, localHostname, 
                                        httpPort, 
                                        cloneAndResetRunningTaskStatuses(
-                                         sendCounters), 
+                                         sendAllCounters),
                                        failures, 
                                        maxMapSlots,
                                        maxReduceSlots); 
@@ -3440,10 +3440,10 @@ public class TaskTracker 
     List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
     for(TaskInProgress tip: runningTasks.values()) {
       TaskStatus status = tip.getStatus();
-      status.setIncludeCounters(sendCounters);
+      status.setIncludeAllCounters(sendCounters);
       // send counters for finished or failed tasks and commit pending tasks
       if (status.getRunState() != TaskStatus.State.RUNNING) {
-        status.setIncludeCounters(true);
+        status.setIncludeAllCounters(true);
       }
       result.add((TaskStatus)status.clone());
       status.clearStatus();

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Counter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Counter.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Counter.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Counter.java Tue Mar  8 05:54:43 2011
@@ -18,137 +18,58 @@
 
 package org.apache.hadoop.mapreduce;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 
 /**
  * A named counter that tracks the progress of a map/reduce job.
- * 
- * <p><code>Counters</code> represent global counters, defined either by the 
+ *
+ * <p><code>Counters</code> represent global counters, defined either by the
  * Map-Reduce framework or applications. Each <code>Counter</code> is named by
  * an {@link Enum} and has a long for the value.</p>
- * 
+ *
  * <p><code>Counters</code> are bunched into Groups, each comprising of
- * counters from a particular <code>Enum</code> class. 
+ * counters from a particular <code>Enum</code> class.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class Counter implements Writable {
+public interface Counter extends Writable {
 
-  private String name;
-  private String displayName;
-  private long value = 0;
-    
-  protected Counter() { 
-  }
-
-  protected Counter(String name, String displayName) {
-    this.name = name;
-    this.displayName = displayName;
-  }
-  
-  /** Create a counter.
-   * @param name the name within the group's enum.
-   * @param displayName a name to be displayed.
-   * @param value the counter value.
-   */
-  public Counter(String name, String displayName, long value) {
-    this.name = name;
-    this.displayName = displayName;
-    this.value = value;
-  }
-  
+  /**
+   * Set the display name of the counter
+   * @param displayName of the counter
+   * @deprecated (and no-op by default)
+   */
   @Deprecated
-  protected synchronized void setDisplayName(String displayName) {
-    this.displayName = displayName;
-  }
-    
-  /**
-   * Read the binary representation of the counter
-   */
-  @Override
-  public synchronized void readFields(DataInput in) throws IOException {
-    name = Text.readString(in);
-    if (in.readBoolean()) {
-      displayName = Text.readString(in);
-    } else {
-      displayName = name;
-    }
-    value = WritableUtils.readVLong(in);
-  }
-    
-  /**
-   * Write the binary representation of the counter
-   */
-  @Override
-  public synchronized void write(DataOutput out) throws IOException {
-    Text.writeString(out, name);
-    boolean distinctDisplayName = ! name.equals(displayName);
-    out.writeBoolean(distinctDisplayName);
-    if (distinctDisplayName) {
-      Text.writeString(out, displayName);
-    }
-    WritableUtils.writeVLong(out, value);
-  }
-
-  public synchronized String getName() {
-    return name;
-  }
+  void setDisplayName(String displayName);
 
   /**
-   * Get the name of the counter.
+   * @return the name of the counter
+   */
+  String getName();
+
+  /**
+   * Get the display name of the counter.
    * @return the user facing name of the counter
    */
-  public synchronized String getDisplayName() {
-    return displayName;
-  }
-    
+  String getDisplayName();
+
   /**
    * What is the current value of this counter?
    * @return the current value
    */
-  public synchronized long getValue() {
-    return value;
-  }
+  long getValue();
 
   /**
    * Set this counter by the given value
    * @param value the value to set
    */
-  public synchronized void setValue(long value) {
-    this.value = value;
-  }
+  void setValue(long value);
 
   /**
    * Increment this counter by the given value
    * @param incr the value to increase this counter by
    */
-  public synchronized void increment(long incr) {
-    value += incr;
-  }
-
-  @Override
-  public synchronized boolean equals(Object genericRight) {
-    if (genericRight instanceof Counter) {
-      synchronized (genericRight) {
-        Counter right = (Counter) genericRight;
-        return name.equals(right.name) && 
-               displayName.equals(right.displayName) &&
-               value == right.value;
-      }
-    }
-    return false;
-  }
-  
-  @Override
-  public synchronized int hashCode() {
-    return name.hashCode() + displayName.hashCode();
-  }
+  void increment(long incr);
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/CounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/CounterGroup.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/CounterGroup.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/CounterGroup.java Tue Mar  8 05:54:43 2011
@@ -18,19 +18,9 @@
 
 package org.apache.hadoop.mapreduce;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.MissingResourceException;
-import java.util.ResourceBundle;
-import java.util.TreeMap;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
 
 /**
  * A group of {@link Counter}s that logically belong together. Typically,
@@ -38,156 +28,6 @@ import org.apache.hadoop.io.WritableUtil
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class CounterGroup implements Writable, Iterable<Counter> {
-  private String name;
-  private String displayName;
-  private TreeMap<String, Counter> counters = new TreeMap<String, Counter>();
-  // Optional ResourceBundle for localization of group and counter names.
-  private ResourceBundle bundle = null;    
-  
-  /**
-   * Returns the specified resource bundle, or throws an exception.
-   * @throws MissingResourceException if the bundle isn't found
-   */
-  private static ResourceBundle getResourceBundle(String enumClassName) {
-    String bundleName = enumClassName.replace('$','_');
-    return ResourceBundle.getBundle(bundleName);
-  }
-
-  protected CounterGroup(String name) {
-    this.name = name;
-    try {
-      bundle = getResourceBundle(name);
-    }
-    catch (MissingResourceException neverMind) {
-    }
-    displayName = localize("CounterGroupName", name);
-  }
-  
-  /** Create a CounterGroup.
-   * @param name the name of the group's enum.
-   * @param displayName a name to be displayed for the group.
-   */
-  public CounterGroup(String name, String displayName) {
-    this.name = name;
-    this.displayName = displayName;
-  }
- 
-  /**
-   * Get the internal name of the group
-   * @return the internal name
-   */
-  public synchronized String getName() {
-    return name;
-  }
-  
-  /**
-   * Get the display name of the group.
-   * @return the human readable name
-   */
-  public synchronized String getDisplayName() {
-    return displayName;
-  }
-
-  /** Add a counter to this group. */
-  public synchronized void addCounter(Counter counter) {
-    counters.put(counter.getName(), counter);
-  }
-
-  /**
-   * Find a counter in a group.
-   * @param counterName the name of the counter
-   * @param displayName the display name of the counter
-   * @return the counter that was found or added
-   */
-  public Counter findCounter(String counterName, String displayName) {
-    Counter result = counters.get(counterName);
-    if (result == null) {
-      result = new Counter(counterName, displayName);
-      counters.put(counterName, result);
-    }
-    return result;
-  }
-
-  public synchronized Counter findCounter(String counterName) {
-    Counter result = counters.get(counterName);
-    if (result == null) {
-      String displayName = localize(counterName, counterName);
-      result = new Counter(counterName, displayName);
-      counters.put(counterName, result);
-    }
-    return result;
-  }
-  
-  public synchronized Iterator<Counter> iterator() {
-    return counters.values().iterator();
-  }
-
-  public synchronized void write(DataOutput out) throws IOException {
-    Text.writeString(out, displayName);
-    WritableUtils.writeVInt(out, counters.size());
-    for(Counter counter: counters.values()) {
-      counter.write(out);
-    }
-  }
-  
-  public synchronized void readFields(DataInput in) throws IOException {
-    displayName = Text.readString(in);
-    counters.clear();
-    int size = WritableUtils.readVInt(in);
-    for(int i=0; i < size; i++) {
-      Counter counter = new Counter();
-      counter.readFields(in);
-      counters.put(counter.getName(), counter);
-    }
-  }
-
-  /**
-   * Looks up key in the ResourceBundle and returns the corresponding value.
-   * If the bundle or the key doesn't exist, returns the default value.
-   */
-  private String localize(String key, String defaultValue) {
-    String result = defaultValue;
-    if (bundle != null) {
-      try {
-        result = bundle.getString(key);
-      }
-      catch (MissingResourceException mre) {
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Returns the number of counters in this group.
-   */
-  public synchronized int size() {
-    return counters.size();
-  }
-
-  public synchronized boolean equals(Object genericRight) {
-    if (genericRight instanceof CounterGroup) {
-      Iterator<Counter> right = ((CounterGroup) genericRight).counters.
-                                       values().iterator();
-      Iterator<Counter> left = counters.values().iterator();
-      while (left.hasNext()) {
-        if (!right.hasNext() || !left.next().equals(right.next())) {
-          return false;
-        }
-      }
-      return !right.hasNext();
-    }
-    return false;
-  }
-
-  public synchronized int hashCode() {
-    return counters.hashCode();
-  }
-  
-  public synchronized void incrAllCounters(CounterGroup rightGroup) {
-    for(Counter right: rightGroup.counters.values()) {
-      Counter left = findCounter(right.getName(), right.getDisplayName());
-      left.increment(right.getValue());
-    }
-  }
+public interface CounterGroup extends CounterGroupBase<Counter> {
+  // essentially a typedef so user doesn't have to use generic syntax
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Counters.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Counters.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Counters.java Tue Mar  8 05:54:43 2011
@@ -17,200 +17,121 @@
  */
 package org.apache.hadoop.mapreduce;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.mapreduce.counters.Limits;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.apache.hadoop.mapreduce.counters.AbstractCounterGroup;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
+import org.apache.hadoop.mapreduce.counters.AbstractCounters;
+import org.apache.hadoop.mapreduce.counters.CounterGroupFactory;
+import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
+
+/**
+ * <p><code>Counters</code> holds per job/task counters, defined either by the
+ * Map-Reduce framework or applications. Each <code>Counter</code> can be of
+ * any {@link Enum} type.</p>
+ *
+ * <p><code>Counters</code> are bunched into {@link CounterGroup}s, each
+ * comprising counters from a particular <code>Enum</code> class.</p>
+ */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class Counters implements Writable,Iterable<CounterGroup> {
-  /**
-   * A cache from enum values to the associated counter. Dramatically speeds up
-   * typical usage.
-   */
-  private Map<Enum<?>, Counter> cache = new IdentityHashMap<Enum<?>, Counter>();
+public class Counters extends AbstractCounters<Counter, CounterGroup> {
 
-  private TreeMap<String, CounterGroup> groups = 
-      new TreeMap<String, CounterGroup>();
-  
-  public Counters() {
-  }
-  
-  /**
-   * Utility method to  create a Counters object from the 
-   * org.apache.hadoop.mapred counters
-   * @param counters
-   */
-  public Counters(org.apache.hadoop.mapred.Counters counters) {
-    for(org.apache.hadoop.mapred.Counters.Group group: counters) {
-      String name = group.getName();
-      CounterGroup newGroup = new CounterGroup(name, group.getDisplayName());
-      groups.put(name, newGroup);
-      for(Counter counter: group) {
-        newGroup.addCounter(counter);
-      }
+  // Mix framework group implementation into CounterGroup interface
+  private static class FrameworkGroupImpl<T extends Enum<T>>
+      extends FrameworkCounterGroup<T, Counter> implements CounterGroup {
+
+    FrameworkGroupImpl(Class<T> cls) {
+      super(cls);
     }
-  }
 
-  /** Add a group. */
-  public void addGroup(CounterGroup group) {
-    groups.put(group.getName(), group);
+    @Override
+    protected FrameworkCounter newCounter(T key) {
+      return new FrameworkCounter(key);
+    }
   }
 
-  public Counter findCounter(String groupName, String counterName) {
-    CounterGroup grp = getGroup(groupName);
-    return grp.findCounter(counterName);
-  }
+  // Mix generic group implementation into CounterGroup interface
+  // and provide some mandatory group factory methods.
+  private static class GenericGroup extends AbstractCounterGroup<Counter>
+      implements CounterGroup {
 
-  /**
-   * Find the counter for the given enum. The same enum will always return the
-   * same counter.
-   * @param key the counter key
-   * @return the matching counter object
-   */
-  public synchronized Counter findCounter(Enum<?> key) {
-    Counter counter = cache.get(key);
-    if (counter == null) {
-      counter = findCounter(key.getDeclaringClass().getName(), key.toString());
-      cache.put(key, counter);
+    GenericGroup(String name, String displayName, Limits limits) {
+      super(name, displayName, limits);
     }
-    return counter;    
-  }
 
-  /**
-   * Returns the names of all counter classes.
-   * @return Set of counter names.
-   */
-  public synchronized Collection<String> getGroupNames() {
-    return groups.keySet();
-  }
+    @Override
+    protected Counter newCounter(String name, String displayName, long value) {
+      return new GenericCounter(name, displayName, value);
+    }
 
-  @Override
-  public Iterator<CounterGroup> iterator() {
-    return groups.values().iterator();
+    @Override
+    protected Counter newCounter() {
+      return new GenericCounter();
+    }
   }
 
-  /**
-   * Returns the named counter group, or an empty group if there is none
-   * with the specified name.
-   */
-  public synchronized CounterGroup getGroup(String groupName) {
-    CounterGroup grp = groups.get(groupName);
-    if (grp == null) {
-      grp = new CounterGroup(groupName);
-      groups.put(groupName, grp);
+  // Mix file system group implementation into the CounterGroup interface
+  private static class FileSystemGroup extends FileSystemCounterGroup<Counter>
+      implements CounterGroup {
+
+    @Override
+    protected Counter newCounter(String scheme, FileSystemCounter key) {
+      return new FSCounter(scheme, key);
     }
-    return grp;
   }
 
   /**
-   * Returns the total number of counters, by summing the number of counters
-   * in each group.
+   * Provide factory methods for counter group factory implementation.
+   * See also the GroupFactory in
+   *  {@link org.apache.hadoop.mapred.Counters mapred.Counters}
    */
-  public synchronized  int countCounters() {
-    int result = 0;
-    for (CounterGroup group : this) {
-      result += group.size();
+  private static class GroupFactory
+      extends CounterGroupFactory<Counter, CounterGroup> {
+
+    @Override
+    protected <T extends Enum<T>>
+    FrameworkGroupFactory<CounterGroup>
+        newFrameworkGroupFactory(final Class<T> cls) {
+      return new FrameworkGroupFactory<CounterGroup>() {
+        @Override public CounterGroup newGroup(String name) {
+          return new FrameworkGroupImpl<T>(cls); // impl in this package
+        }
+      };
     }
-    return result;
-  }
 
-  /**
-   * Write the set of groups.
-   * The external format is:
-   *     #groups (groupName group)*
-   *
-   * i.e. the number of groups followed by 0 or more groups, where each 
-   * group is of the form:
-   *
-   *     groupDisplayName #counters (false | true counter)*
-   *
-   * where each counter is of the form:
-   *
-   *     name (false | true displayName) value
-   */
-  @Override
-  public synchronized void write(DataOutput out) throws IOException {
-    out.writeInt(groups.size());
-    for (org.apache.hadoop.mapreduce.CounterGroup group: groups.values()) {
-      Text.writeString(out, group.getName());
-      group.write(out);
+    @Override
+    protected CounterGroup newGenericGroup(String name, String displayName,
+                                           Limits limits) {
+      return new GenericGroup(name, displayName, limits);
     }
-  }
-  
-  /**
-   * Read a set of groups.
-   */
-  @Override
-  public synchronized void readFields(DataInput in) throws IOException {
-    int numClasses = in.readInt();
-    groups.clear();
-    while (numClasses-- > 0) {
-      String groupName = Text.readString(in);
-      CounterGroup group = new CounterGroup(groupName);
-      group.readFields(in);
-      groups.put(groupName, group);
+
+    @Override
+    protected CounterGroup newFileSystemGroup() {
+      return new FileSystemGroup();
     }
   }
 
+  private static final GroupFactory groupFactory = new GroupFactory();
+
   /**
-   * Return textual representation of the counter values.
+   * Default constructor
    */
-  public synchronized String toString() {
-    StringBuilder sb = new StringBuilder("Counters: " + countCounters());
-    for (CounterGroup group: this) {
-      sb.append("\n\t" + group.getDisplayName());
-      for (Counter counter: group) {
-        sb.append("\n\t\t" + counter.getDisplayName() + "=" + 
-                  counter.getValue());
-      }
-    }
-    return sb.toString();
+  public Counters() {
+    super(groupFactory);
   }
 
   /**
-   * Increments multiple counters by their amounts in another Counters 
-   * instance.
-   * @param other the other Counters instance
+   * Construct the Counters object from another counters object
+   * @param <C> the type of counter
+   * @param <G> the type of counter group
+   * @param counters the old counters object
    */
-  public synchronized void incrAllCounters(Counters other) {
-    for(Map.Entry<String, CounterGroup> rightEntry: other.groups.entrySet()) {
-      CounterGroup left = groups.get(rightEntry.getKey());
-      CounterGroup right = rightEntry.getValue();
-      if (left == null) {
-        left = new CounterGroup(right.getName(), right.getDisplayName());
-        groups.put(rightEntry.getKey(), left);
-      }
-      left.incrAllCounters(right);
-    }
-  }
-
-  public boolean equals(Object genericRight) {
-    if (genericRight instanceof Counters) {
-      Iterator<CounterGroup> right = ((Counters) genericRight).groups.
-                                       values().iterator();
-      Iterator<CounterGroup> left = groups.values().iterator();
-      while (left.hasNext()) {
-        if (!right.hasNext() || !left.next().equals(right.next())) {
-          return false;
-        }
-      }
-      return !right.hasNext();
-    }
-    return false;
-  }
-  
-  public int hashCode() {
-    return groups.hashCode();
+  public <C extends Counter, G extends CounterGroupBase<C>>
+  Counters(AbstractCounters<C, G> counters) {
+    super(counters, groupFactory);
   }
 }

Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.java Tue Mar  8 05:54:43 2011
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public enum FileSystemCounter {
+  BYTES_READ,
+  BYTES_WRITTEN,
+  READ_OPS,
+  LARGE_READ_OPS,
+  WRITE_OPS,
+}

Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.properties?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.properties (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.properties Tue Mar  8 05:54:43 2011
@@ -0,0 +1,21 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# ResourceBundle properties file for file system counters
+
+CounterGroupName=     File System Counters
+
+BYTES_READ.name=      Number of bytes read
+BYTES_WRITTEN.name=   Number of bytes written
+READ_OPS.name=        Number of read operations
+LARGE_READ_OPS.name=  Number of large read operations
+WRITE_OPS.name=       Number of write operations

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Mar  8 05:54:43 2011
@@ -286,4 +286,16 @@ public interface MRJobConfig {
     "mapreduce.ubertask.child.ulimit";     // or mapreduce.uber.ulimit?
   public static final String UBERTASK_ENV =
     "mapreduce.ubertask.child.env";        // or mapreduce.uber.env?
+
+  public static final String COUNTERS_MAX_KEY = "mapreduce.job.counters.max";
+  public static final int COUNTERS_MAX_DEFAULT = 120;
+
+  public static final String COUNTER_GROUP_NAME_MAX_KEY = "mapreduce.job.counters.group.name.max";
+  public static final int COUNTER_GROUP_NAME_MAX_DEFAULT = 128;
+
+  public static final String COUNTER_NAME_MAX_KEY = "mapreduce.job.counters.counter.name.max";
+  public static final int COUNTER_NAME_MAX_DEFAULT = 64;
+
+  public static final String COUNTER_GROUPS_MAX_KEY = "mapreduce.job.counters.groups.max";
+  public static final int COUNTER_GROUPS_MAX_DEFAULT = 50;
 }

Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounter.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounter.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounter.java Tue Mar  8 05:54:43 2011
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * An abstract counter class to provide common implementation of
+ * the counter interface in both mapred and mapreduce packages.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractCounter implements Counter {
+
+  @Override @Deprecated
+  public void setDisplayName(String name) {}
+
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (genericRight instanceof Counter) {
+      synchronized (genericRight) {
+        Counter right = (Counter) genericRight;
+        return getName().equals(right.getName()) &&
+               getDisplayName().equals(right.getDisplayName()) &&
+               getValue() == right.getValue();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    return Objects.hashCode(getName(), getDisplayName(), getValue());
+  }
+}



Mime
View raw message