hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r654313 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Wed, 07 May 2008 22:41:36 GMT
Author: omalley
Date: Wed May  7 15:41:34 2008
New Revision: 654313

URL: http://svn.apache.org/viewvc?rev=654313&view=rev
Log:
HADOOP-1915. Allow users to specify counters via strings instead
of enumerations. Contributed by Tom White.

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Counters.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May  7 15:41:34 2008
@@ -68,6 +68,9 @@
     HADOOP-3058. Add FSNamesystem status metrics. 
     (Lohit Vjayarenu via rangadi)
 
+    HADOOP-1915. Allow users to specify counters via strings instead
+    of enumerations. (tomwhite via omalley)
+
   IMPROVEMENTS
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Counters.java?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Counters.java Wed May  7 15:41:34 2008
@@ -26,7 +26,6 @@
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.MissingResourceException;
 import java.util.ResourceBundle;
@@ -56,6 +55,7 @@
    */
   public static class Counter implements Writable {
 
+    private String name;
     private String displayName;
     private long value;
     
@@ -63,7 +63,8 @@
       value = 0L;
     }
 
-    Counter(String displayName, long value) {
+    Counter(String name, String displayName, long value) {
+      this.name = name;
       this.displayName = displayName;
       this.value = value;
     }
@@ -72,7 +73,12 @@
      * Read the binary representation of the counter
      */
     public synchronized void readFields(DataInput in) throws IOException {
-      displayName = Text.readString(in);
+      name = Text.readString(in);
+      if (in.readBoolean()) {
+        displayName = Text.readString(in);
+      } else {
+        displayName = name;
+      }
       value = WritableUtils.readVLong(in);
     }
     
@@ -80,15 +86,28 @@
      * Write the binary representation of the counter
      */
     public synchronized void write(DataOutput out) throws IOException {
-      Text.writeString(out, displayName);
+      Text.writeString(out, name);
+      boolean distinctDisplayName = (! name.equals(displayName));
+      out.writeBoolean(distinctDisplayName);
+      if (distinctDisplayName) {
+        Text.writeString(out, displayName);
+      }
       WritableUtils.writeVLong(out, value);
     }
     
     /**
+     * Get the internal name of the counter.
+     * @return the internal name of the counter
+     */
+    public synchronized String getName() {
+      return name;
+    }
+    
+    /**
      * Get the name of the counter.
      * @return the user facing name of the counter
      */
-    public String getDisplayName() {
+    public synchronized String getDisplayName() {
       return displayName;
     }
     
@@ -119,7 +138,7 @@
   public static class Group implements Writable, Iterable<Counter> {
     private String groupName;
     private String displayName;
-    private ArrayList<Counter> subcounters = new ArrayList<Counter>();
+    private Map<String, Counter> subcounters = new HashMap<String, Counter>();
     
     // Optional ResourceBundle for localization of group and counter names.
     private ResourceBundle bundle = null;    
@@ -166,7 +185,7 @@
      * not exist.
      */
     public synchronized long getCounter(String counterName) {
-      for(Counter counter: subcounters) {
+      for(Counter counter: subcounters.values()) {
         if (counter != null && counter.displayName.equals(counterName)) {
           return counter.value;
         }
@@ -179,22 +198,24 @@
      * @param id the numeric id of the counter within the group
      * @param name the internal counter name
      * @return the counter
+     * @deprecated use {@link #getCounter(String)} instead
      */
+    @Deprecated
     public synchronized Counter getCounter(int id, String name) {
-      Counter result = null;
-      int size = subcounters.size();
-      if (id < size) {
-        result = subcounters.get(id);
-      }
+      return getCounterForName(name);
+    }
+    
+    /**
+     * Get the counter for the given name and create it if it doesn't exist.
+     * @param name the internal counter name
+     * @return the counter
+     */
+    public synchronized Counter getCounterForName(String name) {
+      Counter result = subcounters.get(name);
       if (result == null) {
-        LOG.debug("Adding " + name + " at " + id);
-        result = new Counter(localize(name + ".name", name), 0L);
-        // extend the list
-        subcounters.ensureCapacity(id + 1);
-        for(int i=size; i <= id; ++i) {
-          subcounters.add(null);
-        }
-        subcounters.set(id, result);
+        LOG.debug("Adding " + name);
+        result = new Counter(name, localize(name + ".name", name), 0L);
+        subcounters.put(name, result);
       }
       return result;
     }
@@ -203,13 +224,7 @@
      * Returns the number of counters in this group.
      */
     public synchronized int size() {
-      int num = 0;
-      for(Counter counter: subcounters) {
-        if (counter != null) {
-          num += 1;
-        }
-      }
-      return num;
+      return subcounters.size();
     }
     
     /**
@@ -231,13 +246,8 @@
     public synchronized void write(DataOutput out) throws IOException {
       Text.writeString(out, displayName);
       WritableUtils.writeVInt(out, subcounters.size());
-      for(Counter counter: subcounters) {
-        if (counter == null) {
-          out.writeBoolean(false);
-        } else {
-          out.writeBoolean(true);
-          counter.write(out);
-        }
+      for(Counter counter: subcounters.values()) {
+        counter.write(out);
       }
     }
     
@@ -245,58 +255,15 @@
       displayName = Text.readString(in);
       subcounters.clear();
       int size = WritableUtils.readVInt(in);
-      subcounters.ensureCapacity(size);
       for(int i=0; i < size; i++) {
-        Counter counter = null;
-        if (in.readBoolean()) {
-          counter = new Counter();
-          counter.readFields(in);
-        }
-        subcounters.add(counter);
-      }
-    }
-    
-    private class CounterIterator implements Iterator<Counter> {
-      private int current = -1;
-
-      CounterIterator() {
-        getNext();
-      }
-
-      private void getNext() {
-        synchronized (Group.this) {
-          int len = subcounters.size();
-          while (++current < len) {
-            if (subcounters.get(current) != null) {
-              return;
-            }
-          }
-        }
-        current = Integer.MAX_VALUE;
-      }
-
-      public boolean hasNext() {
-        synchronized (Group.this) {
-          return current < subcounters.size();
-        }
-      }
-
-      public Counter next() {
-        synchronized (Group.this) {
-          int result = current;
-          getNext();
-          return subcounters.get(result);
-        }
-      }
-      
-      public void remove() {
-        throw new UnsupportedOperationException
-        ("NonNullIterator doesn't support remove");
+        Counter counter = new Counter();
+        counter.readFields(in);
+        subcounters.put(counter.getName(), counter);
       }
     }
 
-    public Iterator<Counter> iterator() {
-      return new CounterIterator();
+    public synchronized Iterator<Counter> iterator() {
+      return new ArrayList<Counter>(subcounters.values()).iterator();
     }
   }
   
@@ -347,7 +314,7 @@
     Counter counter = cache.get(key);
     if (counter == null) {
       Group group = getGroup(key.getDeclaringClass().getName());
-      counter = group.getCounter(key.ordinal(), key.toString());
+      counter = group.getCounterForName(key.toString());
       cache.put(key, counter);
     }
     return counter;    
@@ -359,9 +326,11 @@
    * @param id the id of the counter within the group (0 to N-1)
    * @param name the internal name of the counter
    * @return the counter for that name
+   * @deprecated
    */
+  @Deprecated
   public synchronized Counter findCounter(String group, int id, String name) {
-    return getGroup(group).getCounter(id, name);
+    return getGroup(group).getCounterForName(name);
   }
 
   /**
@@ -376,6 +345,18 @@
   }
   
   /**
+   * Increments the specified counter by the specified amount, creating it if
+   * it didn't already exist.
+   * @param group the name of the group
+   * @param counter the internal name of the counter
+   * @param amount amount by which counter is to be incremented
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized void incrCounter(String group, String counter, long amount) {
+    getGroup(group).getCounterForName(counter).value += amount;
+  }
+  
+  /**
    * Returns current value of the specified counter, or 0 if the counter
    * does not exist.
    */
@@ -392,12 +373,11 @@
   public synchronized void incrAllCounters(Counters other) {
     for (Group otherGroup: other) {
       Group group = getGroup(otherGroup.getName());
-      for(int i=0; i < otherGroup.subcounters.size(); ++i) {
-        Counter otherCounter = otherGroup.subcounters.get(i);
-        if (otherCounter != null) {
-          group.getCounter(i, otherCounter.displayName).value += 
-            otherCounter.value;
-        }
+      group.displayName = otherGroup.displayName;
+      for (Counter otherCounter : otherGroup) {
+        Counter counter = group.getCounterForName(otherCounter.getName());
+        counter.displayName = otherCounter.displayName;
+        counter.value += otherCounter.value;
       }
     }
   }
@@ -436,7 +416,7 @@
    *
    * where each counter is of the form:
    *
-   *     name value
+   *     name (false | true displayName) value
    */
   public synchronized void write(DataOutput out) throws IOException {
     out.writeInt(counters.size());

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Wed May  7 15:41:34 2008
@@ -41,8 +41,9 @@
    * version 9 changes the counter representation for HADOOP-2248
    * version 10 changes the TaskStatus representation for HADOOP-2208
    * version 11 changes string to JobID in getTaskCompletionEvents().
+   * version 12 changes the counters representation for HADOOP-1915
    */
-  public static final long versionID = 11L;
+  public static final long versionID = 12L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Wed May  7 15:41:34 2008
@@ -38,8 +38,9 @@
    * Version 6: change the counters representation for HADOOP-2248
    * Version 7: added getAllJobs for HADOOP-2487
    * Version 8: change {job|task}id's to use corresponding objects rather that strings.
+   * Version 9: change the counter representation for HADOOP-1915
    */
-  public static final long versionID = 8L;
+  public static final long versionID = 9L;
 
   /**
    * Allocate a name for the job.

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Wed May  7 15:41:34 2008
@@ -48,6 +48,8 @@
       }
       public void incrCounter(Enum key, long amount) {
       }
+      public void incrCounter(String group, String counter, long amount) {
+      }
       public InputSplit getInputSplit() throws UnsupportedOperationException {
         throw new UnsupportedOperationException("NULL reporter has no input");
       }
@@ -72,6 +74,17 @@
   public abstract void incrCounter(Enum key, long amount);
   
   /**
+   * Increments the counter identified by the group and counter name
+   * by the specified amount.
+   * 
+   * @param group name to identify the group of the counter to be incremented.
+   * @param counter name to identify the counter within the group.
+   * @param amount A non-negative amount by which the counter is to 
+   *               be incremented.
+   */
+  public abstract void incrCounter(String group, String counter, long amount);
+  
+  /**
    * Get the {@link InputSplit} object for a map.
    * 
    * @return the <code>InputSplit</code> that the map is reading from.

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed May  7 15:41:34 2008
@@ -361,6 +361,12 @@
           }
           setProgressFlag();
         }
+        public void incrCounter(String group, String counter, long amount) {
+          if (counters != null) {
+            counters.incrCounter(group, counter, amount);
+          }
+          setProgressFlag();
+        }
         public InputSplit getInputSplit() throws UnsupportedOperationException {
           return Task.this.getInputSplit();
         }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=654313&r1=654312&r2=654313&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Wed May  7 15:41:34 2008
@@ -42,9 +42,10 @@
    *           or not the task's output needs to be promoted.
    * Version 8 changes {job|tip|task}id's to use their corresponding 
    * objects rather than strings.
+   * Version 9 changes the counter representation for HADOOP-1915
    * */
 
-  public static final long versionID = 8L;
+  public static final long versionID = 9L;
   
   /** Called when a child task process starts, to get its task.*/
   Task getTask(TaskAttemptID taskid) throws IOException;

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java?rev=654313&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java Wed May  7 15:41:34 2008
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestUserDefinedCounters extends ClusterMapReduceTestCase {
+  
+  enum EnumCounter { MAP_RECORDS }
+  
+  static class CountingMapper<K, V> extends IdentityMapper<K, V> {
+
+    public void map(K key, V value,
+        OutputCollector<K, V> output, Reporter reporter)
+        throws IOException {
+      output.collect(key, value);
+      reporter.incrCounter(EnumCounter.MAP_RECORDS, 1);
+      reporter.incrCounter("StringCounter", "MapRecords", 1);
+    }
+
+  }
+  
+  public void testMapReduceJob() throws Exception {
+    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    wr.write("hello1\n");
+    wr.write("hello2\n");
+    wr.write("hello3\n");
+    wr.write("hello4\n");
+    wr.close();
+
+    JobConf conf = createJobConf();
+    conf.setJobName("counters");
+    
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapperClass(CountingMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+
+    FileInputFormat.setInputPaths(conf, getInputDir());
+
+    FileOutputFormat.setOutputPath(conf, getOutputDir());
+
+    RunningJob runningJob = JobClient.runJob(conf);
+
+    Path[] outputFiles = FileUtil.stat2Paths(
+                           getFileSystem().listStatus(getOutputDir(),
+                           new OutputLogFilter()));
+    if (outputFiles.length > 0) {
+      InputStream is = getFileSystem().open(outputFiles[0]);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+      String line = reader.readLine();
+      int counter = 0;
+      while (line != null) {
+        counter++;
+        assertTrue(line.contains("hello"));
+        line = reader.readLine();
+      }
+      reader.close();
+      assertEquals(4, counter);
+    }
+    
+    assertEquals(4,
+        runningJob.getCounters().getCounter(EnumCounter.MAP_RECORDS));
+    assertEquals(4,
+        runningJob.getCounters().getGroup("StringCounter")
+        .getCounter("MapRecords"));
+  }
+
+}



Mime
View raw message