hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1157290 [1/4] - in /hadoop/common/trunk/mapreduce: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/counters/ src/java/org/apache/hadoop/mapreduce/jobhistory/ src/java/org/apa...
Date Fri, 12 Aug 2011 23:25:52 GMT
Author: acmurthy
Date: Fri Aug 12 23:25:51 2011
New Revision: 1157290

URL: http://svn.apache.org/viewvc?rev=1157290&view=rev
Log:
 MAPREDUCE-901. Efficient framework counters. Contributed by Luke Lu.

Added:
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java.orig
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/FileSystemCounter.properties
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounter.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupFactory.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/LimitExceededException.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/Limits.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/package-info.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/util/CountersStrings.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/util/ResourceBundles.java
Modified:
    hadoop/common/trunk/mapreduce/CHANGES.txt
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Counter.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/CounterGroup.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Counters.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/JobCounter.properties
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
    hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestCombineOutputCollector.java
    hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
    hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
    hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
    hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/TestCounters.java

Modified: hadoop/common/trunk/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/mapreduce/CHANGES.txt Fri Aug 12 23:25:51 2011
@@ -227,6 +227,8 @@ Trunk (unreleased changes)
     MAPREDUCE-2740. MultipleOutputs in new API creates needless
     TaskAttemptContexts. (todd)
 
+    MAPREDUCE-901. Efficient framework counters. (llu via acmurthy)
+
   BUG FIXES
 
     MAPREDUCE-2603. Disable High-Ram emulation in system tests. 

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java Fri Aug 12 23:25:51 2011
@@ -18,20 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.MissingResourceException;
-import java.util.ResourceBundle;
 
