beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bchamb...@apache.org
Subject [2/3] incubator-beam git commit: Remove Counter and associated code
Date Thu, 11 Aug 2016 18:41:18 GMT
Remove Counter and associated code

Aggregator is the model level concept. Counter was specific to the
Dataflow Runner, and is now not needed as part of Beam.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d20a7ada
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d20a7ada
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d20a7ada

Branch: refs/heads/master
Commit: d20a7ada7eb3ee714917e7c334e1b15ecc2c3b03
Parents: 2a1055d
Author: bchambers <bchambers@google.com>
Authored: Fri Jul 29 09:41:17 2016 -0700
Committer: bchambers <bchambers@google.com>
Committed: Thu Aug 11 10:26:04 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/util/DoFnRunners.java   |   78 --
 .../beam/runners/dataflow/DataflowRunner.java   |    4 +-
 .../org/apache/beam/sdk/transforms/Combine.java |   13 -
 .../org/apache/beam/sdk/transforms/Max.java     |   27 +-
 .../org/apache/beam/sdk/transforms/Min.java     |   28 +-
 .../org/apache/beam/sdk/transforms/Sum.java     |   27 +-
 .../apache/beam/sdk/util/CounterAggregator.java |  128 --
 .../apache/beam/sdk/util/common/Counter.java    | 1287 ------------------
 .../beam/sdk/util/common/CounterName.java       |  153 ---
 .../beam/sdk/util/common/CounterProvider.java   |   27 -
 .../apache/beam/sdk/util/common/CounterSet.java |  179 ---
 .../util/common/ElementByteSizeObserver.java    |   24 +-
 .../beam/sdk/util/CounterAggregatorTest.java    |  256 ----
 .../beam/sdk/util/common/CounterSetTest.java    |  227 ---
 .../beam/sdk/util/common/CounterTest.java       |  736 ----------
 15 files changed, 15 insertions(+), 3179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
index a9f3cf4..6089228 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
@@ -23,8 +23,6 @@ import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
 import org.apache.beam.sdk.util.ExecutionContext.StepContext;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -72,33 +70,6 @@ public class DoFnRunners {
   }
 
   /**
-   * Returns a basic implementation of {@link DoFnRunner} that works for most
-   * {@link OldDoFn OldDoFns}.
-   *
-   * <p>It invokes {@link OldDoFn#processElement} for each input.
-   */
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
-      PipelineOptions options,
-      OldDoFn<InputT, OutputT> fn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      CounterSet.AddCounterMutator addCounterMutator,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return simpleRunner(options,
-        fn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        CounterAggregator.factoryFor(addCounterMutator),
-        windowingStrategy);
-  }
-
-  /**
    * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
    *
    * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
@@ -132,33 +103,6 @@ public class DoFnRunners {
         reduceFnExecutor.getDroppedDueToLatenessAggregator());
   }
 
-  /**
-   * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
-   *
-   * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
-   */
-  public static <K, InputT, OutputT, W extends BoundedWindow>
-  DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
-      PipelineOptions options,
-      ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<KV<K, OutputT>> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      CounterSet.AddCounterMutator addCounterMutator,
-      WindowingStrategy<?, W> windowingStrategy) {
-    return lateDataDroppingRunner(
-        options,
-        reduceFnExecutor,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        CounterAggregator.factoryFor(addCounterMutator),
-        windowingStrategy);
-  }
 
   public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
       PipelineOptions options,
@@ -197,26 +141,4 @@ public class DoFnRunners {
         aggregatorFactory,
         windowingStrategy);
   }
-
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
-      PipelineOptions options,
-      OldDoFn<InputT, OutputT> doFn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AddCounterMutator addCounterMutator,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return createDefault(
-        options,
-        doFn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        CounterAggregator.factoryFor(addCounterMutator),
-        windowingStrategy);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index bea6264..667a63b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -212,9 +212,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   // Default Docker container images that execute Dataflow worker harness, residing in Google
   // Container Registry, separately for Batch and Streaming.
   public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE =
-      "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20160804-dofn";
+      "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20160810";
   public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE =
-      "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20160804-dofn";
+      "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20160810";
 
   // The limit of CreateJob request size.
   private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index a825800..6ba3f8a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.common.Counter;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -736,10 +735,6 @@ public class Combine {
       return new int[] { value };
     }
 
