hadoop-mapreduce-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r1157290 [4/4] - in /hadoop/common/trunk/mapreduce: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/counters/ src/java/org/apache/hadoop/mapreduce/jobhistory/ src/java/org/apa...
Date: Fri, 12 Aug 2011 23:25:52 GMT
Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
(added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
Fri Aug 12 23:25:51 2011
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import static com.google.common.base.Preconditions.*;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.util.ResourceBundles;
+
+/**
+ * An abstract class to provide common implementation for the framework
+ * counter group in both mapred and mapreduce packages.
+ *
+ * @param <T> type of the counter enum class
+ * @param <C> type of the counter
+ */
+@InterfaceAudience.Private
+public abstract class FrameworkCounterGroup<T extends Enum<T>,
+    C extends Counter> implements CounterGroupBase<C> {
+
+  private final Class<T> enumClass; // for Enum.valueOf
+  private final Object[] counters;  // local casts are OK and save a class ref
+  private String displayName = null;
+
+  /**
+   * A counter facade for framework counters.
+   * Use old (which extends new) interface to make compatibility easier.
+   */
+  @InterfaceAudience.Private
+  public class FrameworkCounter extends AbstractCounter {
+    final T key;
+    private long value;
+
+    public FrameworkCounter(T ref) {
+      key = ref;
+    }
+
+    @Override
+    public String getName() {
+      return key.name();
+    }
+
+    @Override
+    public String getDisplayName() {
+      return localizeCounterName(getName());
+    }
+
+    @Override
+    public long getValue() {
+      return value;
+    }
+
+    @Override
+    public void setValue(long value) {
+      this.value = value;
+    }
+
+    @Override
+    public void increment(long incr) {
+      value += incr;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      assert false : "shouldn't be called";
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      assert false : "shouldn't be called";
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public FrameworkCounterGroup(Class<T> enumClass) {
+    this.enumClass = enumClass;
+    T[] enums = enumClass.getEnumConstants();
+    counters = new Object[enums.length];
+  }
+
+  @Override
+  public String getName() {
+    return enumClass.getName();
+  }
+
+  @Override
+  public String getDisplayName() {
+    if (displayName == null) {
+      displayName = ResourceBundles.getCounterGroupName(getName(), getName());
+    }
+    return displayName;
+  }
+
+  @Override
+  public void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  private String localizeCounterName(String counterName) {
+    return ResourceBundles.getCounterName(getName(), counterName, counterName);
+  }
+
+  private T valueOf(String name) {
+    return Enum.valueOf(enumClass, name);
+  }
+
+  @Override
+  public void addCounter(C counter) {
+    C ours = findCounter(counter.getName());
+    ours.setValue(counter.getValue());
+  }
+
+  @Override
+  public C addCounter(String name, String displayName, long value) {
+    C counter = findCounter(name);
+    counter.setValue(value);
+    return counter;
+  }
+
+  @Override
+  public C findCounter(String counterName, String displayName) {
+    return findCounter(counterName);
+  }
+
+  @Override
+  public C findCounter(String counterName, boolean create) {
+    try {
+      return findCounter(valueOf(counterName));
+    }
+    catch (Exception e) {
+      if (create) throw new IllegalArgumentException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public C findCounter(String counterName) {
+    return findCounter(valueOf(counterName));
+  }
+
+  @SuppressWarnings("unchecked")
+  private C findCounter(T key) {
+    int i = key.ordinal();
+    if (counters[i] == null) {
+      counters[i] = newCounter(key);
+    }
+    return (C) counters[i];
+  }
+
+  /**
+   * Abstract factory method for new framework counter
+   * @param key for the enum value of a counter
+   * @return a new counter for the key
+   */
+  protected abstract C newCounter(T key);
+
+  @Override
+  public int size() {
+    int n = 0;
+    for (int i = 0; i < counters.length; ++i) {
+      if (counters[i] != null) ++n;
+    }
+    return n;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void incrAllCounters(CounterGroupBase<C> other) {
+    if (checkNotNull(other, "other counter group")
+        instanceof FrameworkCounterGroup<?, ?>) {
+      for (Counter counter : other) {
+        findCounter(((FrameworkCounter) counter).key)
+            .increment(counter.getValue());
+      }
+    }
+  }
+
+  /**
+   * FrameworkGroup ::= #counter (key value)*
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, size());
+    for (int i = 0; i < counters.length; ++i) {
+      Counter counter = (C) counters[i];
+      if (counter != null) {
+        WritableUtils.writeVInt(out, i);
+        WritableUtils.writeVLong(out, counter.getValue());
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    clear();
+    int len = WritableUtils.readVInt(in);
+    T[] enums = enumClass.getEnumConstants();
+    for (int i = 0; i < len; ++i) {
+      int ord = WritableUtils.readVInt(in);
+      Counter counter = newCounter(enums[ord]);
+      counter.setValue(WritableUtils.readVLong(in));
+      counters[ord] = counter;
+    }
+  }
+
+  private void clear() {
+    for (int i = 0; i < counters.length; ++i) {
+      counters[i] = null;
+    }
+  }
+
+  @Override
+  public Iterator<C> iterator() {
+    return new AbstractIterator<C>() {
+      int i = 0;
+      @Override
+      protected C computeNext() {
+        while (i < counters.length) {
+          @SuppressWarnings("unchecked")
+          C counter = (C) counters[i++];
+          if (counter != null) return counter;
+        }
+        return endOfData();
+      }
+    };
+  }
+
+  @Override
+  public boolean equals(Object genericRight) {
+    if (genericRight instanceof CounterGroupBase<?>) {
+      @SuppressWarnings("unchecked")
+      CounterGroupBase<C> right = (CounterGroupBase<C>) genericRight;
+      return Iterators.elementsEqual(iterator(), right.iterator());
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    // need to be deep as counters is an array
+    return Arrays.deepHashCode(new Object[]{enumClass, counters, displayName});
+  }
+}
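
For illustration, a minimal concrete subclass might look like the sketch below; the enum and class names are invented for the example and do not appear in this patch. The only abstract method is newCounter(T), which typically just wraps the enum constant in the FrameworkCounter facade defined above (assuming AbstractCounter implements the mapreduce Counter interface, as arranged elsewhere in this commit, and that CounterGroupBase declares no abstract methods beyond those implemented here).

    // Hypothetical subclass of FrameworkCounterGroup, for illustration only.
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;

    enum ExampleCounter { RECORDS_SEEN, BYTES_SEEN }

    class ExampleCounterGroup
        extends FrameworkCounterGroup<ExampleCounter, Counter> {

      ExampleCounterGroup() {
        super(ExampleCounter.class); // group name becomes the enum class name
      }

      @Override
      protected Counter newCounter(ExampleCounter key) {
        // FrameworkCounter is a non-static inner class, so it is created
        // relative to this group instance and shares its enum type parameter.
        return new FrameworkCounter(key);
      }
    }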

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java
(added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java
Fri Aug 12 23:25:51 2011
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A generic counter implementation
+ */
+@InterfaceAudience.Private
+public class GenericCounter extends AbstractCounter {
+
+  private String name;
+  private String displayName;
+  private long value = 0;
+
+  public GenericCounter() {
+    // mostly for readFields
+  }
+
+  public GenericCounter(String name, String displayName) {
+    this.name = name;
+    this.displayName = displayName;
+  }
+
+  public GenericCounter(String name, String displayName, long value) {
+    this.name = name;
+    this.displayName = displayName;
+    this.value = value;
+  }
+
+  @Override @Deprecated
+  public synchronized void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    name = Text.readString(in);
+    displayName = in.readBoolean() ? Text.readString(in) : name;
+    value = WritableUtils.readVLong(in);
+  }
+
+  /**
+   * GenericCounter ::= keyName isDistinctDisplayName [displayName] value
+   */
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    Text.writeString(out, name);
+    boolean distinctDisplayName = ! name.equals(displayName);
+    out.writeBoolean(distinctDisplayName);
+    if (distinctDisplayName) {
+      Text.writeString(out, displayName);
+    }
+    WritableUtils.writeVLong(out, value);
+  }
+
+  @Override
+  public synchronized String getName() {
+    return name;
+  }
+
+  @Override
+  public synchronized String getDisplayName() {
+    return displayName;
+  }
+
+  @Override
+  public synchronized long getValue() {
+    return value;
+  }
+
+  @Override
+  public synchronized void setValue(long value) {
+    this.value = value;
+  }
+
+  @Override
+  public synchronized void increment(long incr) {
+    value += incr;
+  }
+}
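
As a rough illustration of the wire format documented on write() above (keyName isDistinctDisplayName [displayName] value), the following sketch serializes a counter to a byte array and reads it back; the driver class and the counter names are invented for the example.

    // Illustrative GenericCounter serialization roundtrip (not part of the patch).
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.mapreduce.counters.GenericCounter;

    public class GenericCounterRoundTrip {
      public static void main(String[] args) throws IOException {
        GenericCounter original = new GenericCounter("WORDS", "Words seen", 42L);
        original.increment(8); // value is now 50

        // write(): name, isDistinctDisplayName flag, optional display name, vlong value
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        GenericCounter copy = new GenericCounter();
        copy.readFields(
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getName() + " = " + copy.getValue()); // WORDS = 50
      }
    }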

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/LimitExceededException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/LimitExceededException.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/LimitExceededException.java
(added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/LimitExceededException.java
Fri Aug 12 23:25:51 2011
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class LimitExceededException extends RuntimeException {
+
+  private static final long serialVersionUID = 1L;
+
+  public LimitExceededException(String msg) {
+    super(msg);
+  }
+
+  // Only allows chaining of related exceptions
+  public LimitExceededException(LimitExceededException cause) {
+    super(cause);
+  }
+}

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/Limits.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/Limits.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/Limits.java
(added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/Limits.java
Fri Aug 12 23:25:51 2011
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.mapreduce.MRJobConfig.*;
+
+@InterfaceAudience.Private
+public class Limits {
+
+  static final Configuration conf = new Configuration();
+  public static final int GROUP_NAME_MAX =
+      conf.getInt(COUNTER_GROUP_NAME_MAX_KEY, COUNTER_GROUP_NAME_MAX_DEFAULT);
+  public static final int COUNTER_NAME_MAX =
+      conf.getInt(COUNTER_NAME_MAX_KEY, COUNTER_NAME_MAX_DEFAULT);
+  public static final int GROUPS_MAX =
+      conf.getInt(COUNTER_GROUPS_MAX_KEY, COUNTER_GROUPS_MAX_DEFAULT);
+  public static final int COUNTERS_MAX =
+      conf.getInt(COUNTERS_MAX_KEY, COUNTERS_MAX_DEFAULT);
+
+  private int totalCounters;
+  private LimitExceededException firstViolation;
+
+  public static String filterName(String name, int maxLen) {
+    return name.length() > maxLen ? name.substring(0, maxLen - 1) : name;
+  }
+
+  public String filterCounterName(String name) {
+    return filterName(name, COUNTER_NAME_MAX);
+  }
+
+  public String filterGroupName(String name) {
+    return filterName(name, GROUP_NAME_MAX);
+  }
+
+  public synchronized void checkCounters(int size) {
+    if (firstViolation != null) {
+      throw new LimitExceededException(firstViolation);
+    }
+    if (size > COUNTERS_MAX) {
+      firstViolation = new LimitExceededException("Too many counters: "+ size +
+                                                  " max="+ COUNTERS_MAX);
+      throw firstViolation;
+    }
+  }
+
+  public synchronized void incrCounters() {
+    checkCounters(totalCounters + 1);
+    ++totalCounters;
+  }
+
+  public synchronized void checkGroups(int size) {
+    if (firstViolation != null) {
+      throw new LimitExceededException(firstViolation);
+    }
+    if (size > GROUPS_MAX) {
+      firstViolation = new LimitExceededException("Too many counter groups: "+
+                                                  size +" max="+ GROUPS_MAX);
+    }
+  }
+
+  public synchronized LimitExceededException violation() {
+    return firstViolation;
+  }
+}
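
A short sketch of how these caps behave at runtime; the driver class and names are invented, and the actual limit values come from the MRJobConfig keys imported above. Over-long names are truncated by the filter methods, while going past COUNTERS_MAX records a LimitExceededException that is rethrown on every later check.

    // Illustrative use of the Limits helper (class and names invented).
    import org.apache.hadoop.mapreduce.counters.LimitExceededException;
    import org.apache.hadoop.mapreduce.counters.Limits;

    public class LimitsSketch {
      public static void main(String[] args) {
        Limits limits = new Limits();

        // Over-long names are silently truncated, not rejected.
        String longName = new String(new char[10000]).replace('\0', 'x');
        System.out.println(limits.filterCounterName(longName).length());

        // The first increment past COUNTERS_MAX throws, and the violation is
        // remembered so subsequent checks fail fast with the same cause.
        try {
          for (int i = 0; i <= Limits.COUNTERS_MAX; ++i) {
            limits.incrCounters();
          }
        } catch (LimitExceededException e) {
          System.out.println("violation: " + limits.violation().getMessage());
        }
      }
    }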

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/package-info.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/package-info.java
(added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/counters/package-info.java
Fri Aug 12 23:25:51 2011
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the implementations of different types of
+ * map-reduce counters.
+ *
+ * cf. MAPREDUCE-901 for rationales.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.mapreduce.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
(original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
Fri Aug 12 23:25:51 2011
@@ -22,18 +22,15 @@ import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.EOFException;
-import java.io.StringBufferInputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
 
 import org.apache.avro.Schema;
-import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.io.DatumReader;
@@ -171,13 +168,10 @@ public class EventReader implements Clos
     Counters result = new Counters();
     for (JhCounterGroup g : counters.groups) {
       CounterGroup group =
-        new CounterGroup(g.name.toString(), g.displayName.toString());
+          result.addGroup(g.name.toString(), g.displayName.toString());
       for (JhCounter c : g.counts) {
-        group.addCounter(new Counter(c.name.toString(),
-                                     c.displayName.toString(),
-                                     c.value));
+        group.addCounter(c.name.toString(), c.displayName.toString(), c.value);
       }
-      result.addGroup(group);
     }
     return result;
   }

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
(original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
Fri Aug 12 23:25:51 2011
@@ -114,8 +114,10 @@ public interface ClientProtocol extends 
    *             MAPREDUCE-1664.
    * Version 36: Added the method getJobTrackerStatus() as part of
    *             MAPREDUCE-2337.
+   * Version 37: More efficient serialization format for framework counters
+   *             (MAPREDUCE-901)
    */
-  public static final long versionID = 36L;
+  public static final long versionID = 37L;
 
   /**
    * Allocate a name for the job.

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/util/CountersStrings.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/util/CountersStrings.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/util/CountersStrings.java
(added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/util/CountersStrings.java
Fri Aug 12 23:25:51 2011
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.util;
+
+import java.text.ParseException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.counters.AbstractCounters;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * String conversion utilities for counters.
+ * Candidate for deprecation, since JSON is used from 0.21 onwards
+ */
+@InterfaceAudience.Private
+public class CountersStrings {
+  private static final char GROUP_OPEN = '{';
+  private static final char GROUP_CLOSE = '}';
+  private static final char COUNTER_OPEN = '[';
+  private static final char COUNTER_CLOSE = ']';
+  private static final char UNIT_OPEN = '(';
+  private static final char UNIT_CLOSE = ')';
+  private static char[] charsToEscape =  {GROUP_OPEN, GROUP_CLOSE,
+                                          COUNTER_OPEN, COUNTER_CLOSE,
+                                          UNIT_OPEN, UNIT_CLOSE};
+  /**
+   * Make the pre 0.21 counter string (e.g. for old job history files)
+   * [(actual-name)(display-name)(value)]
+   * @param counter to stringify
+   * @return the stringified result
+   */
+  public static String toEscapedCompactString(Counter counter) {
+
+    // First up, obtain the strings that need escaping. This will help us
+    // determine the buffer length apriori.
+    String escapedName, escapedDispName;
+    long currentValue;
+    synchronized(counter) {
+      escapedName = escape(counter.getName());
+      escapedDispName = escape(counter.getDisplayName());
+      currentValue = counter.getValue();
+    }
+    int length = escapedName.length() + escapedDispName.length() + 4;
+
+
+    length += 8; // For the following delimiting characters
+    StringBuilder builder = new StringBuilder(length);
+    builder.append(COUNTER_OPEN);
+
+    // Add the counter name
+    builder.append(UNIT_OPEN);
+    builder.append(escapedName);
+    builder.append(UNIT_CLOSE);
+
+    // Add the display name
+    builder.append(UNIT_OPEN);
+    builder.append(escapedDispName);
+    builder.append(UNIT_CLOSE);
+
+    // Add the value
+    builder.append(UNIT_OPEN);
+    builder.append(currentValue);
+    builder.append(UNIT_CLOSE);
+
+    builder.append(COUNTER_CLOSE);
+
+    return builder.toString();
+  }
+
+  /**
+   * Make the 0.21 counter group string.
+   * format: {(actual-name)(display-name)(value)[][][]}
+   * where [] are compact strings for the counters within.
+   * @param <G> type of the group
+   * @param group to stringify
+   * @return the stringified result
+   */
+  public static <G extends CounterGroupBase<?>>
+  String toEscapedCompactString(G group) {
+    List<String> escapedStrs = Lists.newArrayList();
+    int length;
+    String escapedName, escapedDispName;
+    synchronized(group) {
+      // First up, obtain the strings that need escaping. This will help us
+      // determine the buffer length apriori.
+      escapedName = escape(group.getName());
+      escapedDispName = escape(group.getDisplayName());
+      int i = 0;
+      length = escapedName.length() + escapedDispName.length();
+      for (Counter counter : group) {
+        String escapedStr = toEscapedCompactString(counter);
+        escapedStrs.add(escapedStr);
+        length += escapedStr.length();
+      }
+    }
+    length += 6; // for all the delimiting characters below
+    StringBuilder builder = new StringBuilder(length);
+    builder.append(GROUP_OPEN); // group start
+
+    // Add the group name
+    builder.append(UNIT_OPEN);
+    builder.append(escapedName);
+    builder.append(UNIT_CLOSE);
+
+    // Add the display name
+    builder.append(UNIT_OPEN);
+    builder.append(escapedDispName);
+    builder.append(UNIT_CLOSE);
+
+    // write the value
+    for(String escaped : escapedStrs) {
+      builder.append(escaped);
+    }
+
+    builder.append(GROUP_CLOSE); // group end
+    return builder.toString();
+  }
+
+  /**
+   * Make the pre 0.21 counters string
+   * @param <C> type of the counter
+   * @param <G> type of the counter group
+   * @param <T> type of the counters object
+   * @param counters the object to stringify
+   * @return the string in the following format
+   * {(groupName)(group-displayName)[(counterName)(displayName)(value)]*}*
+   */
+  public static <C extends Counter, G extends CounterGroupBase<C>,
+                 T extends AbstractCounters<C, G>>
+  String toEscapedCompactString(T counters) {
+    String[] groupsArray;
+    int length = 0;
+    synchronized(counters) {
+      groupsArray = new String[counters.countCounters()];
+      int i = 0;
+      // First up, obtain the escaped string for each group so that we can
+      // determine the buffer length apriori.
+      for (G group : counters) {
+        String escapedString = toEscapedCompactString(group);
+        groupsArray[i++] = escapedString;
+        length += escapedString.length();
+      }
+    }
+
+    // Now construct the buffer
+    StringBuilder builder = new StringBuilder(length);
+    for (String group : groupsArray) {
+      builder.append(group);
+    }
+    return builder.toString();
+  }
+
+  // Escapes all the delimiters for counters i.e {,[,(,),],}
+  private static String escape(String string) {
+    return StringUtils.escapeString(string, StringUtils.ESCAPE_CHAR,
+                                    charsToEscape);
+  }
+
+  // Unescapes all the delimiters for counters i.e {,[,(,),],}
+  private static String unescape(String string) {
+    return StringUtils.unEscapeString(string, StringUtils.ESCAPE_CHAR,
+                                      charsToEscape);
+  }
+
+  // Extracts a block (data enclosed within delimiters), ignoring escape
+  // sequences. Throws ParseException if an incomplete block is found;
+  // returns null if no block is found.
+  private static String getBlock(String str, char open, char close,
+                                IntWritable index) throws ParseException {
+    StringBuilder split = new StringBuilder();
+    int next = StringUtils.findNext(str, open, StringUtils.ESCAPE_CHAR,
+                                    index.get(), split);
+    split.setLength(0); // clear the buffer
+    if (next >= 0) {
+      ++next; // move over '('
+
+      next = StringUtils.findNext(str, close, StringUtils.ESCAPE_CHAR,
+                                  next, split);
+      if (next >= 0) {
+        ++next; // move over ')'
+        index.set(next);
+        return split.toString(); // found a block
+      } else {
+        throw new ParseException("Unexpected end of block", next);
+      }
+    }
+    return null; // found nothing
+  }
+
+  /**
+   * Parse a pre 0.21 counters string into a counter object.
+   * @param <C> type of the counter
+   * @param <G> type of the counter group
+   * @param <T> type of the counters object
+   * @param compactString to parse
+   * @param counters an empty counters object to hold the result
+   * @return the counters object holding the result
+   * @throws ParseException
+   */
+  @SuppressWarnings("deprecation")
+  public static <C extends Counter, G extends CounterGroupBase<C>,
+                 T extends AbstractCounters<C, G>>
+  T parseEscapedCompactString(String compactString, T counters)
+      throws ParseException {
+    IntWritable index = new IntWritable(0);
+
+    // Get the group to work on
+    String groupString =
+      getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
+
+    while (groupString != null) {
+      IntWritable groupIndex = new IntWritable(0);
+
+      // Get the actual name
+      String groupName =
+        getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex);
+      groupName = unescape(groupName);
+
+      // Get the display name
+      String groupDisplayName =
+        getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex);
+      groupDisplayName = unescape(groupDisplayName);
+
+      // Get the counters
+      G group = counters.getGroup(groupName);
+      group.setDisplayName(groupDisplayName);
+
+      String counterString =
+        getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
+
+      while (counterString != null) {
+        IntWritable counterIndex = new IntWritable(0);
+
+        // Get the actual name
+        String counterName =
+          getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex);
+        counterName = unescape(counterName);
+
+        // Get the display name
+        String counterDisplayName =
+          getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex);
+        counterDisplayName = unescape(counterDisplayName);
+
+        // Get the value
+        long value =
+          Long.parseLong(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE,
+                                  counterIndex));
+
+        // Add the counter
+        Counter counter = group.findCounter(counterName);
+        counter.setDisplayName(counterDisplayName);
+        counter.increment(value);
+
+        // Get the next counter
+        counterString =
+          getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
+      }
+
+      groupString = getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
+    }
+    return counters;
+  }
+}
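
The two directions can be exercised together with a sketch like the one below; the group and counter names are invented, and it assumes the mapreduce Counters class extends AbstractCounters as arranged elsewhere in this commit.

    // Illustrative roundtrip through the pre 0.21 compact counters string.
    import java.text.ParseException;

    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.util.CountersStrings;

    public class CompactStringSketch {
      public static void main(String[] args) throws ParseException {
        Counters counters = new Counters();
        counters.findCounter("example-group", "example-counter").increment(7);

        // e.g. {(example-group)(example-group)[(example-counter)(example-counter)(7)]}
        String compact = CountersStrings.toEscapedCompactString(counters);
        System.out.println(compact);

        Counters parsed =
            CountersStrings.parseEscapedCompactString(compact, new Counters());
        System.out.println(
            parsed.findCounter("example-group", "example-counter").getValue()); // 7
      }
    }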

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/util/ResourceBundles.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/util/ResourceBundles.java?rev=1157290&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/util/ResourceBundles.java
(added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/util/ResourceBundles.java
Fri Aug 12 23:25:51 2011
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.util;
+
+import java.util.ResourceBundle;
+import java.util.MissingResourceException;
+
+/**
+ * Helper class to handle resource bundles in a saner way
+ */
+public class ResourceBundles {
+
+  /**
+   * Get a resource bundle
+   * @param bundleName of the resource
+   * @return the resource bundle
+   * @throws MissingResourceException
+   */
+  public static ResourceBundle getBundle(String bundleName) {
+    return ResourceBundle.getBundle(bundleName.replace('$', '_'));
+  }
+
+  /**
+   * Get a resource given bundle name and key
+   * @param <T> type of the resource
+   * @param bundleName name of the resource bundle
+   * @param key to lookup the resource
+   * @param suffix for the key to lookup
+   * @param defaultValue of the resource
+   * @return the resource or the defaultValue
+   * @throws ClassCastException if the resource found doesn't match T
+   */
+  @SuppressWarnings("unchecked")
+  public static synchronized <T> T getValue(String bundleName, String key,
+                                            String suffix, T defaultValue) {
+    T value;
+    try {
+      ResourceBundle bundle = getBundle(bundleName);
+      value = (T) bundle.getObject(getLookupKey(key, suffix));
+    }
+    catch (Exception e) {
+      return defaultValue;
+    }
+    return value == null ? defaultValue : value;
+  }
+
+  private static String getLookupKey(String key, String suffix) {
+    if (suffix == null || suffix.isEmpty()) return key;
+    return key + suffix;
+  }
+
+  /**
+   * Get the counter group display name
+   * @param group the group name to lookup
+   * @param defaultValue of the group
+   * @return the group display name
+   */
+  public static String getCounterGroupName(String group, String defaultValue) {
+    return getValue(group, "CounterGroupName", "", defaultValue);
+  }
+
+  /**
+   * Get the counter display name
+   * @param group the counter group name for the counter
+   * @param counter the counter name to lookup
+   * @param defaultValue of the counter
+   * @return the counter display name
+   */
+  public static String getCounterName(String group, String counter,
+                                      String defaultValue) {
+    return getValue(group, counter, ".name", defaultValue);
+  }
+}
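
The lookup convention behind the two convenience methods is that a bundle named after the counter group (with '$' mapped to '_') supplies a CounterGroupName key for the group display name and a <COUNTER>.name key per counter. A sketch follows; the bundle name and its contents are invented and shown only in comments.

    // Illustrative display-name lookup (bundle name and contents invented).
    //
    // A bundle com/example/MyCounters.properties could contain, for example:
    //   CounterGroupName=My Example Counters
    //   RECORDS_SEEN.name=Records seen
    import org.apache.hadoop.mapreduce.util.ResourceBundles;

    public class CounterNameLookup {
      public static void main(String[] args) {
        String group = "com.example.MyCounters";

        // Both calls fall back to the supplied default if the bundle
        // or the key is missing.
        String groupDisplay = ResourceBundles.getCounterGroupName(group, group);
        String counterDisplay =
            ResourceBundles.getCounterName(group, "RECORDS_SEEN", "RECORDS_SEEN");

        System.out.println(groupDisplay + " / " + counterDisplay);
      }
    }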

Modified: hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestCombineOutputCollector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestCombineOutputCollector.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestCombineOutputCollector.java
(original)
+++ hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestCombineOutputCollector.java
Fri Aug 12 23:25:51 2011
@@ -22,7 +22,12 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Task.CombineOutputCollector;
 import org.apache.hadoop.mapred.Task.TaskReporter;
@@ -31,11 +36,75 @@ import org.junit.Test;
 public class TestCombineOutputCollector {
   private CombineOutputCollector<String, Integer> coc;
 
+  Counters.Counter outCounter = new Counters.Counter() {
+    
+    @Override
+    public void setValue(long value) {
+      // TODO Auto-generated method stub
+      
+    }
+    
+    @Override
+    public void setDisplayName(String displayName) {
+      // TODO Auto-generated method stub
+      
+    }
+    
+    @Override
+    public void increment(long incr) {
+      // TODO Auto-generated method stub
+      
+    }
+    
+    @Override
+    public long getValue() {
+      // TODO Auto-generated method stub
+      return 0;
+    }
+    
+    @Override
+    public String getName() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+    
+    @Override
+    public String getDisplayName() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+    
+    @Override
+    public String makeEscapedCompactString() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+    
+    @Override
+    public long getCounter() {
+      // TODO Auto-generated method stub
+      return 0;
+    }
+    
+    @Override
+    public boolean contentEquals(Counter counter) {
+      // TODO Auto-generated method stub
+      return false;
+    }
+    
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+  };
+
   @Test
   public void testCustomCollect() throws Throwable {
     //mock creation
     TaskReporter mockTaskReporter = mock(TaskReporter.class);
-    Counters.Counter outCounter = new Counters.Counter();
     Writer<String, Integer> mockWriter = mock(Writer.class);
 
     Configuration conf = new Configuration();
@@ -56,7 +125,6 @@ public class TestCombineOutputCollector 
   public void testDefaultCollect() throws Throwable {
     //mock creation
     TaskReporter mockTaskReporter = mock(TaskReporter.class);
-    Counters.Counter outCounter = new Counters.Counter();
     Writer<String, Integer> mockWriter = mock(Writer.class);
 
     Configuration conf = new Configuration();

Modified: hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
(original)
+++ hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
Fri Aug 12 23:25:51 2011
@@ -324,7 +324,7 @@ public class TestJobInProgress extends T
 
     verify(jspy).getStatus();
     verify(jspy).getProfile();
-    verify(jspy).getJobCounters();
+    verify(jspy, atLeastOnce()).getJobCounters();
     verify(jspy, atLeastOnce()).getJobID();
     verify(jspy).getStartTime();
     verify(jspy).getFirstTaskLaunchTimes();

Modified: hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
(original)
+++ hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
Fri Aug 12 23:25:51 2011
@@ -34,6 +34,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -104,8 +105,8 @@ public class TestMiniMRDFSSort extends T
     org.apache.hadoop.mapreduce.Counters counters = sort.getResult().getCounters();
     long mapInput = counters.findCounter(FileInputFormatCounter.BYTES_READ)
         .getValue();
-    long hdfsRead = counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
-                                         "HDFS_BYTES_READ").getValue();
+    long hdfsRead = counters.findCounter("hdfs", FileSystemCounter.BYTES_READ)
+        .getValue();
     // the hdfs read should be between 100% and 110% of the map input bytes
     assertTrue("map input = " + mapInput + ", hdfs read = " + hdfsRead,
                (hdfsRead < (mapInput * 1.1)) &&

Modified: hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
(original)
+++ hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
Fri Aug 12 23:25:51 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.TaskCounter;
@@ -244,12 +245,10 @@ public class TestMiniMRWithDFS extends T
     result = launchWordCount(jobConf, inDir, outDir, input, 0, 1);
     assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
     Counters counters = result.job.getCounters();
-    long hdfsRead = 
-      counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
-          Task.getFileSystemCounterNames("hdfs")[0]).getCounter();
-    long hdfsWrite = 
-      counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
-          Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+    long hdfsRead = counters.findCounter("HDFS",
+        FileSystemCounter.BYTES_READ).getValue();
+    long hdfsWrite = counters.findCounter("HDFS",
+        FileSystemCounter.BYTES_WRITTEN).getValue();
     long rawSplitBytesRead = 
       counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getCounter();
     assertEquals(result.output.length(), hdfsWrite);

Modified: hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
(original)
+++ hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
Fri Aug 12 23:25:51 2011
@@ -279,7 +279,7 @@ public class TestSeveral extends TestCas
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     int exitCode = TestJobClient.runTool(conf, new JobClient(),
         new String[] { "-counter", jobId.toString(),
-      "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS" },
+      "org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS" },
       out);
     assertEquals(0, exitCode);
     assertEquals(numReduces, Integer.parseInt(out.toString().trim()));

Modified: hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/TestCounters.java?rev=1157290&r1=1157289&r2=1157290&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/TestCounters.java
(original)
+++ hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/TestCounters.java
Fri Aug 12 23:25:51 2011
@@ -17,17 +17,23 @@
  */
 package org.apache.hadoop.mapreduce;
 
-import java.io.IOException;
 import java.util.Random;
 
 import org.junit.Test;
 import static org.junit.Assert.*;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.counters.LimitExceededException;
+import org.apache.hadoop.mapreduce.counters.Limits;
+
 /**
  * TestCounters checks the sanity and recoverability of {@code Counters}
  */
 public class TestCounters {
 
+  static final Log LOG = LogFactory.getLog(TestCounters.class);
+
   /**
    * Verify counter value works
    */
@@ -39,7 +45,8 @@ public class TestCounters {
     for (int i = 0; i < NUMBER_TESTS; i++) {
       long initValue = rand.nextInt();
       long expectedValue = initValue;
-      Counter counter = new Counter("foo", "bar", expectedValue);
+      Counter counter = new Counters().findCounter("test", "foo");
+      counter.setValue(initValue);
       assertEquals("Counter value is not initialized correctly",
           expectedValue, counter.getValue());
       for (int j = 0; j < NUMBER_INC; j++) {
@@ -56,4 +63,69 @@ public class TestCounters {
     }
   }
 
+  @Test public void testLimits() {
+    for (int i = 0; i < 3; ++i) {
+      // make sure limits apply to separate containers
+      testMaxCounters(new Counters());
+      testMaxGroups(new Counters());
+    }
+  }
+
+  static final Enum<?> FRAMEWORK_COUNTER = TaskCounter.CPU_MILLISECONDS;
+  static final long FRAMEWORK_COUNTER_VALUE = 8;
+  static final String FS_SCHEME = "HDFS";
+  static final FileSystemCounter FS_COUNTER = FileSystemCounter.BYTES_READ;
+  static final long FS_COUNTER_VALUE = 10;
+
+  private void testMaxCounters(final Counters counters) {
+    LOG.info("counters max="+ Limits.COUNTERS_MAX);
+    for (int i = 0; i < Limits.COUNTERS_MAX; ++i) {
+      counters.findCounter("test", "test"+ i);
+    }
+    setExpected(counters);
+    shouldThrow(LimitExceededException.class, new Runnable() {
+      public void run() {
+        counters.findCounter("test", "bad");
+      }
+    });
+    checkExpected(counters);
+  }
+
+  private void testMaxGroups(final Counters counters) {
+    LOG.info("counter groups max="+ Limits.GROUPS_MAX);
+    for (int i = 0; i < Limits.GROUPS_MAX; ++i) {
+      // assuming COUNTERS_MAX > GROUPS_MAX
+      counters.findCounter("test"+ i, "test");
+    }
+    setExpected(counters);
+    shouldThrow(LimitExceededException.class, new Runnable() {
+      public void run() {
+        counters.findCounter("bad", "test");
+      }
+    });
+    checkExpected(counters);
+  }
+
+  private void setExpected(Counters counters) {
+    counters.findCounter(FRAMEWORK_COUNTER).setValue(FRAMEWORK_COUNTER_VALUE);
+    counters.findCounter(FS_SCHEME, FS_COUNTER).setValue(FS_COUNTER_VALUE);
+  }
+
+  private void checkExpected(Counters counters) {
+    assertEquals(FRAMEWORK_COUNTER_VALUE,
+                 counters.findCounter(FRAMEWORK_COUNTER).getValue());
+    assertEquals(FS_COUNTER_VALUE,
+                 counters.findCounter(FS_SCHEME, FS_COUNTER).getValue());
+  }
+
+  private void shouldThrow(Class<? extends Exception> ecls, Runnable runnable) {
+    try {
+      runnable.run();
+    } catch (Exception e) {
+      assertSame(ecls, e.getClass());
+      LOG.info("got expected: "+ e);
+      return;
+    }
+    assertTrue("Should've thrown "+ ecls.getSimpleName(), false);
+  }
 }