-import org.apache.commons.logging.*;
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.IntWritable;
@@ -40,426 +29,302 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.counters.AbstractCounterGroup;
+import org.apache.hadoop.mapreduce.counters.AbstractCounters;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+import org.apache.hadoop.mapreduce.counters.CounterGroupFactory;
+import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
+import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.apache.hadoop.mapreduce.counters.Limits;
+import static org.apache.hadoop.mapreduce.util.CountersStrings.*;
 
 /**
  * A set of named counters.
- * 
- * <p><code>Counters</code> represent global counters, defined either by the 
+ *
+ * <p><code>Counters</code> represent global counters, defined either by the
  * Map-Reduce framework or applications. Each <code>Counter</code> can be of
  * any {@link Enum} type.</p>
- * 
+ *
 * <p><code>Counters</code> are bunched into {@link Group}s, each comprising of
- * counters from a particular <code>Enum</code> class. 
+ * counters from a particular <code>Enum</code> class.
  * @deprecated Use {@link org.apache.hadoop.mapreduce.Counters} instead.
  */
 @Deprecated
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class Counters implements Writable, Iterable<Counters.Group> {
-  private static final Log LOG = LogFactory.getLog(Counters.class);
-  private static final char GROUP_OPEN = '{';
-  private static final char GROUP_CLOSE = '}';
-  private static final char COUNTER_OPEN = '[';
-  private static final char COUNTER_CLOSE = ']';
-  private static final char UNIT_OPEN = '(';
-  private static final char UNIT_CLOSE = ')';
-  private static char[] charsToEscape =  {GROUP_OPEN, GROUP_CLOSE, 
-                                          COUNTER_OPEN, COUNTER_CLOSE, 
-                                          UNIT_OPEN, UNIT_CLOSE};
-  
-  //private static Log log = LogFactory.getLog("Counters.class");
-  
+public class Counters
+    extends AbstractCounters<Counters.Counter, Counters.Group> {
+
+  public Counters() {
+    super(groupFactory);
+  }
+
+  public Counters(org.apache.hadoop.mapreduce.Counters newCounters) {
+    super(newCounters, groupFactory);
+  }
+
   /**
    * Downgrade new {@link org.apache.hadoop.mapreduce.Counters} to old Counters
    * @param newCounters new Counters
    * @return old Counters instance corresponding to newCounters
    */
   static Counters downgrade(org.apache.hadoop.mapreduce.Counters newCounters) {
-    Counters oldCounters = new Counters();
-    for (org.apache.hadoop.mapreduce.CounterGroup newGroup: newCounters) {
-      String groupName = newGroup.getName();
-      Group oldGroup = oldCounters.getGroup(groupName);
-      for (org.apache.hadoop.mapreduce.Counter newCounter: newGroup) {
-        Counter oldCounter = oldGroup.getCounterForName(newCounter.getName());
-        oldCounter.setDisplayName(newCounter.getDisplayName());
-        oldCounter.increment(newCounter.getValue());
-      }
-    }
-    return oldCounters;
+    return new Counters(newCounters);
   }
 
   /**
-   * A counter record, comprising its name and value. 
+   * A counter record, comprising its name and value.
    */
-  public static class Counter extends org.apache.hadoop.mapreduce.Counter {
-    
-    Counter() { 
-    }
+  public interface Counter extends org.apache.hadoop.mapreduce.Counter {
 
-    Counter(String name, String displayName, long value) {
-      super(name, displayName);
-      increment(value);
-    }
-    
-    public void setDisplayName(String newName) {
-      super.setDisplayName(newName);
-    }
-    
     /**
      * Returns the compact stringified version of the counter in the format
      * [(actual-name)(display-name)(value)]
+     * @return the stringified result
      */
-    public synchronized String makeEscapedCompactString() {
+    String makeEscapedCompactString();
 
-      // First up, obtain the strings that need escaping. This will help us
-      // determine the buffer length apriori.
-      String escapedName = escape(getName());
-      String escapedDispName = escape(getDisplayName());
-      long currentValue = this.getValue();
-      int length = escapedName.length() + escapedDispName.length() + 4;
-
-      length += 8; // For the following delimiting characters
-      StringBuilder builder = new StringBuilder(length);
-      builder.append(COUNTER_OPEN);
-      
-      // Add the counter name
-      builder.append(UNIT_OPEN);
-      builder.append(escapedName);
-      builder.append(UNIT_CLOSE);
-      
-      // Add the display name
-      builder.append(UNIT_OPEN);
-      builder.append(escapedDispName);
-      builder.append(UNIT_CLOSE);
-      
-      // Add the value
-      builder.append(UNIT_OPEN);
-      builder.append(currentValue);
-      builder.append(UNIT_CLOSE);
-      
-      builder.append(COUNTER_CLOSE);
-      
-      return builder.toString();
-    }
-    
-    // Checks for (content) equality of two (basic) counters
+    /**
+     * Checks for (content) equality of two (basic) counters
+     * @param counter to compare
+     * @return true if content equals
+     * @deprecated
+     */
     @Deprecated
-    synchronized boolean contentEquals(Counter c) {
-      return this.equals(c);
-    }
-    
+    boolean contentEquals(Counter counter);
+
     /**
-     * What is the current value of this counter?
-     * @return the current value
+     * @return the value of the counter
      */
-    public synchronized long getCounter() {
+    long getCounter();
+  }
+
+  static class OldCounterImpl extends GenericCounter implements Counter {
+
+    OldCounterImpl() {
+    }
+
+    OldCounterImpl(String name, String displayName, long value) {
+      super(name, displayName, value);
+    }
+
+    @Override
+    public synchronized String makeEscapedCompactString() {
+      return toEscapedCompactString(this);
+    }
+
+    @Override @Deprecated
+    public boolean contentEquals(Counter counter) {
+      return equals(counter);
+    }
+
+    @Override
+    public long getCounter() {
       return getValue();
     }
-    
   }
-  
+
   /**
-   *  <code>Group</code> of counters, comprising of counters from a particular 
-   *  counter {@link Enum} class.  
+   *  <code>Group</code> of counters, comprising of counters from a particular
+   *  counter {@link Enum} class.
    *
-   *  <p><code>Group</code>handles localization of the class name and the 
+   *  <p><code>Group</code>handles localization of the class name and the
    *  counter names.</p>
    */
-  public static class Group implements Writable, Iterable<Counter> {
-    private String groupName;
-    private String displayName;
-    private Map<String, Counter> subcounters = new HashMap<String, Counter>();
-    
-    // Optional ResourceBundle for localization of group and counter names.
-    private ResourceBundle bundle = null;    
-    
-    Group(String groupName) {
-      try {
-        bundle = getResourceBundle(groupName);
-      }
-      catch (MissingResourceException neverMind) {
-      }
-      this.groupName = groupName;
-      this.displayName = localize("CounterGroupName", groupName);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Creating group " + groupName + " with " +
-                  (bundle == null ? "nothing" : "bundle"));
-      }
-    }
-    
+  public static interface Group extends CounterGroupBase<Counter> {
+
     /**
-     * Returns the specified resource bundle, or throws an exception.
-     * @throws MissingResourceException if the bundle isn't found
+     * @param counterName the name of the counter
+     * @return the value of the specified counter, or 0 if the counter does
+     * not exist.
      */
-    private static ResourceBundle getResourceBundle(String enumClassName) {
-      String bundleName = enumClassName.replace('$','_');
-      return ResourceBundle.getBundle(bundleName);
-    }
-    
+    long getCounter(String counterName);
+
     /**
-     * Returns raw name of the group.  This is the name of the enum class
-     * for this group of counters.
+     * @return the compact stringified version of the group in the format
+     * {(actual-name)(display-name)(value)[][][]} where [] are compact strings
+     * for the counters within.
      */
-    public String getName() {
-      return groupName;
-    }
-    
+    String makeEscapedCompactString();
+
     /**
-     * Returns localized name of the group.  This is the same as getName() by
-     * default, but different if an appropriate ResourceBundle is found.
+     * Get the counter for the given id and create it if it doesn't exist.
+     * @param id the numeric id of the counter within the group
+     * @param name the internal counter name
+     * @return the counter
+     * @deprecated use {@link #findCounter(String)} instead
      */
-    public String getDisplayName() {
-      return displayName;
-    }
-    
+    @Deprecated
+    Counter getCounter(int id, String name);
+
     /**
-     * Set the display name
+     * Get the counter for the given name and create it if it doesn't exist.
+     * @param name the internal counter name
+     * @return the counter
      */
-    public void setDisplayName(String displayName) {
-      this.displayName = displayName;
+    Counter getCounterForName(String name);
+  }
+
+  // All the group impls need this for legacy group interface
+  static long getCounterValue(Group group, String counterName) {
+    Counter counter = group.findCounter(counterName, false);
+    if (counter != null) return counter.getValue();
+    return 0L;
+  }
+
+  // Mix the generic group implementation into the Group interface
+  private static class GenericGroup extends AbstractCounterGroup<Counter>
+                                    implements Group {
+
+    GenericGroup(String name, String displayName, Limits limits) {
+      super(name, displayName, limits);
     }
-    
-    /**
-     * Returns the compact stringified version of the group in the format
-     * {(actual-name)(display-name)(value)[][][]} where [] are compact strings for the
-     * counters within.
-     */
+
+    @Override
+    public long getCounter(String counterName) {
+      return getCounterValue(this, counterName);
+    }
+
+    @Override
     public String makeEscapedCompactString() {
-      String[] subcountersArray = new String[subcounters.size()];
+      return toEscapedCompactString(this);
+    }
 
-      // First up, obtain the strings that need escaping. This will help us
-      // determine the buffer length apriori.
-      String escapedName = escape(getName());
-      String escapedDispName = escape(getDisplayName());
-      int i = 0;
-      int length = escapedName.length() + escapedDispName.length();
-      for (Counter counter : subcounters.values()) {
-        String escapedStr = counter.makeEscapedCompactString();
-        subcountersArray[i++] = escapedStr;
-        length += escapedStr.length();
-      }
+    @Override
+    public Counter getCounter(int id, String name) {
+      return findCounter(name);
+    }
 
-      length += 6; // for all the delimiting characters below
-      StringBuilder builder = new StringBuilder(length);
-      builder.append(GROUP_OPEN); // group start
-      
-      // Add the group name
-      builder.append(UNIT_OPEN);
-      builder.append(escapedName);
-      builder.append(UNIT_CLOSE);
-      
-      // Add the display name
-      builder.append(UNIT_OPEN);
-      builder.append(escapedDispName);
-      builder.append(UNIT_CLOSE);
-      
-      // write the value
-      for(Counter counter: subcounters.values()) {
-        builder.append(counter.makeEscapedCompactString());
-      }
-      
-      builder.append(GROUP_CLOSE); // group end
-      return builder.toString();
+    @Override
+    public Counter getCounterForName(String name) {
+      return findCounter(name);
     }
 
     @Override
-    public int hashCode() {
-      return subcounters.hashCode();
+    protected Counter newCounter(String counterName, String displayName,
+                                 long value) {
+      return new OldCounterImpl(counterName, displayName, value);
     }
 
-    /** 
-     * Checks for (content) equality of Groups
-     */
     @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
+    protected Counter newCounter() {
+      return new OldCounterImpl();
+    }
+  }
+
+  // Mix the framework group implementation into the Group interface
+  private static class FrameworkGroupImpl<T extends Enum<T>>
+      extends FrameworkCounterGroup<T, Counter> implements Group {
+
+    // Mix the framework counter implementation into the Counter interface
+    class FrameworkCounterImpl extends FrameworkCounter implements Counter {
+
+      FrameworkCounterImpl(T key) {
+        super(key);
       }
-      if (obj == null || obj.getClass() != getClass()) {
-        return false;
+
+      @Override
+      public String makeEscapedCompactString() {
+        return toEscapedCompactString(this);
       }
-      boolean isEqual = false;
-      Group g = (Group) obj;
-      synchronized (this) {
-        if (size() == g.size()) {
-          isEqual = true;
-          for (Map.Entry<String, Counter> entry : subcounters.entrySet()) {
-            String key = entry.getKey();
-            Counter c1 = entry.getValue();
-            Counter c2 = g.getCounterForName(key);
-            if (!c1.contentEquals(c2)) {
-              isEqual = false;
-              break;
-            }
-          }
-        }
+
+      @Override
+      public boolean contentEquals(Counter counter) {
+        return equals(counter);
       }
-      return isEqual;
-    }
-    
-    /**
-     * Returns the value of the specified counter, or 0 if the counter does
-     * not exist.
-     */
-    public synchronized long getCounter(String counterName) {
-      for(Counter counter: subcounters.values()) {
-        if (counter != null && counter.getDisplayName().equals(counterName)) {
-          return counter.getValue();
-        }
+
+      @Override
+      public long getCounter() {
+        return getValue();
       }
-      return 0L;
     }
-    
-    /**
-     * Get the counter for the given id and create it if it doesn't exist.
-     * @param id the numeric id of the counter within the group
-     * @param name the internal counter name
-     * @return the counter
-     * @deprecated use {@link #getCounter(String)} instead
-     */
-    @Deprecated
-    public synchronized Counter getCounter(int id, String name) {
-      return getCounterForName(name);
-    }
-    
-    /**
-     * Get the counter for the given name and create it if it doesn't exist.
-     * @param name the internal counter name
-     * @return the counter
-     */
-    public synchronized Counter getCounterForName(String name) {
-      Counter result = subcounters.get(name);
-      if (result == null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Adding " + name);
-        }
-        result = new Counter(name, localize(name + ".name", name), 0L);
-        subcounters.put(name, result);
-      }
-      return result;
+
+    FrameworkGroupImpl(Class<T> cls) {
+      super(cls);
     }
-    
-    /**
-     * Returns the number of counters in this group.
-     */
-    public synchronized int size() {
-      return subcounters.size();
+
+    @Override
+    public long getCounter(String counterName) {
+      return getCounterValue(this, counterName);
     }
-    
-    /**
-     * Looks up key in the ResourceBundle and returns the corresponding value.
-     * If the bundle or the key doesn't exist, returns the default value.
-     */
-    private String localize(String key, String defaultValue) {
-      String result = defaultValue;
-      if (bundle != null) {
-        try {
-          result = bundle.getString(key);
-        }
-        catch (MissingResourceException mre) {
-        }
-      }
-      return result;
+
+    @Override
+    public String makeEscapedCompactString() {
+      return toEscapedCompactString(this);
     }
-    
-    public synchronized void write(DataOutput out) throws IOException {
-      Text.writeString(out, displayName);
-      WritableUtils.writeVInt(out, subcounters.size());
-      for(Counter counter: subcounters.values()) {
-        counter.write(out);
-      }
+
+    @Override @Deprecated
+    public Counter getCounter(int id, String name) {
+      return findCounter(name);
     }
-    
-    public synchronized void readFields(DataInput in) throws IOException {
-      displayName = Text.readString(in);
-      subcounters.clear();
-      int size = WritableUtils.readVInt(in);
-      for(int i=0; i < size; i++) {
-        Counter counter = new Counter();
-        counter.readFields(in);
-        subcounters.put(counter.getName(), counter);
-      }
+
+    @Override
+    public Counter getCounterForName(String name) {
+      return findCounter(name);
     }
 
-    public synchronized Iterator<Counter> iterator() {
-      return new ArrayList<Counter>(subcounters.values()).iterator();
+    @Override
+    protected Counter newCounter(T key) {
+      return new FrameworkCounterImpl(key);
     }
   }
-  
-  // Map from group name (enum class name) to map of int (enum ordinal) to
-  // counter record (name-value pair).
-  private Map<String,Group> counters = new HashMap<String, Group>();
 
-  /**
-   * A cache from enum values to the associated counter. Dramatically speeds up
-   * typical usage.
-   */
-  private Map<Enum, Counter> cache = new IdentityHashMap<Enum, Counter>();
-  
-  /**
-   * Returns the names of all counter classes.
-   * @return Set of counter names.
-   */
-  public synchronized Collection<String> getGroupNames() {
-    return counters.keySet();
-  }
+  // Mix the file system counter group implementation into the Group interface
+  private static class FSGroupImpl extends FileSystemCounterGroup<Counter>
+                                   implements Group {
 
-  public synchronized Iterator<Group> iterator() {
-    return counters.values().iterator();
-  }
+    private class FSCounterImpl extends FSCounter implements Counter {
 
-  /**
-   * Returns the named counter group, or an empty group if there is none
-   * with the specified name.
-   */
-  public synchronized Group getGroup(String groupName) {
-    Group result = counters.get(groupName);
+      FSCounterImpl(String scheme, FileSystemCounter key) {
+        super(scheme, key);
+      }
 
-    if (result == null) {
-      // To provide support for deprecated group names  
-      if (groupName.equals("org.apache.hadoop.mapred.Task$Counter")) {
-        LOG.warn("Group org.apache.hadoop.mapred.Task$Counter is deprecated." +
-                 " Use org.apache.hadoop.mapreduce.TaskCounter instead");
-        return getGroup("org.apache.hadoop.mapreduce.TaskCounter");
-      } 
+      @Override
+      public String makeEscapedCompactString() {
+        return toEscapedCompactString(this);
+      }
 
-      if (groupName.equals
-          ("org.apache.hadoop.mapred.JobInProgress$Counter")) {
-        LOG.warn("Group org.apache.hadoop.mapred.JobInProgress$Counter " +
-                 "is deprecated. Use " +
-                 "org.apache.hadoop.mapreduce.JobCounter instead");
-        return getGroup("org.apache.hadoop.mapreduce.JobCounter");
+      @Override @Deprecated
+      public boolean contentEquals(Counter counter) {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
+
+      @Override
+      public long getCounter() {
+        return getValue();
       }
 
-      result = new Group(groupName);
-      counters.put(groupName, result);
     }
 
-    return result;
-  }
+    @Override
+    protected Counter newCounter(String scheme, FileSystemCounter key) {
+      return new FSCounterImpl(scheme, key);
+    }
 
-  /**
-   * Find the counter for the given enum. The same enum will always return the
-   * same counter.
-   * @param key the counter key
-   * @return the matching counter object
-   */
-  public synchronized Counter findCounter(Enum key) {
-    Counter counter = cache.get(key);
-    if (counter == null) {
-      Group group = getGroup(key.getDeclaringClass().getName());
-      counter = group.getCounterForName(key.toString());
-      cache.put(key, counter);
+    @Override
+    public long getCounter(String counterName) {
+      return getCounterValue(this, counterName);
     }
-    return counter;    
+
+    @Override
+    public String makeEscapedCompactString() {
+      return toEscapedCompactString(this);
+    }
+
+    @Override @Deprecated
+    public Counter getCounter(int id, String name) {
+      return findCounter(name);
+    }
+
+    @Override
+    public Counter getCounterForName(String name) {
+      return findCounter(name);
+    }
+
   }
 
-  /**
-   * Find a counter given the group and the name.
-   * @param group the name of the group
-   * @param name the internal name of the counter
-   * @return the counter for that name
-   */
   public synchronized Counter findCounter(String group, String name) {
     if (name.equals("MAP_INPUT_BYTES")) {
       LOG.warn("Counter name MAP_INPUT_BYTES is deprecated. " +
@@ -471,15 +336,46 @@ public class Counters implements Writabl
   }
 
   /**
+   * Provide factory methods for counter group factory implementation.
+   * See also the GroupFactory in
+   *  {@link org.apache.hadoop.mapreduce.Counters mapreduce.Counters}
+   */
+  static class GroupFactory extends CounterGroupFactory<Counter, Group> {
+
+    @Override
+    protected <T extends Enum<T>>
+    FrameworkGroupFactory<Group> newFrameworkGroupFactory(final Class<T> cls) {
+      return new FrameworkGroupFactory<Group>() {
+        @Override public Group newGroup(String name) {
+          return new FrameworkGroupImpl<T>(cls); // impl in this package
+        }
+      };
+    }
+
+    @Override
+    protected Group newGenericGroup(String name, String displayName,
+                                    Limits limits) {
+      return new GenericGroup(name, displayName, limits);
+    }
+
+    @Override
+    protected Group newFileSystemGroup() {
+      return new FSGroupImpl();
+    }
+  }
+
+  private static final GroupFactory groupFactory = new GroupFactory();
+
+  /**
    * Find a counter by using strings
    * @param group the name of the group
    * @param id the id of the counter within the group (0 to N-1)
    * @param name the internal name of the counter
    * @return the counter for that name
-   * @deprecated
+   * @deprecated use {@link #findCounter(String, String)} instead
    */
   @Deprecated
-  public synchronized Counter findCounter(String group, int id, String name) {
+  public Counter findCounter(String group, int id, String name) {
     return findCounter(group, name);
   }
 
@@ -489,10 +385,10 @@ public class Counters implements Writabl
    * @param key identifies a counter
    * @param amount amount by which counter is to be incremented
    */
-  public synchronized void incrCounter(Enum key, long amount) {
+  public void incrCounter(Enum<?> key, long amount) {
     findCounter(key).increment(amount);
   }
-  
+
   /**
    * Increments the specified counter by the specified amount, creating it if
    * it didn't already exist.
@@ -500,27 +396,29 @@ public class Counters implements Writabl
    * @param counter the internal name of the counter
    * @param amount amount by which counter is to be incremented
    */
-  public synchronized void incrCounter(String group, String counter, long amount) {
+  public void incrCounter(String group, String counter, long amount) {
     findCounter(group, counter).increment(amount);
   }
-  
+
   /**
    * Returns current value of the specified counter, or 0 if the counter
    * does not exist.
+   * @param key the counter enum to lookup
+   * @return the counter value or 0 if counter not found
    */
-  public synchronized long getCounter(Enum key) {
+  public synchronized long getCounter(Enum<?> key) {
     return findCounter(key).getValue();
   }
-  
+
   /**
-   * Increments multiple counters by their amounts in another Counters 
+   * Increments multiple counters by their amounts in another Counters
    * instance.
    * @param other the other Counters instance
    */
   public synchronized void incrAllCounters(Counters other) {
     for (Group otherGroup: other) {
       Group group = getGroup(otherGroup.getName());
-      group.displayName = otherGroup.displayName;
+      group.setDisplayName(otherGroup.getDisplayName());
       for (Counter otherCounter : otherGroup) {
         Counter counter = group.getCounterForName(otherCounter.getName());
         counter.setDisplayName(otherCounter.getDisplayName());
@@ -530,7 +428,18 @@ public class Counters implements Writabl
   }
 
   /**
+   * @return the total number of counters
+   * @deprecated use {@link #countCounters()} instead
+   */
+  public int size() {
+    return countCounters();
+  }
+
+  /**
    * Convenience method for computing the sum of two sets of counters.
+   * @param a the first counters
+   * @param b the second counters
+   * @return a new summed counters object
    */
   public static Counters sum(Counters a, Counters b) {
     Counters counters = new Counters();
@@ -538,55 +447,7 @@ public class Counters implements Writabl
     counters.incrAllCounters(b);
     return counters;
   }
-  
-  /**
-   * Returns the total number of counters, by summing the number of counters
-   * in each group.
-   */
-  public synchronized  int size() {
-    int result = 0;
-    for (Group group : this) {
-      result += group.size();
-    }
-    return result;
-  }
-  
-  /**
-   * Write the set of groups.
-   * The external format is:
-   *     #groups (groupName group)*
-   *
-   * i.e. the number of groups followed by 0 or more groups, where each 
-   * group is of the form:
-   *
-   *     groupDisplayName #counters (false | true counter)*
-   *
-   * where each counter is of the form:
-   *
-   *     name (false | true displayName) value
-   */
-  public synchronized void write(DataOutput out) throws IOException {
-    out.writeInt(counters.size());
-    for (Group group: counters.values()) {
-      Text.writeString(out, group.getName());
-      group.write(out);
-    }
-  }
-  
-  /**
-   * Read a set of groups.
-   */
-  public synchronized void readFields(DataInput in) throws IOException {
-    int numClasses = in.readInt();
-    counters.clear();
-    while (numClasses-- > 0) {
-      String groupName = Text.readString(in);
-      Group group = new Group(groupName);
-      group.readFields(in);
-      counters.put(groupName, group);
-    }
-  }
-  
+
   /**
    * Logs the current counter values.
    * @param log The log to use.
@@ -596,212 +457,31 @@ public class Counters implements Writabl
     for(Group group: this) {
       log.info("  " + group.getDisplayName());
       for (Counter counter: group) {
-        log.info("    " + counter.getDisplayName() + "=" + 
+        log.info("    " + counter.getDisplayName() + "=" +
                  counter.getCounter());
-      }   
-    }
-  }
-  
-  /**
-   * Return textual representation of the counter values.
-   */
-  public synchronized String toString() {
-    StringBuilder sb = new StringBuilder("Counters: " + size());
-    for (Group group: this) {
-      sb.append("\n\t" + group.getDisplayName());
-      for (Counter counter: group) {
-        sb.append("\n\t\t" + counter.getDisplayName() + "=" + 
-                  counter.getCounter());
       }
     }
-    return sb.toString();
   }
 
   /**
-   * Convert a counters object into a single line that is easy to parse.
-   * @return the string with "name=value" for each counter and separated by ","
-   */
-  public synchronized String makeCompactString() {
-    StringBuffer buffer = new StringBuffer();
-    boolean first = true;
-    for(Group group: this){   
-      for(Counter counter: group) {
-        if (first) {
-          first = false;
-        } else {
-          buffer.append(',');
-        }
-        buffer.append(group.getDisplayName());
-        buffer.append('.');
-        buffer.append(counter.getDisplayName());
-        buffer.append(':');
-        buffer.append(counter.getCounter());
-      }
-    }
-    return buffer.toString();
-  }
-  
-  /**
-   * Represent the counter in a textual format that can be converted back to 
+   * Represent the counter in a textual format that can be converted back to
    * its object form
    * @return the string in the following format
-   * {(groupname)(group-displayname)[(countername)(displayname)(value)][][]}{}{}
+   * {(groupName)(group-displayName)[(counterName)(displayName)(value)][]*}*
    */
-  public synchronized String makeEscapedCompactString() {
-    String[] groupsArray = new String[counters.size()];
-    int i = 0;
-    int length = 0;
-
-    // First up, obtain the escaped string for each group so that we can
-    // determine the buffer length apriori.
-    for (Group group : this) {
-      String escapedString = group.makeEscapedCompactString();
-      groupsArray[i++] = escapedString;
-      length += escapedString.length();
-    }
-
-    // Now construct the buffer
-    StringBuilder builder = new StringBuilder(length);
-    for (String group : groupsArray) {
-      builder.append(group);
-    }
-    return builder.toString();
-  }
-
-  // Extracts a block (data enclosed within delimeters) ignoring escape 
-  // sequences. Throws ParseException if an incomplete block is found else 
-  // returns null.
-  private static String getBlock(String str, char open, char close, 
-                                IntWritable index) throws ParseException {
-    StringBuilder split = new StringBuilder();
-    int next = StringUtils.findNext(str, open, StringUtils.ESCAPE_CHAR, 
-                                    index.get(), split);
-    split.setLength(0); // clear the buffer
-    if (next >= 0) {
-      ++next; // move over '('
-      
-      next = StringUtils.findNext(str, close, StringUtils.ESCAPE_CHAR, 
-                                   next, split);
-      if (next >= 0) {
-        ++next; // move over ')'
-        index.set(next);
-        return split.toString(); // found a block
-      } else {
-        throw new ParseException("Unexpected end of block", next);
-      }
-    }
-    return null; // found nothing
+  public String makeEscapedCompactString() {
+    return toEscapedCompactString(this);
   }
-  
+
   /**
-   * Convert a stringified counter representation into a counter object. Note 
-   * that the counter can be recovered if its stringified using 
-   * {@link #makeEscapedCompactString()}. 
-   * @return a Counter
+   * Convert a stringified (by {@link #makeEscapedCompactString()}) counter
+   * representation into a counter object.
+   * @param compactString to parse
+   * @return a new counters object
+   * @throws ParseException
    */
-  public static Counters fromEscapedCompactString(String compactString) 
-  throws ParseException {
-    Counters counters = new Counters();
-    IntWritable index = new IntWritable(0);
-    
-    // Get the group to work on
-    String groupString = 
-      getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
-    
-    while (groupString != null) {
-      IntWritable groupIndex = new IntWritable(0);
-      
-      // Get the actual name
-      String groupName = 
-        getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex);
-      groupName = unescape(groupName);
-      
-      // Get the display name
-      String groupDisplayName = 
-        getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex);
-      groupDisplayName = unescape(groupDisplayName);
-      
-      // Get the counters
-      Group group = counters.getGroup(groupName);
-      group.setDisplayName(groupDisplayName);
-      
-      String counterString = 
-        getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
-      
-      while (counterString != null) {
-        IntWritable counterIndex = new IntWritable(0);
-        
-        // Get the actual name
-        String counterName = 
-          getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex);
-        counterName = unescape(counterName);
-        
-        // Get the display name
-        String counterDisplayName = 
-          getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex);
-        counterDisplayName = unescape(counterDisplayName);
-        
-        // Get the value
-        long value = 
-          Long.parseLong(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, 
-                                  counterIndex));
-        
-        // Add the counter
-        Counter counter = group.getCounterForName(counterName);
-        counter.setDisplayName(counterDisplayName);
-        counter.increment(value);
-        
-        // Get the next counter
-        counterString = 
-          getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
-      }
-      
-      groupString = getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
-    }
-    return counters;
-  }
-
-  // Escapes all the delimiters for counters i.e {,[,(,),],}
-  private static String escape(String string) {
-    return StringUtils.escapeString(string, StringUtils.ESCAPE_CHAR, 
-                                    charsToEscape);
-  }
-  
-  // Unescapes all the delimiters for counters i.e {,[,(,),],}
-  private static String unescape(String string) {
-    return StringUtils.unEscapeString(string, StringUtils.ESCAPE_CHAR, 
-                                      charsToEscape);
-  }
-
-  @Override 
-  public synchronized int hashCode() {
-    return counters.hashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || obj.getClass() != getClass()) {
-      return false;
-    }
-    boolean isEqual = false;
-    Counters other = (Counters) obj;
-    synchronized (this) {
-      if (size() == other.size()) {
-        isEqual = true;
-        for (Map.Entry<String, Group> entry : this.counters.entrySet()) {
-          String key = entry.getKey();
-          Group sourceGroup = entry.getValue();
-          Group targetGroup = other.getGroup(key);
-          if (!sourceGroup.equals(targetGroup)) {
-            isEqual = false;
-            break;
-          }
-        }
-      }
-    }
-    return isEqual;
+  public static Counters fromEscapedCompactString(String compactString)
+      throws ParseException {
+    return parseEscapedCompactString(compactString, new Counters());
   }
 }

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Aug 12 23:25:51 2011
@@ -77,8 +77,10 @@ interface InterTrackerProtocol extends V
    * Version 29: Adding user name to the serialized Task for use by TT.
    * Version 30: Adding available memory and CPU usage information on TT to
    *             TaskTrackerStatus for MAPREDUCE-1218
+   * Version 31: Efficient serialization format for Framework counters
+   *             (MAPREDUCE-901)
    */             
-  public static final long versionID = 30L;
+  public static final long versionID = 31L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Aug 12 23:25:51 2011
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.JobCo
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.counters.LimitExceededException;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
@@ -1250,6 +1251,11 @@ public class JobInProgress {
    * @return the job-level counters.
    */
   public synchronized Counters getJobCounters() {
+    try {
+      throw new IOException("");
+    } catch (IOException ioe) {
+      LOG.info("getJC", ioe);
+    }
     return jobCounters;
   }
   
@@ -1291,8 +1297,12 @@ public class JobInProgress {
    */
   private Counters incrementTaskCounters(Counters counters,
                                          TaskInProgress[] tips) {
-    for (TaskInProgress tip : tips) {
-      counters.incrAllCounters(tip.getCounters());
+    try {
+      for (TaskInProgress tip : tips) {
+        counters.incrAllCounters(tip.getCounters());
+      }
+    } catch (LimitExceededException e) {
+      // too many user counters/groups, leaving existing counters intact.
     }
     return counters;
   }
@@ -2748,6 +2758,9 @@ public class JobInProgress {
       retireMap(tip);
       if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
         this.status.setMapProgress(1.0f);
+        if (canLaunchJobCleanupTask()) {
+          checkCountersLimitsOrFail();
+        }
       }
     } else {
       runningReduceTasks -= 1;
@@ -2760,6 +2773,9 @@ public class JobInProgress {
       retireReduce(tip);
       if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
         this.status.setReduceProgress(1.0f);
+        if (canLaunchJobCleanupTask()) {
+          checkCountersLimitsOrFail();
+        }
       }
     }
     decrementSpeculativeCount(wasSpeculating, tip);
@@ -2769,6 +2785,19 @@ public class JobInProgress {
     }
     return true;
   }
+
+  /*
+   * Add up the counters and fail the job if they exceed the limits.
+   * Make sure we do not recalculate the counters after we fail the job.
+   * Currently this is taken care of by terminateJob() since it does not
+   * calculate the counters.
+   */
+  private void checkCountersLimitsOrFail() {
+    Counters counters = getCounters();
+    if (counters.limits().violation() != null) {
+      jobtracker.failJob(this);
+    }
+  }
   
   private void updateTaskTrackerStats(TaskInProgress tip, TaskTrackerStatus ttStatus, 
       Map<String,DataStatistics> trackerStats, DataStatistics overallStats) {



Mime
View raw message