-    public Counter<Integer> getCounter(@SuppressWarnings("unused") String name) {
-      throw new UnsupportedOperationException("BinaryCombineIntegerFn does not support getCounter");
-    }
-
     private static final class ToIntegerCodingFunction
         implements DelegateCoder.CodingFunction<int[], Integer> {
       @Override
@@ -839,10 +834,6 @@ public class Combine {
       return new long[] { value };
     }
 
-    public Counter<Long> getCounter(@SuppressWarnings("unused") String name) {
-      throw new UnsupportedOperationException("BinaryCombineLongFn does not support getCounter");
-    }
-
     private static final class ToLongCodingFunction
         implements DelegateCoder.CodingFunction<long[], Long> {
       @Override
@@ -944,10 +935,6 @@ public class Combine {
       return new double[] { value };
     }
 
-    public Counter<Double> getCounter(@SuppressWarnings("unused") String name) {
-      throw new UnsupportedOperationException("BinaryCombineDoubleFn does not support getCounter");
-    }
-
     private static final class ToDoubleCodingFunction
         implements DelegateCoder.CodingFunction<double[], Double> {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 52617b6..eed13fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -19,9 +19,6 @@ package org.apache.beam.sdk.transforms;
 
 import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.Counter.AggregationKind;
-import org.apache.beam.sdk.util.common.CounterProvider;
 
 import java.io.Serializable;
 import java.util.Comparator;
@@ -218,8 +215,7 @@ public class Max {
    * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn
-      implements CounterProvider<Integer> {
+  public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
     @Override
     public int apply(int left, int right) {
       return left >= right ? left : right;
@@ -229,19 +225,13 @@ public class Max {
     public int identity() {
       return Integer.MIN_VALUE;
     }
-
-    @Override
-    public Counter<Integer> getCounter(String name) {
-      return Counter.ints(name, AggregationKind.MAX);
-    }
   }
 
   /**
    * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MaxLongFn extends Combine.BinaryCombineLongFn
-      implements CounterProvider<Long> {
+  public static class MaxLongFn extends Combine.BinaryCombineLongFn {
     @Override
     public long apply(long left, long right) {
       return left >= right ? left : right;
@@ -251,19 +241,13 @@ public class Max {
     public long identity() {
       return Long.MIN_VALUE;
     }
-
-    @Override
-    public Counter<Long> getCounter(String name) {
-      return Counter.longs(name, AggregationKind.MAX);
-    }
   }
 
   /**
    * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn
-      implements CounterProvider<Double> {
+  public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
     @Override
     public double apply(double left, double right) {
       return left >= right ? left : right;
@@ -273,10 +257,5 @@ public class Max {
     public double identity() {
       return Double.NEGATIVE_INFINITY;
     }
-
-    @Override
-    public Counter<Double> getCounter(String name) {
-      return Counter.doubles(name, AggregationKind.MAX);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index 3159134..9c9d14f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -19,9 +19,6 @@ package org.apache.beam.sdk.transforms;
 
 import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.Counter.AggregationKind;
-import org.apache.beam.sdk.util.common.CounterProvider;
 
 import java.io.Serializable;
 import java.util.Comparator;
@@ -218,8 +215,7 @@ public class Min {
    * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn
-      implements CounterProvider<Integer> {
+  public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
 
     @Override
     public int apply(int left, int right) {
@@ -230,20 +226,13 @@ public class Min {
     public int identity() {
       return Integer.MAX_VALUE;
     }
-
-    @Override
-    public Counter<Integer> getCounter(String name) {
-      return Counter.ints(name, AggregationKind.MIN);
-    }
   }
 
   /**
    * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MinLongFn extends Combine.BinaryCombineLongFn
-      implements CounterProvider<Long> {
-
+  public static class MinLongFn extends Combine.BinaryCombineLongFn {
     @Override
     public long apply(long left, long right) {
       return left <= right ? left : right;
@@ -253,19 +242,13 @@ public class Min {
     public long identity() {
       return Long.MAX_VALUE;
     }
-
-    @Override
-    public Counter<Long> getCounter(String name) {
-      return Counter.longs(name, AggregationKind.MIN);
-    }
   }
 
   /**
    * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn
-      implements CounterProvider<Double> {
+  public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
 
     @Override
     public double apply(double left, double right) {
@@ -276,10 +259,5 @@ public class Min {
     public double identity() {
       return Double.POSITIVE_INFINITY;
     }
-
-    @Override
-    public Counter<Double> getCounter(String name) {
-      return Counter.doubles(name, AggregationKind.MIN);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index 07f78b5..27c5ced 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.Counter.AggregationKind;
-import org.apache.beam.sdk.util.common.CounterProvider;
-
 /**
  * {@code PTransform}s for computing the sum of the elements in a
  * {@code PCollection}, or the sum of the values associated with
@@ -123,8 +119,7 @@ public class Sum {
    * {@code Iterable} of {@code Integer}s, useful as an argument to
    * {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class SumIntegerFn
-      extends Combine.BinaryCombineIntegerFn implements CounterProvider<Integer> {
+  public static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
     @Override
     public int apply(int a, int b) {
       return a + b;
@@ -134,11 +129,6 @@ public class Sum {
     public int identity() {
       return 0;
     }
-
-    @Override
-    public Counter<Integer> getCounter(String name) {
-      return Counter.ints(name, AggregationKind.SUM);
-    }
   }
 
   /**
@@ -147,7 +137,7 @@ public class Sum {
    * {@link Combine#globally} or {@link Combine#perKey}.
    */
   public static class SumLongFn
-      extends Combine.BinaryCombineLongFn implements CounterProvider<Long> {
+      extends Combine.BinaryCombineLongFn {
     @Override
     public long apply(long a, long b) {
       return a + b;
@@ -157,11 +147,6 @@ public class Sum {
     public long identity() {
       return 0;
     }
-
-    @Override
-    public Counter<Long> getCounter(String name) {
-      return Counter.longs(name, AggregationKind.SUM);
-    }
   }
 
   /**
@@ -169,8 +154,7 @@ public class Sum {
    * {@code Iterable} of {@code Double}s, useful as an argument to
    * {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class SumDoubleFn
-      extends Combine.BinaryCombineDoubleFn implements CounterProvider<Double> {
+  public static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
     @Override
     public double apply(double a, double b) {
       return a + b;
@@ -180,10 +164,5 @@ public class Sum {
     public double identity() {
       return 0;
     }
-
-    @Override
-    public Counter<Double> getCounter(String name) {
-      return Counter.doubles(name, AggregationKind.SUM);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
deleted file mode 100644
index 5bde8ef..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterProvider;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * An implementation of the {@code Aggregator} interface that uses a
- * {@link Counter} as the underlying representation. Supports {@link CombineFn}s
- * from the {@link Sum}, {@link Min} and {@link Max} classes.
- *
- * @param <InputT> the type of input values
- * @param <AccumT> the type of accumulator values
- * @param <OutputT> the type of output value
- */
-public class CounterAggregator<InputT, AccumT, OutputT>
-    implements Aggregator<InputT, OutputT> {
-
-  private static class CounterAggregatorFactory implements AggregatorFactory {
-    private final AddCounterMutator addCounterMutator;
-
-    private CounterAggregatorFactory(CounterSet.AddCounterMutator addCounterMutator) {
-      this.addCounterMutator = addCounterMutator;
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-        Class<?> fnClass, ExecutionContext.StepContext stepContext,
-        String userName, CombineFn<InputT, AccumT, OutputT> combine) {
-      boolean isSystem = fnClass.isAnnotationPresent(SystemDoFnInternal.class);
-      String mangledName = (isSystem ? "" : "user-") + stepContext.getStepName() + "-" + userName;
-
-      return new CounterAggregator<>(mangledName, combine, addCounterMutator);
-    }
-  }
-
-  private final Counter<InputT> counter;
-  private final CombineFn<InputT, AccumT, OutputT> combiner;
-
-  /**
-   * Create a factory for producing {@link CounterAggregator CounterAggregators} backed by the given
-   * {@link CounterSet.AddCounterMutator}.
-   */
-  public static AggregatorFactory factoryFor(
-      CounterSet.AddCounterMutator addCounterMutator) {
-    return new CounterAggregatorFactory(addCounterMutator);
-  }
-
-  /**
-   * Constructs a new aggregator with the given name and aggregation logic
-   * specified in the CombineFn argument. The underlying counter is
-   * automatically added into the provided CounterSet.
-   *
-   *  <p>If a counter with the same name already exists, it will be reused, as
-   * long as it has the same type.
-   */
-  @VisibleForTesting CounterAggregator(
-      String name, CombineFn<? super InputT, AccumT, OutputT> combiner,
-      CounterSet.AddCounterMutator addCounterMutator) {
-    // Safe contravariant cast
-    this(constructCounter(name, combiner), addCounterMutator,
-        (CombineFn<InputT, AccumT, OutputT>) combiner);
-  }
-
-  private CounterAggregator(Counter<InputT> counter,
-      CounterSet.AddCounterMutator addCounterMutator,
-      CombineFn<InputT, AccumT, OutputT> combiner) {
-    try {
-      this.counter = addCounterMutator.addCounter(counter);
-    } catch (IllegalArgumentException ex) {
-      throw new IllegalArgumentException(
-          "aggregator's name collides with an existing aggregator "
-          + "or system-provided counter of an incompatible type");
-    }
-    this.combiner = combiner;
-  }
-
-  private static <T> Counter<T> constructCounter(String name,
-      CombineFn<? super T, ?, ?> combiner) {
-    if (combiner instanceof CounterProvider) {
-      @SuppressWarnings("unchecked")
-      CounterProvider<T> counterProvider = (CounterProvider<T>) combiner;
-      return counterProvider.getCounter(name);
-    } else {
-      throw new IllegalArgumentException("unsupported combiner in Aggregator: "
-        + combiner.getClass().getName());
-    }
-  }
-
-  @Override
-  public void addValue(InputT value) {
-    counter.addValue(value);
-  }
-
-  @Override
-  public String getName() {
-    return counter.getFlatName();
-  }
-
-  @Override
-  public CombineFn<InputT, ?, OutputT> getCombineFn() {
-    return combiner;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
deleted file mode 100644
index 550c648..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
+++ /dev/null
@@ -1,1287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.AND;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MEAN;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.OR;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.AtomicDouble;
-
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-
-/**
- * A Counter enables the aggregation of a stream of values over time.  The
- * cumulative aggregate value is updated as new values are added, or it can be
- * reset to a new value.  Multiple kinds of aggregation are supported depending
- * on the type of the counter.
- *
- * <p>Counters compare using value equality of their name, kind, and
- * cumulative value.  Equal counters should have equal toString()s.
- *
- * <p>After all possible mutations have completed, the reader should check
- * {@link #isDirty} for every counter, otherwise updates may be lost.
- *
- * <p>A counter may become dirty without a corresponding update to the value.
- * This generally will occur when the calls to {@code addValue()}, {@code committing()},
- * and {@code committed()} are interleaved such that the value is updated
- * between the calls to committing and the read of the value.
- *
- * @param <T> the type of values aggregated by this counter
- */
-public abstract class Counter<T> {
-  /**
-   * Possible kinds of counter aggregation.
-   */
-  public static enum AggregationKind {
-
-    /**
-     * Computes the sum of all added values.
-     * Applicable to {@link Integer}, {@link Long}, and {@link Double} values.
-     */
-    SUM,
-
-    /**
-     * Computes the maximum value of all added values.
-     * Applicable to {@link Integer}, {@link Long}, and {@link Double} values.
-     */
-    MAX,
-
-    /**
-     * Computes the minimum value of all added values.
-     * Applicable to {@link Integer}, {@link Long}, and {@link Double} values.
-     */
-    MIN,
-
-    /**
-     * Computes the arithmetic mean of all added values.  Applicable to
-     * {@link Integer}, {@link Long}, and {@link Double} values.
-     */
-    MEAN,
-
-    /**
-     * Computes boolean AND over all added values.
-     * Applicable only to {@link Boolean} values.
-     */
-    AND,
-
-    /**
-     * Computes boolean OR over all added values. Applicable only to
-     * {@link Boolean} values.
-     */
-    OR
-    // TODO: consider adding VECTOR_SUM, HISTOGRAM, KV_SET, PRODUCT, TOP.
-  }
-
-  /**
-   * Constructs a new {@link Counter} that aggregates {@link Integer}, values
-   * according to the desired aggregation kind. The supported aggregation kinds
-   * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
-   * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}.
-   * This is a convenience wrapper over a
-   * {@link Counter} implementation that aggregates {@link Long} values. This is
-   * useful when the application handles (boxed) {@link Integer} values that
-   * are not readily convertible to the (boxed) {@link Long} values otherwise
-   * expected by the {@link Counter} implementation aggregating {@link Long}
-   * values.
-   *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
-   */
-  public static Counter<Integer> ints(CounterName name, AggregationKind kind) {
-    return new IntegerCounter(name, kind);
-  }
-
-  /**
-   * Constructs a new {@code Counter<Integer>} with an unstructured name.
-   *
-   * @deprecated use {@link #ints(CounterName, AggregationKind)}.
-   */
-  @Deprecated
-  public static Counter<Integer> ints(String name, AggregationKind kind) {
-    return new IntegerCounter(CounterName.named(name), kind);
-  }
-
-  /**
-   * Constructs a new {@link Counter} that aggregates {@link Long} values
-   * according to the desired aggregation kind. The supported aggregation kinds
-   * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
-   * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}.
-   *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
-   */
-  public static Counter<Long> longs(CounterName name, AggregationKind kind) {
-    return new LongCounter(name, kind);
-  }
-
-  /**
-   * Constructs a new {@code Counter<Long>} with an unstructured name.
-   *
-   * @deprecated use {@link #longs(CounterName, AggregationKind)}.
-   */
-  @Deprecated
-  public static Counter<Long> longs(String name, AggregationKind kind) {
-    return new LongCounter(CounterName.named(name), kind);
-  }
-
-  /**
-   * Constructs a new {@link Counter} that aggregates {@link Double} values
-   * according to the desired aggregation kind. The supported aggregation kinds
-   * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
-   * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}.
-   *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
-   */
-  public static Counter<Double> doubles(CounterName name, AggregationKind kind) {
-    return new DoubleCounter(name, kind);
-  }
-
-  /**
-   * Constructs a new {@code Counter<Double>} with an unstructured name.
-   *
-   * @deprecated use {@link #doubles(CounterName, AggregationKind)}.
-   */
-  @Deprecated
-  public static Counter<Double> doubles(String name, AggregationKind kind) {
-    return new DoubleCounter(CounterName.named(name), kind);
-  }
-
-  /**
-   * Constructs a new {@link Counter} that aggregates {@link Boolean} values
-   * according to the desired aggregation kind. The only supported aggregation
-   * kinds are {@link AggregationKind#AND} and {@link AggregationKind#OR}.
-   *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
-   */
-  public static Counter<Boolean> booleans(CounterName name, AggregationKind kind) {
-    return new BooleanCounter(name, kind);
-  }
-
-  /**
-   * Constructs a new {@code Counter<Boolean>} with an unstructured name.
-   *
-   * @deprecated use {@link #booleans(CounterName, AggregationKind)}.
-   */
-  @Deprecated
-  public static Counter<Boolean> booleans(String name, AggregationKind kind) {
-    return new BooleanCounter(CounterName.named(name), kind);
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Adds a new value to the aggregation stream. Returns this (to allow method
-   * chaining).
-   */
-  public abstract Counter<T> addValue(T value);
-
-  /**
-   * Resets the aggregation stream to this new value. This aggregator must not
-   * be a MEAN aggregator. Returns this (to allow method chaining).
-   */
-  public abstract Counter<T> resetToValue(T value);
-
-  /**
-   * Resets the aggregation stream to this new value. Returns this (to allow
-   * method chaining). The value of elementCount must be non-negative, and this
-   * aggregator must be a MEAN aggregator.
-   */
-  public abstract Counter<T> resetMeanToValue(long elementCount, T value);
-
-  /**
-   * Resets the counter's delta value to have no values accumulated and returns
-   * the value of the delta prior to the reset.
-   *
-   * @return the aggregate delta at the time this method is called
-   */
-  public abstract T getAndResetDelta();
-
-  /**
-   * Resets the counter's delta value to have no values accumulated and returns
-   * the value of the delta prior to the reset, for a MEAN counter.
-   *
-   * @return the mean delta t the time this method is called
-   */
-  public abstract CounterMean<T> getAndResetMeanDelta();
-
-  /**
-   * Returns the counter's flat name.
-   */
-  public String getFlatName() {
-    return name.getFlatName();
-  }
-
-  /**
-   * Returns the counter's name.
-   *
-   * @deprecated use {@link #getFlatName}.
-   */
-  @Deprecated
-  public String getName() {
-    return name.getFlatName();
-  }
-
-  /**
-   * Returns the counter's aggregation kind.
-   */
-  public AggregationKind getKind() {
-    return kind;
-  }
-
-  /**
-   * Returns the counter's type.
-   */
-  public Class<?> getType() {
-    return new TypeDescriptor<T>(getClass()) {}.getRawType();
-  }
-
-  /**
-   * Returns the aggregated value, or the sum for MEAN aggregation, either
-   * total or, if delta, since the last update extraction or resetDelta.
-   */
-  public abstract T getAggregate();
-
-  /**
-   * The mean value of a {@code Counter}, represented as an aggregate value and
-   * a count.
-   *
-   * @param <T> the type of the aggregate
-   */
-  public static interface CounterMean<T> {
-    /**
-     * Gets the aggregate value of this {@code CounterMean}.
-     */
-    T getAggregate();
-
-    /**
-     * Gets the count of this {@code CounterMean}.
-     */
-    long getCount();
-  }
-
-  /**
-   * Returns the mean in the form of a CounterMean, or null if this is not a
-   * MEAN counter.
-   */
-  @Nullable
-  public abstract CounterMean<T> getMean();
-
-  /**
-   * Represents whether counters' values have been committed to the backend.
-   *
-   * <p>Runners can use this information to optimize counters updates.
-   * For example, if counters are committed, runners may choose to skip the updates.
-   *
-   * <p>Counters' state transition table:
-   * {@code
-   * Action\Current State         COMMITTED        DIRTY        COMMITTING
-   * addValue()                   DIRTY            DIRTY        DIRTY
-   * committing()                 None             COMMITTING   None
-   * committed()                  None             None         COMMITTED
-   * }
-   */
-  @VisibleForTesting
-  enum CommitState {
-    /**
-     * There are no local updates that haven't been committed to the backend.
-     */
-    COMMITTED,
-    /**
-     * There are local updates that haven't been committed to the backend.
-     */
-    DIRTY,
-    /**
-     * Local updates are committing to the backend, but are still pending.
-     */
-    COMMITTING,
-  }
-
-  /**
-   * Returns if the counter contains non-committed aggregate.
-   */
-  public boolean isDirty() {
-    return commitState.get() != CommitState.COMMITTED;
-  }
-
-  /**
-   * Changes the counter from {@code CommitState.DIRTY} to {@code CommitState.COMMITTING}.
-   *
-   * @return true if successful. False return indicates that the commit state
-   * was not in {@code CommitState.DIRTY}.
-   */
-  public boolean committing() {
-    return commitState.compareAndSet(CommitState.DIRTY, CommitState.COMMITTING);
-  }
-
-  /**
-   * Changes the counter from {@code CommitState.COMMITTING} to {@code CommitState.COMMITTED}.
-   *
-   * @return true if successful.
-   *
-   * <p>False return indicates that the counter was updated while the committing is pending.
-   * That counter update might or might not has been committed. The {@code commitState} has to
-   * stay in {@code CommitState.DIRTY}.
-   */
-  public boolean committed() {
-    return commitState.compareAndSet(CommitState.COMMITTING, CommitState.COMMITTED);
-  }
-
-  /**
-   * Sets the counter to {@code CommitState.DIRTY}.
-   *
-   * <p>Must be called at the end of {@link #addValue}, {@link #resetToValue},
-   * {@link #resetMeanToValue}, and {@link #merge}.
-   */
-  protected void setDirty() {
-    commitState.set(CommitState.DIRTY);
-  }
-
-  /**
-   * Returns a string representation of the Counter. Useful for debugging logs.
-   * Example return value: "ElementCount:SUM(15)".
-   */
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(getFlatName());
-    sb.append(":");
-    sb.append(getKind());
-    sb.append("(");
-    switch (kind) {
-      case SUM:
-      case MAX:
-      case MIN:
-      case AND:
-      case OR:
-        sb.append(getAggregate());
-        break;
-      case MEAN:
-        sb.append(getMean());
-        break;
-      default:
-        throw illegalArgumentException();
-    }
-    sb.append(")");
-
-    return sb.toString();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    } else if (o instanceof Counter) {
-      Counter<?> that = (Counter<?>) o;
-      if (this.name.equals(that.name) && this.kind == that.kind
-          && this.getClass().equals(that.getClass())) {
-        if (kind == MEAN) {
-          CounterMean<T> thisMean = this.getMean();
-          CounterMean<?> thatMean = that.getMean();
-          return thisMean == thatMean
-              || (Objects.equals(thisMean.getAggregate(), thatMean.getAggregate())
-                     && thisMean.getCount() == thatMean.getCount());
-        } else {
-          return Objects.equals(this.getAggregate(), that.getAggregate());
-        }
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    if (kind == MEAN) {
-      CounterMean<T> mean = getMean();
-      return Objects.hash(getClass(), name, kind, mean.getAggregate(), mean.getCount());
-    } else {
-      return Objects.hash(getClass(), name, kind, getAggregate());
-    }
-  }
-
-  /**
-   * Returns whether this Counter is compatible with that Counter.  If
-   * so, they can be merged into a single Counter.
-   */
-  public boolean isCompatibleWith(Counter<?> that) {
-    return this.name.equals(that.name)
-        && this.kind == that.kind
-        && this.getClass().equals(that.getClass());
-  }
-
-  /**
-   * Merges this counter with the provided counter, returning this counter with the combined value
-   * of both counters. This may reset the delta of this counter.
-   *
-   * @throws IllegalArgumentException if the provided Counter is not compatible with this Counter
-   */
-  public abstract Counter<T> merge(Counter<T> that);
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /** The name and metadata of this counter. */
-  protected final CounterName name;
-
-  /** The kind of aggregation function to apply to this counter. */
-  protected final AggregationKind kind;
-
-  /** The commit state of this counter. */
-  protected final AtomicReference<CommitState> commitState;
-
-  protected Counter(CounterName name, AggregationKind kind) {
-    this.name = name;
-    this.kind = kind;
-    this.commitState = new AtomicReference<>(CommitState.COMMITTED);
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Implements a {@link Counter} for {@link Long} values.
-   */
-  private static class LongCounter extends Counter<Long> {
-    private final AtomicLong aggregate;
-    private final AtomicLong deltaAggregate;
-    private final AtomicReference<LongCounterMean> mean;
-    private final AtomicReference<LongCounterMean> deltaMean;
-
-    /** Initializes a new {@link Counter} for {@link Long} values. */
-    private LongCounter(CounterName name, AggregationKind kind) {
-      super(name, kind);
-      switch (kind) {
-        case MEAN:
-          mean = new AtomicReference<>();
-          deltaMean = new AtomicReference<>();
-          getAndResetMeanDelta();
-          mean.set(deltaMean.get());
-          aggregate = deltaAggregate = null;
-          break;
-        case SUM:
-        case MAX:
-        case MIN:
-          aggregate = new AtomicLong();
-          deltaAggregate = new AtomicLong();
-          getAndResetDelta();
-          aggregate.set(deltaAggregate.get());
-          mean = deltaMean = null;
-          break;
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public LongCounter addValue(Long value) {
-      try {
-        switch (kind) {
-          case SUM:
-            aggregate.addAndGet(value);
-            deltaAggregate.addAndGet(value);
-            break;
-          case MEAN:
-            addToMeanAndSet(value, mean);
-            addToMeanAndSet(value, deltaMean);
-            break;
-          case MAX:
-            maxAndSet(value, aggregate);
-            maxAndSet(value, deltaAggregate);
-            break;
-          case MIN:
-            minAndSet(value, aggregate);
-            minAndSet(value, deltaAggregate);
-            break;
-          default:
-            throw illegalArgumentException();
-        }
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    private void minAndSet(Long value, AtomicLong target) {
-      long current;
-      long update;
-      do {
-        current = target.get();
-        update = Math.min(value, current);
-      } while (update < current && !target.compareAndSet(current, update));
-    }
-
-    private void maxAndSet(Long value, AtomicLong target) {
-      long current;
-      long update;
-      do {
-        current = target.get();
-        update = Math.max(value, current);
-      } while (update > current && !target.compareAndSet(current, update));
-    }
-
-    private void addToMeanAndSet(Long value, AtomicReference<LongCounterMean> target) {
-      LongCounterMean current;
-      LongCounterMean update;
-      do {
-        current = target.get();
-        update = new LongCounterMean(current.getAggregate() + value, current.getCount() + 1L);
-      } while (!target.compareAndSet(current, update));
-    }
-
-    @Override
-    public Long getAggregate() {
-      if (kind != MEAN) {
-        return aggregate.get();
-      } else {
-        return getMean().getAggregate();
-      }
-    }
-
-    @Override
-    public Long getAndResetDelta() {
-      switch (kind) {
-        case SUM:
-          return deltaAggregate.getAndSet(0L);
-        case MAX:
-          return deltaAggregate.getAndSet(Long.MIN_VALUE);
-        case MIN:
-          return deltaAggregate.getAndSet(Long.MAX_VALUE);
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<Long> resetToValue(Long value) {
-      try {
-        if (kind == MEAN) {
-          throw illegalArgumentException();
-        }
-        aggregate.set(value);
-        deltaAggregate.set(value);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public Counter<Long> resetMeanToValue(long elementCount, Long value) {
-      try {
-        if (kind != MEAN) {
-          throw illegalArgumentException();
-        }
-        if (elementCount < 0) {
-          throw new IllegalArgumentException("elementCount must be non-negative");
-        }
-        LongCounterMean counterMean = new LongCounterMean(value, elementCount);
-        mean.set(counterMean);
-        deltaMean.set(counterMean);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public CounterMean<Long> getAndResetMeanDelta() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return deltaMean.getAndSet(new LongCounterMean(0L, 0L));
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<Long> getMean() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return mean.get();
-    }
-
-    @Override
-    public Counter<Long> merge(Counter<Long> that) {
-      try {
-        checkArgument(
-            this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-        switch (kind) {
-          case SUM:
-          case MIN:
-          case MAX:
-            return addValue(that.getAggregate());
-          case MEAN:
-            CounterMean<Long> thisCounterMean = this.getMean();
-            CounterMean<Long> thatCounterMean = that.getMean();
-            return resetMeanToValue(
-                thisCounterMean.getCount() + thatCounterMean.getCount(),
-                thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
-          default:
-            throw illegalArgumentException();
-        }
-      } finally {
-        setDirty();
-      }
-    }
-
-    private static class LongCounterMean implements CounterMean<Long> {
-      private final long aggregate;
-      private final long count;
-
-      public LongCounterMean(long aggregate, long count) {
-        this.aggregate = aggregate;
-        this.count = count;
-      }
-
-      @Override
-      public Long getAggregate() {
-        return aggregate;
-      }
-
-      @Override
-      public long getCount() {
-        return count;
-      }
-
-      @Override
-      public String toString() {
-        return aggregate + "/" + count;
-      }
-    }
-  }
-
-  /**
-   * Implements a {@link Counter} for {@link Double} values.
-   */
-  private static class DoubleCounter extends Counter<Double> {
-    AtomicDouble aggregate;
-    AtomicDouble deltaAggregate;
-    AtomicReference<DoubleCounterMean> mean;
-    AtomicReference<DoubleCounterMean> deltaMean;
-
-    /** Initializes a new {@link Counter} for {@link Double} values. */
-    private DoubleCounter(CounterName name, AggregationKind kind) {
-      super(name, kind);
-      switch (kind) {
-        case MEAN:
-          aggregate = deltaAggregate = null;
-          mean = new AtomicReference<>();
-          deltaMean = new AtomicReference<>();
-          getAndResetMeanDelta();
-          mean.set(deltaMean.get());
-          break;
-        case SUM:
-        case MAX:
-        case MIN:
-          mean = deltaMean = null;
-          aggregate = new AtomicDouble();
-          deltaAggregate = new AtomicDouble();
-          getAndResetDelta();
-          aggregate.set(deltaAggregate.get());
-          break;
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public DoubleCounter addValue(Double value) {
-      try {
-        switch (kind) {
-          case SUM:
-            aggregate.addAndGet(value);
-            deltaAggregate.addAndGet(value);
-            break;
-          case MEAN:
-            addToMeanAndSet(value, mean);
-            addToMeanAndSet(value, deltaMean);
-            break;
-          case MAX:
-            maxAndSet(value, aggregate);
-            maxAndSet(value, deltaAggregate);
-            break;
-          case MIN:
-            minAndSet(value, aggregate);
-            minAndSet(value, deltaAggregate);
-            break;
-          default:
-            throw illegalArgumentException();
-        }
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    private void addToMeanAndSet(Double value, AtomicReference<DoubleCounterMean> target) {
-      DoubleCounterMean current;
-      DoubleCounterMean update;
-      do {
-        current = target.get();
-        update = new DoubleCounterMean(current.getAggregate() + value, current.getCount() + 1);
-      } while (!target.compareAndSet(current, update));
-    }
-
-    private void maxAndSet(Double value, AtomicDouble target) {
-      double current;
-      double update;
-      do {
-        current = target.get();
-        update = Math.max(current, value);
-      } while (update > current && !target.compareAndSet(current, update));
-    }
-
-    private void minAndSet(Double value, AtomicDouble target) {
-      double current;
-      double update;
-      do {
-        current = target.get();
-        update = Math.min(current, value);
-      } while (update < current && !target.compareAndSet(current, update));
-    }
-
-    @Override
-    public Double getAndResetDelta() {
-      switch (kind) {
-        case SUM:
-          return deltaAggregate.getAndSet(0.0);
-        case MAX:
-          return deltaAggregate.getAndSet(Double.NEGATIVE_INFINITY);
-        case MIN:
-          return deltaAggregate.getAndSet(Double.POSITIVE_INFINITY);
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<Double> resetToValue(Double value) {
-      try {
-        if (kind == MEAN) {
-          throw illegalArgumentException();
-        }
-        aggregate.set(value);
-        deltaAggregate.set(value);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public Counter<Double> resetMeanToValue(long elementCount, Double value) {
-      try {
-        if (kind != MEAN) {
-          throw illegalArgumentException();
-        }
-        if (elementCount < 0) {
-          throw new IllegalArgumentException("elementCount must be non-negative");
-        }
-        DoubleCounterMean counterMean = new DoubleCounterMean(value, elementCount);
-        mean.set(counterMean);
-        deltaMean.set(counterMean);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public CounterMean<Double> getAndResetMeanDelta() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return deltaMean.getAndSet(new DoubleCounterMean(0.0, 0L));
-    }
-
-    @Override
-    public Double getAggregate() {
-      if (kind != MEAN) {
-        return aggregate.get();
-      } else {
-        return getMean().getAggregate();
-      }
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<Double> getMean() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return mean.get();
-    }
-
-    @Override
-    public Counter<Double> merge(Counter<Double> that) {
-      try {
-        checkArgument(
-            this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-        switch (kind) {
-          case SUM:
-          case MIN:
-          case MAX:
-            return addValue(that.getAggregate());
-          case MEAN:
-            CounterMean<Double> thisCounterMean = this.getMean();
-            CounterMean<Double> thatCounterMean = that.getMean();
-            return resetMeanToValue(
-                thisCounterMean.getCount() + thatCounterMean.getCount(),
-                thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
-          default:
-            throw illegalArgumentException();
-        }
-      } finally {
-        setDirty();
-      }
-    }
-
-    private static class DoubleCounterMean implements CounterMean<Double> {
-      private final double aggregate;
-      private final long count;
-
-      public DoubleCounterMean(double aggregate, long count) {
-        this.aggregate = aggregate;
-        this.count = count;
-      }
-
-      @Override
-      public Double getAggregate() {
-        return aggregate;
-      }
-
-      @Override
-      public long getCount() {
-        return count;
-      }
-
-      @Override
-      public String toString() {
-        return aggregate + "/" + count;
-      }
-    }
-  }
-
-  /**
-   * Implements a {@link Counter} for {@link Boolean} values.
-   */
-  private static class BooleanCounter extends Counter<Boolean> {
-    private final AtomicBoolean aggregate;
-    private final AtomicBoolean deltaAggregate;
-
-    /** Initializes a new {@link Counter} for {@link Boolean} values. */
-    private BooleanCounter(CounterName name, AggregationKind kind) {
-      super(name, kind);
-      aggregate = new AtomicBoolean();
-      deltaAggregate = new AtomicBoolean();
-      getAndResetDelta();
-      aggregate.set(deltaAggregate.get());
-    }
-
-    @Override
-    public BooleanCounter addValue(Boolean value) {
-      try {
-        if (kind.equals(AND) && !value) {
-          aggregate.set(value);
-          deltaAggregate.set(value);
-        } else if (kind.equals(OR) && value) {
-          aggregate.set(value);
-          deltaAggregate.set(value);
-        }
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public Boolean getAndResetDelta() {
-      switch (kind) {
-        case AND:
-          return deltaAggregate.getAndSet(true);
-        case OR:
-          return deltaAggregate.getAndSet(false);
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<Boolean> resetToValue(Boolean value) {
-      try {
-        aggregate.set(value);
-        deltaAggregate.set(value);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public Counter<Boolean> resetMeanToValue(long elementCount, Boolean value) {
-      throw illegalArgumentException();
-    }
-
-    @Override
-    public CounterMean<Boolean> getAndResetMeanDelta() {
-      throw illegalArgumentException();
-    }
-
-    @Override
-    public Boolean getAggregate() {
-      return aggregate.get();
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<Boolean> getMean() {
-      throw illegalArgumentException();
-    }
-
-    @Override
-    public Counter<Boolean> merge(Counter<Boolean> that) {
-      try {
-        checkArgument(
-            this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-        return addValue(that.getAggregate());
-      } finally {
-        setDirty();
-      }
-    }
-  }
-
-  /**
-   * Implements a {@link Counter} for {@link String} values.
-   */
-  private static class StringCounter extends Counter<String> {
-    /** Initializes a new {@link Counter} for {@link String} values. */
-    private StringCounter(CounterName name, AggregationKind kind) {
-      super(name, kind);
-      // TODO: Support MIN, MAX of Strings.
-      throw illegalArgumentException();
-    }
-
-    @Override
-    public StringCounter addValue(String value) {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<String> resetToValue(String value) {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<String> resetMeanToValue(long elementCount, String value) {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public String getAndResetDelta() {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public CounterMean<String> getAndResetMeanDelta() {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public String getAggregate() {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<String> getMean() {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<String> merge(Counter<String> that) {
-      checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-  }
-
-  /**
-   * Implements a {@link Counter} for {@link Integer} values.
-   */
-  private static class IntegerCounter extends Counter<Integer> {
-    private final AtomicInteger aggregate;
-    private final AtomicInteger deltaAggregate;
-    private final AtomicReference<IntegerCounterMean> mean;
-    private final AtomicReference<IntegerCounterMean> deltaMean;
-
-    /** Initializes a new {@link Counter} for {@link Integer} values. */
-    private IntegerCounter(CounterName name, AggregationKind kind) {
-      super(name, kind);
-      switch (kind) {
-        case MEAN:
-          aggregate = deltaAggregate = null;
-          mean = new AtomicReference<>();
-          deltaMean = new AtomicReference<>();
-          getAndResetMeanDelta();
-          mean.set(deltaMean.get());
-          break;
-        case SUM:
-        case MAX:
-        case MIN:
-          mean = deltaMean = null;
-          aggregate = new AtomicInteger();
-          deltaAggregate = new AtomicInteger();
-          getAndResetDelta();
-          aggregate.set(deltaAggregate.get());
-          break;
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public IntegerCounter addValue(Integer value) {
-      try {
-        switch (kind) {
-          case SUM:
-            aggregate.getAndAdd(value);
-            deltaAggregate.getAndAdd(value);
-            break;
-          case MEAN:
-            addToMeanAndSet(value, mean);
-            addToMeanAndSet(value, deltaMean);
-            break;
-          case MAX:
-            maxAndSet(value, aggregate);
-            maxAndSet(value, deltaAggregate);
-            break;
-          case MIN:
-            minAndSet(value, aggregate);
-            minAndSet(value, deltaAggregate);
-            break;
-          default:
-            throw illegalArgumentException();
-        }
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    private void addToMeanAndSet(int value, AtomicReference<IntegerCounterMean> target) {
-      IntegerCounterMean current;
-      IntegerCounterMean update;
-      do {
-        current = target.get();
-        update = new IntegerCounterMean(current.getAggregate() + value, current.getCount() + 1);
-      } while (!target.compareAndSet(current, update));
-    }
-
-    private void maxAndSet(int value, AtomicInteger target) {
-      int current;
-      int update;
-      do {
-        current = target.get();
-        update = Math.max(value, current);
-      } while (update > current && !target.compareAndSet(current, update));
-    }
-
-    private void minAndSet(int value, AtomicInteger target) {
-      int current;
-      int update;
-      do {
-        current = target.get();
-        update = Math.min(value, current);
-      } while (update < current && !target.compareAndSet(current, update));
-    }
-
-    @Override
-    public Integer getAndResetDelta() {
-      switch (kind) {
-        case SUM:
-          return deltaAggregate.getAndSet(0);
-        case MAX:
-          return deltaAggregate.getAndSet(Integer.MIN_VALUE);
-        case MIN:
-          return deltaAggregate.getAndSet(Integer.MAX_VALUE);
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<Integer> resetToValue(Integer value) {
-      try {
-        if (kind == MEAN) {
-          throw illegalArgumentException();
-        }
-        aggregate.set(value);
-        deltaAggregate.set(value);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public Counter<Integer> resetMeanToValue(long elementCount, Integer value) {
-      try {
-        if (kind != MEAN) {
-          throw illegalArgumentException();
-        }
-        if (elementCount < 0) {
-          throw new IllegalArgumentException("elementCount must be non-negative");
-        }
-        IntegerCounterMean counterMean = new IntegerCounterMean(value, elementCount);
-        mean.set(counterMean);
-        deltaMean.set(counterMean);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public CounterMean<Integer> getAndResetMeanDelta() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return deltaMean.getAndSet(new IntegerCounterMean(0, 0L));
-    }
-
-    @Override
-    public Integer getAggregate() {
-      if (kind != MEAN) {
-        return aggregate.get();
-      } else {
-        return getMean().getAggregate();
-      }
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<Integer> getMean() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return mean.get();
-    }
-
-    @Override
-    public Counter<Integer> merge(Counter<Integer> that) {
-      try {
-        checkArgument(
-            this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-        switch (kind) {
-          case SUM:
-          case MIN:
-          case MAX:
-            return addValue(that.getAggregate());
-          case MEAN:
-            CounterMean<Integer> thisCounterMean = this.getMean();
-            CounterMean<Integer> thatCounterMean = that.getMean();
-            return resetMeanToValue(
-                thisCounterMean.getCount() + thatCounterMean.getCount(),
-                thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
-          default:
-            throw illegalArgumentException();
-        }
-      } finally {
-        setDirty();
-      }
-    }
-
-    private static class IntegerCounterMean implements CounterMean<Integer> {
-      private final int aggregate;
-      private final long count;
-
-      public IntegerCounterMean(int aggregate, long count) {
-        this.aggregate = aggregate;
-        this.count = count;
-      }
-
-      @Override
-      public Integer getAggregate() {
-        return aggregate;
-      }
-
-      @Override
-      public long getCount() {
-        return count;
-      }
-
-      @Override
-      public String toString() {
-        return aggregate + "/" + count;
-      }
-    }
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Constructs an {@link IllegalArgumentException} explaining that this
-   * {@link Counter}'s aggregation kind is not supported by its value type.
-   */
-  protected IllegalArgumentException illegalArgumentException() {
-    return new IllegalArgumentException("Cannot compute " + kind
-        + " aggregation over " + getType().getSimpleName() + " values.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
deleted file mode 100644
index b46be98..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Strings;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * The name of a counter identifies the user-specified name, as well as the origin,
- * the step the counter is associated with, and a prefix to add to the name.
- *
- * <p>For backwards compatibility, the {@link CounterName} will be converted to
- * a flat name (string) during the migration.
- */
-public class CounterName {
-  /**
-   * Returns a {@link CounterName} with the given name.
-   */
-  public static CounterName named(String name) {
-    return new CounterName(name, "", "", "");
-  }
-
-  /**
-   * Returns a msecs {@link CounterName}.
-   */
-  public static CounterName msecs(String name) {
-    return named(name + "-msecs");
-  }
-
-  /**
-   * Returns a {@link CounterName} identical to this, but with the given origin.
-   */
-  public CounterName withOrigin(String origin) {
-    return new CounterName(this.name, origin, this.stepName, this.prefix);
-  }
-
-  /**
-   * Returns a {@link CounterName} identical to this, but with the given step name.
-   */
-  public CounterName withStepName(String stepName) {
-    return new CounterName(this.name, this.origin, stepName, this.prefix);
-  }
-
-  /**
-   * Returns a {@link CounterName} identical to this, but with the given prefix.
-   */
-  public CounterName withPrefix(String prefix) {
-    return new CounterName(this.name, this.origin, this.stepName, prefix);
-  }
-
-  /**
-   * Name of the counter.
-   *
-   * <p>For example, process-msecs, ElementCount.
-   */
-  private final String name;
-
-  /**
-   * Origin (namespace) of counter name.
-   *
-   * <p>For example, "user" for user-defined counters.
-   * It is empty for counters defined by the SDK or the runner.
-   */
-  private final String origin;
-
-  /**
-   * System defined step name or the named-output of a step.
-   *
-   * <p>For example, {@code s1} or {@code s2.out}.
-   * It may be empty when counters don't associate with step names.
-   */
-  private final String stepName;
-
-  /**
-   * Prefix of group of counters.
-   *
-   * <p>It is empty when counters don't have general prefixes.
-   */
-  private final String prefix;
-
-  /**
-   * Flat name is the equivalent unstructured name.
-   *
-   * <p>It is null before {@link #getFlatName()} is called.
-   */
-  private AtomicReference<String> flatName;
-
-  private CounterName(String name, String origin, String stepName, String prefix) {
-    this.name = checkNotNull(name, "name");
-    this.origin = checkNotNull(origin, "origin");
-    this.stepName = checkNotNull(stepName, "stepName");
-    this.prefix = checkNotNull(prefix, "prefix");
-    this.flatName = new AtomicReference<String>();
-  }
-
-  /**
-   * Returns the flat name of a structured counter.
-   */
-  public String getFlatName() {
-    String ret = flatName.get();
-    if (ret == null) {
-      StringBuilder sb = new StringBuilder();
-      if (!Strings.isNullOrEmpty(prefix)) {
-        // Not all runner versions use "-" to concatenate prefix, it may already have it in it.
-        sb.append(prefix);
-      }
-      if (!Strings.isNullOrEmpty(origin)) {
-        sb.append(origin + "-");
-      }
-      if (!Strings.isNullOrEmpty(stepName)) {
-        sb.append(stepName + "-");
-      }
-      sb.append(name);
-      flatName.compareAndSet(null, sb.toString());
-      ret = flatName.get();
-    }
-    return ret;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    } else if (o instanceof CounterName) {
-      CounterName that = (CounterName) o;
-      return this.getFlatName().equals(that.getFlatName());
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return getFlatName().hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java
deleted file mode 100644
index c2550cd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-/**
- * A counter provider can provide {@link Counter} instances.
- *
- * @param <T> the input type of the counter.
- */
-public interface CounterProvider<T> {
-  Counter<T> getCounter(String name);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
deleted file mode 100644
index cb0ffe5..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.AbstractSet;
-import java.util.HashMap;
-import java.util.Iterator;
-
-/**
- * A CounterSet maintains a set of {@link Counter}s.
- *
- * <p>Thread-safe.
- */
-public class CounterSet extends AbstractSet<Counter<?>> {
-
-  /** Registered counters. */
-  private final HashMap<String, Counter<?>> counters = new HashMap<>();
-
-  private final AddCounterMutator addCounterMutator = new AddCounterMutator();
-
-  /**
-   * Constructs a CounterSet containing the given Counters.
-   */
-  public CounterSet(Counter<?>... counters) {
-    for (Counter<?> counter : counters) {
-      addNewCounter(counter);
-    }
-  }
-
-  /**
-   * Returns an object that supports adding additional counters into
-   * this CounterSet.
-   */
-  public AddCounterMutator getAddCounterMutator() {
-    return addCounterMutator;
-  }
-
-  /**
-   * Adds a new counter, throwing an exception if a counter of the
-   * same name already exists.
-   */
-  public void addNewCounter(Counter<?> counter) {
-    if (!addCounter(counter)) {
-      throw new IllegalArgumentException(
-          "Counter " + counter + " duplicates an existing counter in " + this);
-    }
-  }
-
-  /**
-   * Adds the given Counter to this CounterSet.
-   *
-   * <p>If a counter with the same name already exists, it will be
-   * reused, as long as it is compatible.
-   *
-   * @return the Counter that was reused, or added
-   * @throws IllegalArgumentException if a counter with the same
-   * name but an incompatible kind had already been added
-   */
-  public synchronized <T> Counter<T> addOrReuseCounter(Counter<T> counter) {
-    String flatName = counter.getFlatName();
-    Counter<?> oldCounter = counters.get(flatName);
-    if (oldCounter == null) {
-      // A new counter.
-      counters.put(flatName, counter);
-      return counter;
-    }
-    if (counter.isCompatibleWith(oldCounter)) {
-      // Return the counter to reuse.
-      @SuppressWarnings("unchecked")
-      Counter<T> compatibleCounter = (Counter<T>) oldCounter;
-      return compatibleCounter;
-    }
-    throw new IllegalArgumentException(
-        "Counter " + counter + " duplicates incompatible counter "
-        + oldCounter + " in " + this);
-  }
-
-  /**
-   * Adds a counter. Returns {@code true} if the counter was added to the set
-   * and false if the given counter was {@code null} or it already existed in
-   * the set.
-   *
-   * @param counter to register
-   */
-  public boolean addCounter(Counter<?> counter) {
-    return add(counter);
-  }
-
-  /**
-   * Returns the Counter with the given name in this CounterSet;
-   * returns null if no such Counter exists.
-   */
-  public synchronized Counter<?> getExistingCounter(String name) {
-    return counters.get(name);
-  }
-
-  @Override
-  public synchronized Iterator<Counter<?>> iterator() {
-    return counters.values().iterator();
-  }
-
-  @Override
-  public synchronized int size() {
-    return counters.size();
-  }
-
-  @Override
-  public synchronized boolean add(Counter<?> e) {
-    if (null == e) {
-      return false;
-    }
-    if (counters.containsKey(e.getFlatName())) {
-      return false;
-    }
-    counters.put(e.getFlatName(), e);
-    return true;
-  }
-
-  public synchronized void merge(CounterSet that) {
-    for (Counter<?> theirCounter : that) {
-      Counter<?> myCounter = counters.get(theirCounter.getFlatName());
-      if (myCounter != null) {
-        mergeCounters(myCounter, theirCounter);
-      } else {
-        addCounter(theirCounter);
-      }
-    }
-  }
-
-  private <T> void mergeCounters(Counter<T> mine, Counter<?> theirCounter) {
-    checkArgument(
-        mine.isCompatibleWith(theirCounter),
-        "Can't merge CounterSets containing incompatible counters with the same name: "
-            + "%s (existing) and %s (merged)",
-        mine,
-        theirCounter);
-    @SuppressWarnings("unchecked")
-    Counter<T> theirs = (Counter<T>) theirCounter;
-    mine.merge(theirs);
-  }
-
-  /**
-   * A nested class that supports adding additional counters into the
-   * enclosing CounterSet. This is useful as a mutator, hiding other
-   * public methods of the CounterSet.
-   */
-  public class AddCounterMutator {
-    /**
-     * Adds the given Counter into the enclosing CounterSet.
-     *
-     * <p>If a counter with the same name already exists, it will be
-     * reused, as long as it has the same type.
-     *
-     * @return the Counter that was reused, or added
-     * @throws IllegalArgumentException if a counter with the same
-     * name but an incompatible kind had already been added
-     */
-    public <T> Counter<T> addCounter(Counter<T> counter) {
-      return addOrReuseCounter(counter);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java
index 3e7011b..388355e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java
@@ -19,37 +19,21 @@ package org.apache.beam.sdk.util.common;
 
 import java.util.Observable;
 import java.util.Observer;
-import javax.annotation.Nullable;
 
 /**
- * An observer that gets notified when additional bytes are read
- * and/or used. It adds all bytes into a local counter. When the
- * observer gets advanced via the next() call, it adds the total byte
- * count to the specified counter, and prepares for the next element.
+ * An observer that gets notified when additional bytes are read and/or used.
  */
-public class ElementByteSizeObserver implements Observer {
-  @Nullable
-  private final Counter<Long> counter;
+public abstract class ElementByteSizeObserver implements Observer {
   private boolean isLazy = false;
   private long totalSize = 0;
   private double scalingFactor = 1.0;
 
-  public ElementByteSizeObserver() {
-    this.counter = null;
-  }
-
-  public ElementByteSizeObserver(Counter<Long> counter) {
-    this.counter = counter;
-  }
+  public ElementByteSizeObserver() {}
 
   /**
    * Called to report element byte size.
    */
-  protected void reportElementSize(long elementByteSize) {
-    if (counter != null) {
-      counter.addValue(elementByteSize);
-    }
-  }
+  protected abstract void reportElementSize(long elementByteSize);
 
   /**
    * Sets byte counting for the current element as lazy. That is, the

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java
deleted file mode 100644
index 3f96cf2..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MAX;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MIN;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.SUM;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.IterableCombineFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterProvider;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
-
-import com.google.common.collect.Iterables;
-
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Unit tests for the {@link Aggregator} API.
- */
-@RunWith(JUnit4.class)
-public class CounterAggregatorTest {
-  @Rule
-  public final ExpectedException expectedEx = ExpectedException.none();
-
-  private static final String AGGREGATOR_NAME = "aggregator_name";
-
-  @SuppressWarnings("rawtypes")
-  private <V, AccumT> void testAggregator(List<V> items,
-                                      Combine.CombineFn<V, AccumT, V> combiner,
-                                      Counter expectedCounter) {
-    CounterSet counters = new CounterSet();
-    Aggregator<V, V> aggregator = new CounterAggregator<>(
-        AGGREGATOR_NAME, combiner, counters.getAddCounterMutator());
-    for (V item : items) {
-      aggregator.addValue(item);
-    }
-
-    assertEquals(Iterables.getOnlyElement(counters), expectedCounter);
-  }
-
-  @Test
-  public void testGetName() {
-    String name = "testAgg";
-    CounterAggregator<Long, long[], Long> aggregator = new CounterAggregator<>(
-        name, new Sum.SumLongFn(),
-        new CounterSet().getAddCounterMutator());
-
-    assertEquals(name, aggregator.getName());
-  }
-
-  @Test
-  public void testGetCombineFn() {
-    CombineFn<Long, ?, Long> combineFn = new Min.MinLongFn();
-
-    CounterAggregator<Long, ?, Long> aggregator = new CounterAggregator<>("foo",
-        combineFn, new CounterSet().getAddCounterMutator());
-
-    assertEquals(combineFn, aggregator.getCombineFn());
-  }
-
-  @Test
-
-  public void testSumInteger() throws Exception {
-    testAggregator(Arrays.asList(2, 4, 1, 3), new Sum.SumIntegerFn(),
-                   Counter.ints(AGGREGATOR_NAME, SUM).resetToValue(10));
-  }
-
-  @Test
-  public void testSumLong() throws Exception {
-    testAggregator(Arrays.asList(2L, 4L, 1L, 3L), new Sum.SumLongFn(),
-                   Counter.longs(AGGREGATOR_NAME, SUM).resetToValue(10L));
-  }
-
-  @Test
-  public void testSumDouble() throws Exception {
-    testAggregator(Arrays.asList(2.0, 4.1, 1.0, 3.1), new Sum.SumDoubleFn(),
-                   Counter.doubles(AGGREGATOR_NAME, SUM).resetToValue(10.2));
-  }
-
-  @Test
-  public void testMinInteger() throws Exception {
-    testAggregator(Arrays.asList(2, 4, 1, 3), new Min.MinIntegerFn(),
-                   Counter.ints(AGGREGATOR_NAME, MIN).resetToValue(1));
-  }
-
-  @Test
-  public void testMinLong() throws Exception {
-    testAggregator(Arrays.asList(2L, 4L, 1L, 3L), new Min.MinLongFn(),
-                   Counter.longs(AGGREGATOR_NAME, MIN).resetToValue(1L));
-  }
-
-  @Test
-  public void testMinDouble() throws Exception {
-    testAggregator(Arrays.asList(2.0, 4.1, 1.0, 3.1), new Min.MinDoubleFn(),
-                   Counter.doubles(AGGREGATOR_NAME, MIN).resetToValue(1.0));
-  }
-
-  @Test
-  public void testMaxInteger() throws Exception {
-    testAggregator(Arrays.asList(2, 4, 1, 3), new Max.MaxIntegerFn(),
-                   Counter.ints(AGGREGATOR_NAME, MAX).resetToValue(4));
-  }
-
-  @Test
-  public void testMaxLong() throws Exception {
-    testAggregator(Arrays.asList(2L, 4L, 1L, 3L), new Max.MaxLongFn(),
-                   Counter.longs(AGGREGATOR_NAME, MAX).resetToValue(4L));
-  }
-
-  @Test
-  public void testMaxDouble() throws Exception {
-    testAggregator(Arrays.asList(2.0, 4.1, 1.0, 3.1), new Max.MaxDoubleFn(),
-                   Counter.doubles(AGGREGATOR_NAME, MAX).resetToValue(4.1));
-  }
-
-  @Test
-  public void testCounterProviderCallsProvidedCounterAddValue() {
-    @SuppressWarnings("unchecked")
-    CombineFn<String, ?, String> combiner = mock(CombineFn.class,
-        withSettings().extraInterfaces(CounterProvider.class));
-    @SuppressWarnings("unchecked")
-    CounterProvider<String> provider = (CounterProvider<String>) combiner;
-
-    @SuppressWarnings("unchecked")
-    Counter<String> mockCounter = mock(Counter.class);
-    String name = "foo";
-    when(provider.getCounter(name)).thenReturn(mockCounter);
-
-    AddCounterMutator addCounterMutator = mock(AddCounterMutator.class);
-    when(addCounterMutator.addCounter(mockCounter)).thenReturn(mockCounter);
-
-    Aggregator<String, String> aggregator =
-        new CounterAggregator<>(name, combiner, addCounterMutator);
-
-    aggregator.addValue("bar_baz");
-
-    verify(mockCounter).addValue("bar_baz");
-    verify(addCounterMutator).addCounter(mockCounter);
-  }
-
-
-  @Test
-  public void testCompatibleDuplicateNames() throws Exception {
-    CounterSet counters = new CounterSet();
-    Aggregator<Integer, Integer> aggregator1 = new CounterAggregator<>(
-        AGGREGATOR_NAME, new Sum.SumIntegerFn(),
-        counters.getAddCounterMutator());
-
-    Aggregator<Integer, Integer> aggregator2 = new CounterAggregator<>(
-        AGGREGATOR_NAME, new Sum.SumIntegerFn(),
-        counters.getAddCounterMutator());
-
-    // The duplicate aggregators should update the same counter.
-    aggregator1.addValue(3);
-    aggregator2.addValue(4);
-    Assert.assertEquals(
-        new CounterSet(Counter.ints(AGGREGATOR_NAME, SUM).resetToValue(7)),
-        counters);
-  }
-
-  @Test
-  public void testIncompatibleDuplicateNames() throws Exception {
-    CounterSet counters = new CounterSet();
-    new CounterAggregator<>(
-        AGGREGATOR_NAME, new Sum.SumIntegerFn(),
-        counters.getAddCounterMutator());
-
-    expectedEx.expect(IllegalArgumentException.class);
-    expectedEx.expectMessage(Matchers.containsString(
-        "aggregator's name collides with an existing aggregator or "
-        + "system-provided counter of an incompatible type"));
-    new CounterAggregator<>(
-        AGGREGATOR_NAME, new Sum.SumLongFn(),
-        counters.getAddCounterMutator());
-    }
-
-  @Test
-  public void testUnsupportedCombineFn() throws Exception {
-    expectedEx.expect(IllegalArgumentException.class);
-    expectedEx.expectMessage(Matchers.containsString("unsupported combiner"));
-    new CounterAggregator<>(
-        AGGREGATOR_NAME,
-        new Combine.CombineFn<Integer, List<Integer>, Integer>() {
-          @Override
-          public List<Integer> createAccumulator() {
-            return null;
-          }
-          @Override
-          public List<Integer> addInput(List<Integer> accumulator, Integer input) {
-            return null;
-          }
-          @Override
-          public List<Integer> mergeAccumulators(Iterable<List<Integer>> accumulators) {
-            return null;
-          }
-          @Override
-          public Integer extractOutput(List<Integer> accumulator) {
-            return null;
-          }
-        }, (new CounterSet()).getAddCounterMutator());
-  }
-
-  @Test
-  public void testUnsupportedSerializableFunction() throws Exception {
-    expectedEx.expect(IllegalArgumentException.class);
-    expectedEx.expectMessage(Matchers.containsString("unsupported combiner"));
-    CombineFn<Integer, List<Integer>, Integer> combiner = IterableCombineFn
-        .<Integer>of(new SerializableFunction<Iterable<Integer>, Integer>() {
-          @Override
-          public Integer apply(Iterable<Integer> input) {
-            return null;
-          }
-        });
-    new CounterAggregator<>(AGGREGATOR_NAME, combiner,
-        (new CounterSet()).getAddCounterMutator());
-  }
-}



Mime
View raw message