hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r604440 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Sat, 15 Dec 2007 15:16:56 GMT
Author: ddas
Date: Sat Dec 15 07:16:55 2007
New Revision: 604440

URL: http://svn.apache.org/viewvc?rev=604440&view=rev
Log:
HADOOP-2248. Speeds up the framework w.r.t Counters. Also has API updates to the Counters
part. Contributed by Owen O'Malley.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=604440&r1=604439&r2=604440&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Sat Dec 15 07:16:55 2007
@@ -120,6 +120,9 @@
     It now uses the map-reduce framework for load generation.
     (Mukund Madhugiri via dhruba)
 
+    HADOOP-2248. Speeds up the framework w.r.t Counters. Also has API
+    updates to the Counters part. (Owen O'Malley via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java?rev=604440&r1=604439&r2=604440&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java Sat Dec 15 07:16:55 2007
@@ -23,16 +23,18 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.MissingResourceException;
 import java.util.ResourceBundle;
-import java.util.TreeMap;
 
 import org.apache.commons.logging.*;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * A set of named counters.
@@ -44,24 +46,68 @@
  * <p><code>Counters</code> are bunched into {@link Group}s, each comprising of
  * counters from a particular <code>Enum</code> class. 
  */
-public class Counters implements Writable {
+public class Counters implements Writable, Iterable<Counters.Group> {
+  private static final Log LOG = LogFactory.getLog(Counters.class);
   
   //private static Log log = LogFactory.getLog("Counters.class");
   
   /**
    * A counter record, comprising its name and value. 
    */
-  private static class CounterRec {
-    
-    public String name;
-    public long value;
+  public static class Counter implements Writable {
+
+    private String displayName;
+    private long value;
     
-    public CounterRec(String name, long value) {
-      this.name = name;
+    Counter() { 
+      value = 0L;
+    }
+
+    Counter(String displayName, long value) {
+      this.displayName = displayName;
       this.value = value;
     }
     
-  } // end class CounterRec
+    /**
+     * Read the binary representation of the counter
+     */
+    public synchronized void readFields(DataInput in) throws IOException {
+      displayName = Text.readString(in);
+      value = WritableUtils.readVLong(in);
+    }
+    
+    /**
+     * Write the binary representation of the counter
+     */
+    public synchronized void write(DataOutput out) throws IOException {
+      Text.writeString(out, displayName);
+      WritableUtils.writeVLong(out, value);
+    }
+    
+    /**
+     * Get the name of the counter.
+     * @return the user facing name of the counter
+     */
+    public String getDisplayName() {
+      return displayName;
+    }
+    
+    /**
+     * What is the current value of this counter?
+     * @return the current value
+     */
+    public synchronized long getCounter() {
+      return value;
+    }
+    
+    /**
+     * Increment this counter by the given value
+     * @param incr the value to increase this counter by
+     */
+    public synchronized void increment(long incr) {
+      value += incr;
+    }
+  }
   
   /**
    *  <code>Group</code> of counters, comprising of counters from a particular

@@ -70,31 +116,24 @@
    *  <p><code>Group</code>handles localization of the class name and the

    *  counter names.</p>
    */
-  public static class Group {
-    
-    // The group name is the fully qualified enum class name. 
+  public static class Group implements Writable, Iterable<Counter> {
     private String groupName;
+    private String displayName;
+    private ArrayList<Counter> subcounters = new ArrayList<Counter>();
     
     // Optional ResourceBundle for localization of group and counter names.
-    private ResourceBundle bundle = null;
-    
-    // Maps counter names to their current values.  Note that the iteration
-    // order of this Map is the same as the ordering of the Enum class in which 
-    // these counter names were defined.
-    private Map<String,Long> groupCounters = new LinkedHashMap<String,Long>();
-    
+    private ResourceBundle bundle = null;    
     
-    Group(String groupName, Collection<CounterRec> counters) {
-      this.groupName = groupName;
+    Group(String groupName) {
       try {
         bundle = getResourceBundle(groupName);
       }
       catch (MissingResourceException neverMind) {
       }
-      
-      for (CounterRec counter : counters) {
-        groupCounters.put(counter.name, counter.value);
-      }
+      this.groupName = groupName;
+      this.displayName = localize("CounterGroupName", groupName);
+      LOG.debug("Creating group " + groupName + " with " +
+               (bundle == null ? "nothing" : "bundle"));
     }
     
     /**
@@ -119,37 +158,81 @@
      * default, but different if an appropriate ResourceBundle is found.
      */
     public String getDisplayName() {
-      return localize("CounterGroupName", groupName);
+      return displayName;
     }
     
     /**
      * Returns localized name of the specified counter.
+     * @deprecated get the counter directly
      */
     public String getDisplayName(String counter) {
-      return localize(counter + ".name", counter);
+      return counter;
     }
     
     /**
      * Returns the counters for this group, with their names localized.
+     * @deprecated iterate through the group instead
      */
-    public Collection<String> getCounterNames() {
-      return groupCounters.keySet();
+    public synchronized Collection<String> getCounterNames() {
+      List<String> result = new ArrayList<String>();
+      for (Counter counter:subcounters) {
+        if (counter != null) {
+          result.add(counter.displayName);
+        }
+      }
+      return result;
     }
     
     /**
      * Returns the value of the specified counter, or 0 if the counter does
      * not exist.
+     * @deprecated
      */
-    public long getCounter(String counter) {
-      Long result = groupCounters.get(counter);
-      return (result == null ? 0L : result);
+    public synchronized long getCounter(String counterName) {
+      for(Counter counter: subcounters) {
+        if (counter != null && counter.displayName.equals(counterName)) {
+          return counter.value;
+        }
+      }
+      return 0L;
+    }
+    
+    /**
+     * Get the counter for the given id and create it if it doesn't exist.
+     * @param id the numeric id of the counter within the group
+     * @param name the internal counter name
+     * @return the counter
+     */
+    public synchronized Counter getCounter(int id, String name) {
+      Counter result = null;
+      int size = subcounters.size();
+      if (id < size) {
+        result = subcounters.get(id);
+      }
+      if (result == null) {
+        LOG.debug("Adding " + name + " at " + id);
+        result = new Counter(localize(name + ".name", name), 0L);
+        // extend the list
+        subcounters.ensureCapacity(id + 1);
+        for(int i=size; i <= id; ++i) {
+          subcounters.add(null);
+        }
+        subcounters.set(id, result);
+      }
+      return result;
     }
     
     /**
      * Returns the number of counters in this group.
      */
-    public int size() {
-      return groupCounters.size();
+    public synchronized int size() {
+      int num = 0;
+      for(Counter counter: subcounters) {
+        if (counter != null) {
+          num += 1;
+        }
+      }
+      return num;
     }
     
     /**
@@ -168,110 +251,160 @@
       return result;
     }
     
+    public synchronized void write(DataOutput out) throws IOException {
+      Text.writeString(out, displayName);
+      WritableUtils.writeVInt(out, subcounters.size());
+      for(Counter counter: subcounters) {
+        if (counter == null) {
+          out.writeBoolean(false);
+        } else {
+          out.writeBoolean(true);
+          counter.write(out);
+        }
+      }
+    }
     
-  } // end class Group
-  
+    public synchronized void readFields(DataInput in) throws IOException {
+      displayName = Text.readString(in);
+      subcounters.clear();
+      int size = WritableUtils.readVInt(in);
+      subcounters.ensureCapacity(size);
+      for(int i=0; i < size; i++) {
+        Counter counter = null;
+        if (in.readBoolean()) {
+          counter = new Counter();
+          counter.readFields(in);
+        }
+        subcounters.add(counter);
+      }
+    }
+    
+    private class CounterIterator implements Iterator<Counter> {
+      private int current = -1;
+
+      CounterIterator() {
+        getNext();
+      }
+
+      private void getNext() {
+        synchronized (Group.this) {
+          int len = subcounters.size();
+          while (++current < len) {
+            if (subcounters.get(current) != null) {
+              return;
+            }
+          }
+        }
+        current = Integer.MAX_VALUE;
+      }
+
+      public boolean hasNext() {
+        synchronized (Group.this) {
+          return current < subcounters.size();
+        }
+      }
+
+      public Counter next() {
+        synchronized (Group.this) {
+          int result = current;
+          getNext();
+          return subcounters.get(result);
+        }
+      }
+      
+      public void remove() {
+        throw new UnsupportedOperationException
+        ("NonNullIterator doesn't support remove");
+      }
+    }
+
+    public Iterator<Counter> iterator() {
+      return new CounterIterator();
+    }
+  }
   
   // Map from group name (enum class name) to map of int (enum ordinal) to
   // counter record (name-value pair).
-  private Map<String,Map<Integer,CounterRec>> counters =
-    new TreeMap<String,Map<Integer,CounterRec>>();
+  private Map<String,Group> counters = new HashMap<String, Group>();
+
+  /**
+   * A cache from enum values to the associated counter. Dramatically speeds up
+   * typical usage.
+   */
+  @SuppressWarnings("unchecked")
+  private Map<Enum, Counter> cache = new IdentityHashMap<Enum, Counter>();
   
   /**
    * Returns the names of all counter classes.
    * @return Set of counter names.
    */
   public synchronized Collection<String> getGroupNames() {
-    return new ArrayList<String>(counters.keySet());
+    return counters.keySet();
   }
-  
+
+  public synchronized Iterator<Group> iterator() {
+    return counters.values().iterator();
+  }
+
   /**
    * Returns the named counter group, or an empty group if there is none
    * with the specified name.
    */
   public synchronized Group getGroup(String groupName) {
-    Map<Integer,CounterRec> counterMap = counters.get(groupName);
-    Collection<CounterRec> groupCounters;
-    if (counterMap == null) {
-      groupCounters = Collections.emptySet();
+    Group result = counters.get(groupName);
+    if (result == null) {
+      result = new Group(groupName);
+      counters.put(groupName, result);
     }
-    else {
-      groupCounters = counterMap.values();
+    return result;
+  }
+
+  /**
+   * Find the counter for the given enum. The same enum will always return the
+   * same counter.
+   * @param key the counter key
+   * @return the matching counter object
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized Counter findCounter(Enum key) {
+    Counter counter = cache.get(key);
+    if (counter == null) {
+      Group group = getGroup(key.getDeclaringClass().getName());
+      counter = group.getCounter(key.ordinal(), key.toString());
+      cache.put(key, counter);
     }
-    return new Group(groupName, groupCounters);
+    return counter;    
   }
-  
+
+  /**
+   * Find a counter by using strings
+   * @param group the name of the group
+   * @param id the id of the counter within the group (0 to N-1)
+   * @param name the internal name of the counter
+   * @return the counter for that name
+   */
+  public synchronized Counter findCounter(String group, int id, String name) {
+    return getGroup(group).getCounter(id, name);
+  }
+
   /**
    * Increments the specified counter by the specified amount, creating it if
    * it didn't already exist.
    * @param key identifies a counter
    * @param amount amount by which counter is to be incremented
    */
+  @SuppressWarnings("unchecked")
   public synchronized void incrCounter(Enum key, long amount) {
-    int ordinal = key.ordinal();
-    String counterName = key.toString();
-    String groupName = key.getDeclaringClass().getName();
-    Map<Integer,CounterRec> counterMap = getCounterMap(groupName);
-    CounterRec counter = getCounter(counterMap, counterName, ordinal);
-    counter.value += amount;
+    findCounter(key).value += amount;
   }
   
   /**
    * Returns current value of the specified counter, or 0 if the counter
    * does not exist.
    */
+  @SuppressWarnings("unchecked")
   public synchronized long getCounter(Enum key) {
-    long result = 0L;
-    String groupName = key.getDeclaringClass().getName();
-    Map<Integer,CounterRec> counterMap = counters.get(groupName);
-    if (counterMap != null) {
-      int ordinal = key.ordinal();
-      String name = key.toString();
-      CounterRec counter = counterMap.get(ordinal);
-      if (counter != null && counter.name.equals(name)) {
-        result = counter.value;
-      }
-      else {
-        // ordinal lookup failed, but maybe there is a matching name; this 
-        // could happen if e.g. a client has a different version of the Enum class.
-        for (CounterRec ctr : counterMap.values()) {
-          if (ctr.name.equals(name)) {
-            result = ctr.value;
-            break;
-          }
-        }
-      }
-    }
-    return result;
-  }
-  
-  /**
-   * Returns the counters for the specified counter class. The counters are
-   * returned as a map from ordinal number, so that their ordering in the 
-   * enum class declaration is preserved.
-   */
-  private Map<Integer,CounterRec> getCounterMap(String groupName) {
-    Map<Integer,CounterRec> map = counters.get(groupName);
-    if (map == null) {
-      map = new TreeMap<Integer,CounterRec>();
-      counters.put(groupName, map);
-    }
-    return map;
-  }
-
-  /**
-   * Returns the counter record with the specified name and ordinal by 
-   * finding or creating it in the specified counterMap.
-   */
-  private CounterRec getCounter(Map<Integer,CounterRec> counterMap, 
-                                String counterName, int ordinal)
-  {
-    CounterRec result = counterMap.get(ordinal);
-    if (result == null) {
-      result = new CounterRec(counterName, 0L);
-      counterMap.put(ordinal, result);
-    }
-    return result;
+    return findCounter(key).value;
   }
   
   /**
@@ -280,13 +413,14 @@
    * @param other the other Counters instance
    */
   public synchronized void incrAllCounters(Counters other) {
-    for (String groupName : other.counters.keySet()) {
-      Map<Integer,CounterRec> otherCounters = other.counters.get(groupName);
-      Map<Integer,CounterRec> myCounters = getCounterMap(groupName);
-      for (int i : otherCounters.keySet()) {
-        CounterRec otherCounter = otherCounters.get(i);
-        CounterRec counter = getCounter(myCounters, otherCounter.name, i);
-        counter.value += otherCounter.value;
+    for (Group otherGroup: other) {
+      Group group = getGroup(otherGroup.getName());
+      for(int i=0; i < otherGroup.subcounters.size(); ++i) {
+        Counter otherCounter = otherGroup.subcounters.get(i);
+        if (otherCounter != null) {
+          group.getCounter(i, otherCounter.displayName).value += 
+            otherCounter.value;
+        }
       }
     }
   }
@@ -307,55 +441,45 @@
    */
   public synchronized  int size() {
     int result = 0;
-    for (String groupName : counters.keySet()) {
-      result += counters.get(groupName).size();
+    for (Group group : this) {
+      result += group.size();
     }
     return result;
   }
   
-  // Writable.  The external format is:
-  //  
-  //     #groups group*
-  //
-  // i.e. the number of groups followed by 0 or more groups, where each 
-  // group is of the form:
-  //
-  //     groupName #counters counter*
-  //
-  // where each counter is of the form:
-  //
-  //     ordinal name value
-  //
-  
+  /**
+   * Write the set of groups.
+   * The external format is:
+   *     #groups (groupName group)*
+   *
+   * i.e. the number of groups followed by 0 or more groups, where each 
+   * group is of the form:
+   *
+   *     groupDisplayName #counters (false | true counter)*
+   *
+   * where each counter is of the form:
+   *
+   *     name value
+   */
   public synchronized void write(DataOutput out) throws IOException {
     out.writeInt(counters.size());
-    for (Map.Entry<String, Map<Integer, CounterRec>> e1 : counters.entrySet()) {
-      String groupName = e1.getKey();
-      Map<Integer, CounterRec> map = e1.getValue();
-      UTF8.writeString(out, groupName);
-      out.writeInt(map.size());
-      for (Map.Entry<Integer, CounterRec> e2 : map.entrySet()) {
-        Integer ordinal = e2.getKey();
-        CounterRec counter = e2.getValue();
-        out.writeInt(ordinal);
-        UTF8.writeString(out, counter.name);
-        out.writeLong(counter.value);
-      }
+    for (Group group: counters.values()) {
+      Text.writeString(out, group.getName());
+      group.write(out);
     }
   }
   
+  /**
+   * Read a set of groups.
+   */
   public synchronized void readFields(DataInput in) throws IOException {
     int numClasses = in.readInt();
+    counters.clear();
     while (numClasses-- > 0) {
-      String groupName = UTF8.readString(in);
-      Map<Integer,CounterRec> counters = getCounterMap(groupName);
-      int numCounters = in.readInt();
-      while (numCounters-- > 0) {
-        int index = in.readInt();
-        String counterName = UTF8.readString(in);
-        long value = in.readLong();
-        counters.put(index, new CounterRec(counterName, value));
-      }
+      String groupName = Text.readString(in);
+      Group group = new Group(groupName);
+      group.readFields(in);
+      counters.put(groupName, group);
     }
   }
   
@@ -365,29 +489,25 @@
    */
   public void log(Log log) {
     log.info("Counters: " + size());
-    Collection<String> groupNames = getGroupNames();
-    for (String groupName : groupNames) {
-      Group group = getGroup(groupName);
+    for(Group group: this) {
       log.info("  " + group.getDisplayName());
-      for (String counterName : group.getCounterNames()) {
-        log.info("    " + group.getDisplayName(counterName) + "=" + 
-                 group.getCounter(counterName));
-      }
+      for (Counter counter: group) {
+        log.info("    " + counter.getDisplayName() + "=" + 
+                 counter.getCounter());
+      }   
     }
   }
   
   /**
    * Return textual representation of the counter values.
    */
-  public String toString() {
+  public synchronized String toString() {
     StringBuilder sb = new StringBuilder("Counters: " + size());
-    Collection<String> groupNames = getGroupNames();
-    for (String groupName : groupNames) {
-      Group group = getGroup(groupName);
+    for (Group group: this) {
       sb.append("\n\t" + group.getDisplayName());
-      for (String counterName : group.getCounterNames()) {
-        sb.append("\n\t\t" + group.getDisplayName(counterName) + "=" + 
-                 group.getCounter(counterName));
+      for (Counter counter: group) {
+        sb.append("\n\t\t" + counter.getDisplayName() + "=" + 
+                  counter.getCounter());
       }
     }
     return sb.toString();
@@ -397,22 +517,21 @@
    * Convert a counters object into a single line that is easy to parse.
    * @return the string with "name=value" for each counter and separated by ","
    */
-  public String makeCompactString() {
+  public synchronized String makeCompactString() {
     StringBuffer buffer = new StringBuffer();
-    for(String groupName: getGroupNames()){
-      Counters.Group group = getGroup(groupName);
+    for(Group group: this){
       boolean first = true;
-      for(String counterName: group.getCounterNames()) {
+      for(Counter counter: group) {
         if (first) {
           first = false;
         } else {
           buffer.append(',');
         }
-        buffer.append(groupName);
+        buffer.append(group.getDisplayName());
         buffer.append('.');
-        buffer.append(counterName);
+        buffer.append(counter.getDisplayName());
         buffer.append('=');
-        buffer.append(group.getCounter(counterName));
+        buffer.append(counter.getCounter());
       }
     }
     return buffer.toString();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=604440&r1=604439&r2=604440&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Sat Dec 15 07:16:55 2007
@@ -38,8 +38,9 @@
    * version 7 replaces maxTasks by maxMapTasks and maxReduceTasks in 
    * TaskTrackerStatus for HADOOP-1274
    * Version 8: HeartbeatResponse is added with the next heartbeat interval.
+   * version 9 changes the counter representation for HADOOP-2248
    */
-  public static final long versionID = 8L;
+  public static final long versionID = 9L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=604440&r1=604439&r2=604440&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Dec 15 07:16:55 2007
@@ -175,14 +175,12 @@
    */
   public void updateMetrics() {
     Counters counters = getCounters();
-    for (String groupName : counters.getGroupNames()) {
-      Counters.Group group = counters.getGroup(groupName);
+    for (Counters.Group group : counters) {
       jobMetrics.setTag("group", group.getDisplayName());
           
-      for (String counter : group.getCounterNames()) {
-        long value = group.getCounter(counter);
-        jobMetrics.setTag("counter", group.getDisplayName(counter));
-        jobMetrics.setMetric("value", (float) value);
+      for (Counters.Counter counter : group) {
+        jobMetrics.setTag("counter", counter.getDisplayName());
+        jobMetrics.setMetric("value", (float) counter.getCounter());
         jobMetrics.update();
       }
     }
@@ -504,14 +502,14 @@
    *  Returns map phase counters by summing over all map tasks in progress.
    */
   public synchronized Counters getMapCounters() {
-    return sumTaskCounters(maps);
+    return incrementTaskCounters(new Counters(), maps);
   }
     
   /**
    *  Returns map phase counters by summing over all map tasks in progress.
    */
   public synchronized Counters getReduceCounters() {
-    return sumTaskCounters(reduces);
+    return incrementTaskCounters(new Counters(), reduces);
   }
     
   /**
@@ -519,16 +517,20 @@
    *  the map and the reduce counters.
    */
   public Counters getCounters() {
-    return Counters.sum(getJobCounters(), 
-                        Counters.sum(getMapCounters(), getReduceCounters()));
+    Counters result = new Counters();
+    result.incrAllCounters(getJobCounters());
+    incrementTaskCounters(result, maps);
+    return incrementTaskCounters(result, reduces);
   }
     
   /**
-   * Returns a Counters instance representing the sum of all the counters in
-   * the array of tasks in progress.
+   * Increments the counters with the counters from each task.
+   * @param counters the counters to increment
+   * @param tips the tasks to add in to counters
+   * @return counters the same object passed in as counters
    */
-  private Counters sumTaskCounters(TaskInProgress[] tips) {
-    Counters counters = new Counters();
+  private Counters incrementTaskCounters(Counters counters,
+                                         TaskInProgress[] tips) {
     for (TaskInProgress tip : tips) {
       counters.incrAllCounters(tip.getCounters());
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=604440&r1=604439&r2=604440&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Sat Dec 15 07:16:55 2007
@@ -35,8 +35,9 @@
    *Version 4: added jobtracker state to ClusterStatus
    *Version 5: max_tasks in ClusterStatus is replaced by
    * max_map_tasks and max_reduce_tasks for HADOOP-1274
+   * Version 6: change the counters representation for HADOOP-2248
    */
-  public static final long versionID = 5L;
+  public static final long versionID = 6L;
 
   /**
    * Allocate a name for the job.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=604440&r1=604439&r2=604440&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Sat Dec 15 07:16:55 2007
@@ -49,7 +49,6 @@
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
 
 import static org.apache.hadoop.mapred.Task.Counter.*;
 
@@ -114,6 +113,51 @@
     return instantiatedSplit;
   }
 
+  /**
+   * This class wraps the user's record reader to update the counters and progress
+   * as records are read.
+   * @param <K>
+   * @param <V>
+   */
+  class TrackedRecordReader<K extends WritableComparable, V extends Writable> 
+      implements RecordReader<K,V> {
+    private RecordReader<K,V> rawIn;
+    private Counters.Counter inputByteCounter;
+    private Counters.Counter inputRecordCounter;
+    
+    TrackedRecordReader(RecordReader<K,V> raw, Counters counters) {
+      rawIn = raw;
+      inputRecordCounter = counters.findCounter(MAP_INPUT_RECORDS);
+      inputByteCounter = counters.findCounter(MAP_INPUT_BYTES);
+    }
+
+    public K createKey() {
+      return rawIn.createKey();
+    }
+      
+    public V createValue() {
+      return rawIn.createValue();
+    }
+     
+    public synchronized boolean next(K key, V value)
+      throws IOException {
+
+      setProgress(getProgress());
+      long beforePos = getPos();
+      boolean ret = rawIn.next(key, value);
+      if (ret) {
+        inputRecordCounter.increment(1);
+        inputByteCounter.increment(getPos() - beforePos);
+      }
+      return ret;
+    }
+    public long getPos() throws IOException { return rawIn.getPos(); }
+    public void close() throws IOException { rawIn.close(); }
+    public float getProgress() throws IOException {
+      return rawIn.getProgress();
+    }
+  };
+
   @SuppressWarnings("unchecked")
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
@@ -153,37 +197,9 @@
       job.setLong("map.input.length", fileSplit.getLength());
     }
       
-    final RecordReader rawIn =                  // open input
+    RecordReader rawIn =                  // open input
       job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
-
-    RecordReader in = new RecordReader() {      // wrap in progress reporter
-
-        public WritableComparable createKey() {
-          return rawIn.createKey();
-        }
-          
-        public Writable createValue() {
-          return rawIn.createValue();
-        }
-         
-        public synchronized boolean next(WritableComparable key, Writable value)
-          throws IOException {
-
-          setProgress(getProgress());
-          long beforePos = getPos();
-          boolean ret = rawIn.next(key, value);
-          if (ret) {
-            reporter.incrCounter(MAP_INPUT_RECORDS, 1);
-            reporter.incrCounter(MAP_INPUT_BYTES, (getPos() - beforePos));
-          }
-          return ret;
-        }
-        public long getPos() throws IOException { return rawIn.getPos(); }
-        public void close() throws IOException { rawIn.close(); }
-        public float getProgress() throws IOException {
-          return rawIn.getProgress();
-        }
-      };
+    RecordReader in = new TrackedRecordReader(rawIn, getCounters());
 
     MapRunnable runner =
       (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
@@ -251,7 +267,6 @@
     private Partitioner partitioner;
     private JobConf job;
     private Reporter reporter;
-    final private TaskUmbilicalProtocol umbilical;
 
     private DataOutputBuffer keyValBuffer; //the buffer where key/val will
                                            //be stored before they are 
@@ -271,6 +286,11 @@
     private FSDataOutputStream out;
     private FSDataOutputStream indexOut;
     private long segmentStart;
+    private Counters.Counter mapOutputByteCounter;
+    private Counters.Counter mapOutputRecordCounter;
+    private Counters.Counter combineInputCounter;
+    private Counters.Counter combineOutputCounter;
+    
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, 
                            Reporter reporter) throws IOException {
       this.partitions = job.getNumReduceTasks();
@@ -281,7 +301,6 @@
 
       this.job = job;
       this.reporter = reporter;
-      this.umbilical = umbilical;
       this.comparator = job.getOutputKeyComparator();
       this.keyClass = job.getMapOutputKeyClass();
       this.valClass = job.getMapOutputValueClass();
@@ -299,6 +318,11 @@
           ReflectionUtils.newInstance(codecClass, job);
       }
       sortImpl = new BufferSorter[partitions];
+      Counters counters = getCounters();
+      mapOutputByteCounter = counters.findCounter(MAP_OUTPUT_BYTES);
+      mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
+      combineInputCounter = getCounters().findCounter(COMBINE_INPUT_RECORDS);
+      combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
       for (int i = 0; i < partitions; i++)
         sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
                                                                 job.getClass("map.sort.class",
MergeSorter.class,
@@ -352,9 +376,8 @@
         int partNumber = partitioner.getPartition(key, value, partitions);
         sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
 
-        reporter.incrCounter(MAP_OUTPUT_RECORDS, 1);
-        reporter.incrCounter(MAP_OUTPUT_BYTES,
-                             (keyValBuffer.getLength() - keyOffset));
+        mapOutputRecordCounter.increment(1);
+        mapOutputByteCounter.increment(keyValBuffer.getLength() - keyOffset);
 
         //now check whether we need to spill to disk
         long totalMem = 0;
@@ -438,7 +461,7 @@
       while (values.more()) {
         combiner.reduce(values.getKey(), values, combineCollector, reporter);
         values.nextKey();
-        reporter.incrCounter(COMBINE_OUTPUT_RECORDS, 1);
+        combineOutputCounter.increment(1);
         // indicate we're making progress
         reporter.progress();
       }
@@ -589,7 +612,7 @@
       }
       
       public Object next() {
-        reporter.incrCounter(COMBINE_INPUT_RECORDS, 1);
+        combineInputCounter.increment(1);
         return super.next();
       }
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=604440&r1=604439&r2=604440&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Sat Dec 15 07:16:55 2007
@@ -89,6 +89,12 @@
   private Progress copyPhase = getProgress().addPhase("copy");
   private Progress sortPhase  = getProgress().addPhase("sort");
   private Progress reducePhase = getProgress().addPhase("reduce");
+  private Counters.Counter reduceInputKeyCounter = 
+    getCounters().findCounter(REDUCE_INPUT_GROUPS);
+  private Counters.Counter reduceInputValueCounter = 
+    getCounters().findCounter(REDUCE_INPUT_RECORDS);
+  private Counters.Counter reduceOutputCounter = 
+    getCounters().findCounter(REDUCE_OUTPUT_RECORDS);
 
   public ReduceTask() {
     super();
@@ -232,7 +238,7 @@
       reporter.progress();
     }
     public Object next() {
-      reporter.incrCounter(REDUCE_INPUT_RECORDS, 1);
+      reduceInputValueCounter.increment(1);
       return super.next();
     }
   }
@@ -306,7 +312,7 @@
         public void collect(WritableComparable key, Writable value)
           throws IOException {
           out.write(key, value);
-          reporter.incrCounter(REDUCE_OUTPUT_RECORDS, 1);
+          reduceOutputCounter.increment(1);
           // indicate that progress update needs to be sent
           reporter.progress();
         }
@@ -322,7 +328,7 @@
           job, reporter);
       values.informReduceProgress();
       while (values.more()) {
-        reporter.incrCounter(REDUCE_INPUT_GROUPS, 1);
+        reduceInputKeyCounter.increment(1);
         reducer.reduce(values.getKey(), values, collector, reporter);
         values.nextKey();
         values.informReduceProgress();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=604440&r1=604439&r2=604440&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Sat Dec 15 07:16:55 2007
@@ -340,7 +340,6 @@
           setProgressFlag();
         }
         public void incrCounter(Enum key, long amount) {
-          Counters counters = getCounters();
           if (counters != null) {
             counters.incrCounter(key, amount);
           }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=604440&r1=604439&r2=604440&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Sat Dec 15 07:16:55 2007
@@ -36,8 +36,9 @@
    *         TaskUmbilicalProtocol.progress(String, float, String, 
    *         org.apache.hadoop.mapred.TaskStatus.Phase, Counters) 
    *         with {@link #statusUpdate(String, TaskStatus)}
+   * Version 5 changed counters representation for HADOOP-2248
    * */
-  public static final long versionID = 4L;
+  public static final long versionID = 5L;
   
   /** Called when a child task process starts, to get its task.*/
   Task getTask(String taskid) throws IOException;

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=604440&r1=604439&r2=604440&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Sat Dec 15 07:16:55 2007
@@ -69,6 +69,11 @@
       assertEquals("number of maps", 1, reports.length);
       reports = client.getReduceTaskReports(jobid);
       assertEquals("number of reduces", 1, reports.length);
+      Counters counters = ret.job.getCounters();
+      assertEquals("number of map inputs", 3, 
+                   counters.getCounter(Task.Counter.MAP_INPUT_RECORDS));
+      assertEquals("number of reduce outputs", 9, 
+                   counters.getCounter(Task.Counter.REDUCE_OUTPUT_RECORDS));
       runCustomFormats(mr);
     } finally {
       if (mr != null) { mr.shutdown(); }



Mime
View raw message