beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [2/3] beam git commit: Full removal of Aggregators in Java SDK and Runners
Date Tue, 02 May 2017 23:38:37 GMT
Full removal of Aggregators in Java SDK and Runners


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/615761a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/615761a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/615761a7

Branch: refs/heads/master
Commit: 615761a77d2da6229dfa2cad5376d265afea8a62
Parents: 5bfd3e0
Author: Pablo <pabloem@google.com>
Authored: Tue May 2 14:49:39 2017 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue May 2 16:37:27 2017 -0700

----------------------------------------------------------------------
 .../cookbook/CombinePerKeyExamples.java         |   4 -
 .../beam/runners/core/AggregatorFactory.java    |  38 ----
 .../apache/beam/runners/core/DoFnRunners.java   |  27 ++-
 .../apache/beam/runners/core/LateDataUtils.java |   6 +-
 .../beam/runners/core/SimpleDoFnRunner.java     |   1 -
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   3 -
 .../core/LateDataDroppingDoFnRunnerTest.java    |  27 ---
 .../beam/runners/core/SimpleDoFnRunnerTest.java |   9 -
 .../runners/core/SimpleOldDoFnRunnerTest.java   |   2 +-
 .../runners/core/StatefulDoFnRunnerTest.java    |   1 -
 .../runners/direct/AggregatorContainer.java     | 200 -------------------
 .../beam/runners/direct/EvaluationContext.java  |  23 +--
 .../GroupAlsoByWindowEvaluatorFactory.java      |  23 +--
 .../beam/runners/direct/ParDoEvaluator.java     |  14 +-
 ...littableProcessElementsEvaluatorFactory.java |   2 -
 .../direct/StatefulParDoEvaluatorFactory.java   |   1 -
 .../runners/direct/StepTransformResult.java     |   8 -
 .../beam/runners/direct/TransformResult.java    |   6 -
 .../beam/runners/direct/ParDoEvaluatorTest.java |   5 -
 .../beam/runners/spark/SparkPipelineResult.java |   5 -
 .../spark/aggregators/SparkAggregators.java     | 110 ----------
 .../SparkGroupAlsoByWindowViaWindowSet.java     |  57 ++----
 .../spark/translation/MultiDoFnFunction.java    |   2 -
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |  10 -
 .../spark/translation/SparkRuntimeContext.java  |  81 --------
 .../spark/aggregators/ClearAggregatorsRule.java |  38 ----
 .../metrics/sink/NamedAggregatorsTest.java      | 101 ----------
 .../beam/sdk/AggregatorPipelineExtractor.java   |  84 --------
 .../beam/sdk/AggregatorRetrievalException.java  |  33 ---
 .../org/apache/beam/sdk/AggregatorValues.java   |  51 -----
 .../main/java/org/apache/beam/sdk/Pipeline.java |  10 -
 .../beam/sdk/annotations/Experimental.java      |   3 -
 .../apache/beam/sdk/transforms/Aggregator.java  |  14 +-
 .../sdk/transforms/DelegatingAggregator.java    | 126 ------------
 .../org/apache/beam/sdk/transforms/Latest.java  |   2 -
 .../harness/control/ProcessBundleHandler.java   |   2 -
 .../fn/harness/fake/FakeAggregatorFactory.java  |  52 -----
 37 files changed, 52 insertions(+), 1129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 39553a5..693f0c4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -45,10 +45,6 @@ import org.apache.beam.sdk.values.PCollection;
  * list of play names in which that word appears, and saves this information
  * to a bigquery table.
  *
- * <p>Concepts: the Combine.perKey transform, which lets you combine the values in a
- * key-grouped Collection, and how to use an Aggregator to track information in the
- * Monitoring UI.
- *
  * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
  * table.
  *

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
deleted file mode 100644
index 24a605f..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * A factory for creating aggregators.
- */
-public interface AggregatorFactory {
-  /**
-   * Create an aggregator with the given {@code name} and {@link CombineFn}.
-   *
-   *  <p>This method is called to create an aggregator for a {@link DoFn}. It receives the
-   *  class of the {@link DoFn} being executed and the context of the step it is being
-   *  executed in.
-   */
-  <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-      Class<?> fnClass, ExecutionContext.StepContext stepContext,
-      String aggregatorName, CombineFn<InputT, AccumT, OutputT> combine);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 26e57f5..fe33af7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -48,6 +48,27 @@ public class DoFnRunners {
     <T> void output(TupleTag<T> tag, WindowedValue<T> output);
   }
 
+  @Deprecated
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
+      PipelineOptions options,
+      DoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> additionalOutputTags,
+      StepContext stepContext,
+      Object aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    return simpleRunner(options,
+        fn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        additionalOutputTags,
+        stepContext,
+        windowingStrategy);
+  }
+
   /**
    * Returns an implementation of {@link DoFnRunner} that for a {@link DoFn}.
    *
@@ -63,7 +84,6 @@ public class DoFnRunners {
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
       StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
       WindowingStrategy<?, ?> windowingStrategy) {
     return new SimpleDoFnRunner<>(
         options,
@@ -73,7 +93,6 @@ public class DoFnRunners {
         mainOutputTag,
         additionalOutputTags,
         stepContext,
-        aggregatorFactory,
         windowingStrategy);
   }
 
@@ -90,7 +109,6 @@ public class DoFnRunners {
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
       StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
       WindowingStrategy<?, ?> windowingStrategy) {
     return new SimpleOldDoFnRunner<>(
         options,
@@ -100,7 +118,6 @@ public class DoFnRunners {
         mainOutputTag,
         additionalOutputTags,
         stepContext,
-        aggregatorFactory,
         windowingStrategy);
   }
 
@@ -151,7 +168,6 @@ public class DoFnRunners {
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
       StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
       WindowingStrategy<?, ?> windowingStrategy) {
     return new ProcessFnRunner<>(
         simpleRunner(
@@ -162,7 +178,6 @@ public class DoFnRunners {
             mainOutputTag,
             additionalOutputTags,
             stepContext,
-            aggregatorFactory,
             windowingStrategy),
         views,
         sideInputReader);

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
index 17bd360..982d693 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
@@ -22,7 +22,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Iterables;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.CounterCell;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -42,7 +42,7 @@ public class LateDataUtils {
       Iterable<WindowedValue<V>> elements,
       final TimerInternals timerInternals,
       final WindowingStrategy<?, ?> windowingStrategy,
-      final Aggregator<Long, Long> droppedDueToLateness) {
+      final CounterCell droppedDueToLateness) {
     return FluentIterable.from(elements)
         .transformAndConcat(
             // Explode windows to filter out expired ones
@@ -71,7 +71,7 @@ public class LateDataUtils {
                         .isBefore(timerInternals.currentInputWatermarkTime());
                 if (expired) {
                   // The element is too late for this window.
-                  droppedDueToLateness.addValue(1L);
+                  droppedDueToLateness.inc();
                   WindowTracing.debug(
                       "GroupAlsoByWindow: Dropping element at {} for key: {}; "
                           + "window: {} since it is too far behind inputWatermark: {}",

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index edce9a2..8a3e25f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -106,7 +106,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
       StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
       WindowingStrategy<?, ?> windowingStrategy) {
     this.fn = fn;
     this.signature = DoFnSignatures.getSignature(fn.getClass());

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index b5f8f45..4c3149a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -66,7 +66,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
       StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
       WindowingStrategy<?, ?> windowingStrategy) {
     this.fn = fn;
     this.context = new DoFnContext<>(
@@ -77,7 +76,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
         mainOutputTag,
         additionalOutputTags,
         stepContext,
-        aggregatorFactory,
         windowingStrategy == null ? null : windowingStrategy.getWindowFn());
   }
 
@@ -181,7 +179,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
                        TupleTag<OutputT> mainOutputTag,
                        List<TupleTag<?>> additionalOutputTags,
                        StepContext stepContext,
-                       AggregatorFactory aggregatorFactory,
                        WindowFn<?, ?> windowFn) {
       fn.super();
       this.options = options;

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
index 74fb562..bf78427 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
@@ -29,9 +29,6 @@ import org.apache.beam.runners.core.LateDataDroppingDoFnRunner.LateDataFilter;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -102,28 +99,4 @@ public class LateDataDroppingDoFnRunnerTest {
         Arrays.asList(WINDOW_FN.assignWindow(timestamp)),
         PaneInfo.NO_FIRING);
   }
-
-  private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
-    private final String name;
-    private long sum = 0;
-
-    public InMemoryLongSumAggregator(String name) {
-      this.name = name;
-    }
-
-    @Override
-    public void addValue(Long value) {
-      sum += value;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<Long, ?, Long> getCombineFn() {
-      return Sum.ofLongs();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 4ae5332..3e404ad 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -86,7 +86,6 @@ public class SimpleDoFnRunnerTest {
             null,
             Collections.<TupleTag<?>>emptyList(),
             mockStepContext,
-            null,
             WindowingStrategy.of(new GlobalWindows()));
 
     thrown.expect(UserCodeException.class);
@@ -107,7 +106,6 @@ public class SimpleDoFnRunnerTest {
             null,
             Collections.<TupleTag<?>>emptyList(),
             mockStepContext,
-            null,
             WindowingStrategy.of(new GlobalWindows()));
 
     thrown.expect(UserCodeException.class);
@@ -138,7 +136,6 @@ public class SimpleDoFnRunnerTest {
             null,
             Collections.<TupleTag<?>>emptyList(),
             mockStepContext,
-            null,
             WindowingStrategy.of(new GlobalWindows()));
 
     // Setting the timer needs the current time, as it is set relative
@@ -167,7 +164,6 @@ public class SimpleDoFnRunnerTest {
             null,
             Collections.<TupleTag<?>>emptyList(),
             mockStepContext,
-            null,
             WindowingStrategy.of(new GlobalWindows()));
 
     thrown.expect(UserCodeException.class);
@@ -188,7 +184,6 @@ public class SimpleDoFnRunnerTest {
             null,
             Collections.<TupleTag<?>>emptyList(),
             mockStepContext,
-            null,
             WindowingStrategy.of(new GlobalWindows()));
 
     thrown.expect(UserCodeException.class);
@@ -215,7 +210,6 @@ public class SimpleDoFnRunnerTest {
             null,
             Collections.<TupleTag<?>>emptyList(),
             mockStepContext,
-            null,
             WindowingStrategy.of(windowFn));
 
     Instant currentTime = new Instant(42);
@@ -255,7 +249,6 @@ public class SimpleDoFnRunnerTest {
             new TupleTag<Duration>(),
             Collections.<TupleTag<?>>emptyList(),
             mockStepContext,
-            null,
             WindowingStrategy.of(new GlobalWindows()));
 
     runner.startBundle();
@@ -292,7 +285,6 @@ public class SimpleDoFnRunnerTest {
             new TupleTag<Duration>(),
             Collections.<TupleTag<?>>emptyList(),
             mockStepContext,
-            null,
             WindowingStrategy.of(new GlobalWindows()));
 
     runner.startBundle();
@@ -330,7 +322,6 @@ public class SimpleDoFnRunnerTest {
             new TupleTag<Duration>(),
             Collections.<TupleTag<?>>emptyList(),
             mockStepContext,
-            null,
             WindowingStrategy.of(new GlobalWindows()));
 
     runner.startBundle();

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
index 8ded2dc..a73ef5e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
@@ -67,7 +67,7 @@ public class SimpleOldDoFnRunnerTest {
     List<TupleTag<?>> additionalOutputTags = Arrays.asList();
     StepContext context = mock(StepContext.class);
     return new SimpleOldDoFnRunner<>(
-        null, fn, null, null, null, additionalOutputTags, context, null, null);
+        null, fn, null, null, null, additionalOutputTags, context, null);
   }
 
   static class ThrowingDoFn extends OldDoFn<String, String> {

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index f80643a..d4ff49e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -204,7 +204,6 @@ public class StatefulDoFnRunnerTest {
         null,
         Collections.<TupleTag<?>>emptyList(),
         mockStepContext,
-        null,
         WINDOWING_STRATEGY);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
deleted file mode 100644
index fd17704..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * AccumT container for the current values associated with {@link Aggregator Aggregators}.
- */
-public class AggregatorContainer {
-
-  private static class AggregatorInfo<InputT, AccumT, OutputT>
-      implements Aggregator<InputT, OutputT> {
-    private final String stepName;
-    private final String name;
-    private final CombineFn<InputT, AccumT, OutputT> combiner;
-    @GuardedBy("this")
-    private volatile AccumT accumulator = null;
-    private boolean committed = false;
-
-    private AggregatorInfo(
-        String stepName, String name, CombineFn<InputT, AccumT, OutputT> combiner) {
-      this.stepName = stepName;
-      this.name = name;
-      this.combiner = combiner;
-    }
-
-    @Override
-    public synchronized void addValue(InputT input) {
-      checkState(!committed, "Cannot addValue after committing");
-      if (accumulator == null) {
-        accumulator = combiner.createAccumulator();
-      }
-      accumulator = combiner.addInput(accumulator, input);
-    }
-
-    public synchronized OutputT getOutput() {
-      return accumulator == null ? null : combiner.extractOutput(accumulator);
-    }
-
-    private void merge(AggregatorInfo<?, ?, ?> other) {
-      // Aggregators are only merged if they are the same (same step, same name).
-      // As a result, they should also have the same CombineFn, so this is safe.
-      AggregatorInfo<InputT, AccumT, OutputT> otherSafe =
-          (AggregatorInfo<InputT, AccumT, OutputT>) other;
-      mergeSafe(otherSafe);
-    }
-
-    private synchronized void mergeSafe(AggregatorInfo<InputT, AccumT, OutputT> other) {
-      if (accumulator == null) {
-        accumulator = other.accumulator;
-      } else if (other.accumulator != null) {
-        accumulator = combiner.mergeAccumulators(Arrays.asList(accumulator, other.accumulator));
-      }
-    }
-
-    public String getStepName() {
-      return name;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<InputT, ?, OutputT> getCombineFn() {
-      return combiner;
-    }
-  }
-
-  private final ConcurrentMap<AggregatorKey, AggregatorInfo<?, ?, ?>> accumulators =
-      new ConcurrentHashMap<>();
-
-  private AggregatorContainer() {
-  }
-
-  public static AggregatorContainer create() {
-    return new AggregatorContainer();
-  }
-
-  @Nullable
-  <OutputT> OutputT getAggregate(String stepName, String aggregatorName) {
-    AggregatorInfo<?, ?, OutputT> aggregatorInfo =
-        (AggregatorInfo<?, ?, OutputT>) accumulators.get(
-            AggregatorKey.create(stepName, aggregatorName));
-    return aggregatorInfo == null ? null : aggregatorInfo.getOutput();
-  }
-
-  public Mutator createMutator() {
-    return new Mutator(this);
-  }
-
-  /**
-   * AccumT class for mutations to the aggregator values.
-   */
-  public static class Mutator implements AggregatorFactory {
-
-    private final Map<AggregatorKey, AggregatorInfo<?, ?, ?>> accumulatorDeltas = new HashMap<>();
-    private final AggregatorContainer container;
-    private boolean committed = false;
-
-    private Mutator(AggregatorContainer container) {
-      this.container = container;
-    }
-
-    public void commit() {
-      checkState(!committed, "Should not be already committed");
-      committed = true;
-
-      for (Map.Entry<AggregatorKey, AggregatorInfo<?, ?, ?>> entry : accumulatorDeltas.entrySet()) {
-        AggregatorInfo<?, ?, ?> previous = container.accumulators.get(entry.getKey());
-        entry.getValue().committed = true;
-        if (previous == null) {
-          previous = container.accumulators.putIfAbsent(entry.getKey(), entry.getValue());
-        }
-        if (previous != null) {
-          previous.merge(entry.getValue());
-          previous.committed = true;
-        }
-      }
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-        Class<?> fnClass,
-        ExecutionContext.StepContext step,
-        String name,
-        CombineFn<InputT, AccumT, OutputT> combine) {
-      return createAggregatorForStep(step, name, combine);
-    }
-
-    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createSystemAggregator(
-        ExecutionContext.StepContext step,
-        String name,
-        CombineFn<InputT, AccumT, OutputT> combiner) {
-      return createAggregatorForStep(step, name, combiner);
-    }
-
-    private <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForStep(
-        ExecutionContext.StepContext step,
-        String name,
-        CombineFn<InputT, AccumT, OutputT> combine) {
-      checkState(!committed, "Cannot create aggregators after committing");
-      AggregatorKey key = AggregatorKey.create(step.getStepName(), name);
-      AggregatorInfo<?, ?, ?> aggregatorInfo = accumulatorDeltas.get(key);
-      if (aggregatorInfo != null) {
-        AggregatorInfo<InputT, ?, OutputT> typedAggregatorInfo =
-            (AggregatorInfo<InputT, ?, OutputT>) aggregatorInfo;
-        return typedAggregatorInfo;
-      } else {
-        AggregatorInfo<InputT, ?, OutputT> typedAggregatorInfo =
-            new AggregatorInfo<>(step.getStepName(), name, combine);
-        accumulatorDeltas.put(key, typedAggregatorInfo);
-        return typedAggregatorInfo;
-      }
-    }
-  }
-
-  /**
-   * Aggregators are identified by a step name and an aggregator name.
-   */
-  @AutoValue
-  public abstract static class AggregatorKey {
-    public static AggregatorKey create(String stepName, String aggregatorName)  {
-      return new AutoValue_AggregatorContainer_AggregatorKey(stepName, aggregatorName);
-    }
-
-    public abstract String getStepName();
-    public abstract String aggregatorName();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index f6d9a36..93d6f96 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -62,7 +62,7 @@ import org.joda.time.Instant;
  * <p>{@link EvaluationContext} contains shared state for an execution of the
  * {@link DirectRunner} that can be used while evaluating a {@link PTransform}. This
  * consists of views into underlying state and watermark implementations, access to read and write
- * {@link PCollectionView PCollectionViews}, and managing the {@link AggregatorContainer} and
+ * {@link PCollectionView PCollectionViews}, and managing the
  * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when
  * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and
  * known to be empty).
@@ -95,8 +95,6 @@ class EvaluationContext {
 
   private final SideInputContainer sideInputContainer;
 
-  private final AggregatorContainer mergedAggregators;
-
   private final DirectMetrics metrics;
 
   private final Set<PValue> keyedPValues;
@@ -126,7 +124,6 @@ class EvaluationContext {
     this.sideInputContainer = SideInputContainer.create(this, graph.getViews());
 
     this.applicationStateInternals = new ConcurrentHashMap<>();
-    this.mergedAggregators = AggregatorContainer.create();
     this.metrics = new DirectMetrics();
 
     this.callbackExecutor =
@@ -174,10 +171,6 @@ class EvaluationContext {
             : completedBundle.withElements((Iterable) result.getUnprocessedElements()),
         committedBundles,
         outputTypes);
-    // Commit aggregator changes
-    if (result.getAggregatorChanges() != null) {
-      result.getAggregatorChanges().commit();
-    }
     // Update state internals
     CopyOnAccessInMemoryStateInternals theirState = result.getState();
     if (theirState != null) {
@@ -362,20 +355,6 @@ class EvaluationContext {
     return sideInputContainer.createReaderForViews(sideInputs);
   }
 
-  /**
-   * Returns a new mutator for the {@link AggregatorContainer}.
-   */
-  public AggregatorContainer.Mutator getAggregatorMutator() {
-    return mergedAggregators.createMutator();
-  }
-
-  /**
-   * Returns the counter container for this context.
-   */
-  public AggregatorContainer getAggregatorContainer() {
-    return mergedAggregators;
-  }
-
   /** Returns the metrics container for this pipeline. */
   public DirectMetrics getMetrics() {
     return metrics;

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 9f567a4..d006553 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -42,10 +42,10 @@ import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowTracing;
@@ -113,11 +113,10 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
     private final StructuralKey<?> structuralKey;
     private final Collection<UncommittedBundle<?>> outputBundles;
     private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>> unprocessedElements;
-    private final AggregatorContainer.Mutator aggregatorChanges;
 
     private final SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
-    private final Aggregator<Long, Long> droppedDueToClosedWindow;
-    private final Aggregator<Long, Long> droppedDueToLateness;
+    private final Counter droppedDueToClosedWindow;
+    private final Counter droppedDueToLateness;
 
     public GroupAlsoByWindowEvaluator(
         final EvaluationContext evaluationContext,
@@ -140,17 +139,14 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
 
       outputBundles = new ArrayList<>();
       unprocessedElements = ImmutableList.builder();
-      aggregatorChanges = evaluationContext.getAggregatorMutator();
 
       Coder<V> valueCoder =
           application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
       reduceFn = SystemReduceFn.buffering(valueCoder);
-      droppedDueToClosedWindow = aggregatorChanges.createSystemAggregator(stepContext,
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
-          Sum.ofLongs());
-      droppedDueToLateness = aggregatorChanges.createSystemAggregator(stepContext,
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER,
-          Sum.ofLongs());
+      droppedDueToClosedWindow = Metrics.counter(GroupAlsoByWindowEvaluator.class,
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
+      droppedDueToLateness = Metrics.counter(GroupAlsoByWindowEvaluator.class,
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER);
     }
 
     @Override
@@ -197,7 +193,6 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
           .withState(state)
           .addOutput(outputBundles)
           .withTimerUpdate(stepContext.getTimerUpdate())
-          .withAggregatorChanges(aggregatorChanges)
           .addUnprocessedElements(unprocessedElements.build())
           .build();
     }
@@ -229,7 +224,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
                           .isBefore(timerInternals.currentInputWatermarkTime());
                   if (expired) {
                     // The element is too late for this window.
-                    droppedDueToLateness.addValue(1L);
+                    droppedDueToLateness.inc();
                     WindowTracing.debug(
                         "GroupAlsoByWindow: Dropping element at {} for key: {}; "
                             + "window: {} since it is too far behind inputWatermark: {}",

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 053da31..2ea8a91 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -54,7 +54,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
         TupleTag<OutputT> mainOutputTag,
         List<TupleTag<?>> additionalOutputTags,
         DirectStepContext stepContext,
-        AggregatorContainer.Mutator aggregatorChanges,
         WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy);
   }
 
@@ -70,7 +69,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
           TupleTag<OutputT> mainOutputTag,
           List<TupleTag<?>> additionalOutputTags,
           DirectStepContext stepContext,
-          AggregatorContainer.Mutator aggregatorChanges,
           WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) {
         DoFnRunner<InputT, OutputT> underlying =
             DoFnRunners.simpleRunner(
@@ -81,7 +79,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
                 mainOutputTag,
                 additionalOutputTags,
                 stepContext,
-                aggregatorChanges,
                 windowingStrategy);
         return SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
       }
@@ -100,7 +97,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
       List<TupleTag<?>> additionalOutputTags,
       Map<TupleTag<?>, PCollection<?>> outputs,
       DoFnRunnerFactory<InputT, OutputT> runnerFactory) {
-    AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator();
 
     BundleOutputManager outputManager = createOutputManager(evaluationContext, key, outputs);
 
@@ -116,19 +112,17 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
         mainOutputTag,
         additionalOutputTags,
         stepContext,
-        aggregatorChanges,
         windowingStrategy);
 
-    return create(runner, stepContext, application, aggregatorChanges, outputManager);
+    return create(runner, stepContext, application, outputManager);
   }
 
   public static <InputT, OutputT> ParDoEvaluator<InputT> create(
       PushbackSideInputDoFnRunner<InputT, OutputT> runner,
       DirectStepContext stepContext,
       AppliedPTransform<?, ?, ?> application,
-      AggregatorContainer.Mutator aggregatorChanges,
       BundleOutputManager outputManager) {
-    return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
+    return new ParDoEvaluator<>(runner, application, outputManager, stepContext);
   }
 
   static BundleOutputManager createOutputManager(
@@ -155,7 +149,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
 
   private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
   private final AppliedPTransform<?, ?, ?> transform;
-  private final AggregatorContainer.Mutator aggregatorChanges;
   private final BundleOutputManager outputManager;
   private final DirectStepContext stepContext;
 
@@ -164,14 +157,12 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
   private ParDoEvaluator(
       PushbackSideInputDoFnRunner<InputT, ?> fnRunner,
       AppliedPTransform<?, ?, ?> transform,
-      AggregatorContainer.Mutator aggregatorChanges,
       BundleOutputManager outputManager,
       DirectStepContext stepContext) {
     this.fnRunner = fnRunner;
     this.transform = transform;
     this.outputManager = outputManager;
     this.stepContext = stepContext;
-    this.aggregatorChanges = aggregatorChanges;
     this.unprocessedElements = ImmutableList.builder();
 
     try {
@@ -222,7 +213,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
     return resultBuilder
         .addOutput(outputManager.bundles.values())
         .withTimerUpdate(stepContext.getTimerUpdate())
-        .withAggregatorChanges(aggregatorChanges)
         .addUnprocessedElements(unprocessedElements.build())
         .build();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index e0adc40..5f6b4f7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -197,7 +197,6 @@ class SplittableProcessElementsEvaluatorFactory<
           TupleTag<OutputT> mainOutputTag,
           List<TupleTag<?>> additionalOutputTags,
           DirectExecutionContext.DirectStepContext stepContext,
-          AggregatorContainer.Mutator aggregatorChanges,
           WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) {
         ProcessFn<InputT, OutputT, RestrictionT, ?> processFn =
             (ProcessFn) fn;
@@ -210,7 +209,6 @@ class SplittableProcessElementsEvaluatorFactory<
             mainOutputTag,
             additionalOutputTags,
             stepContext,
-            aggregatorChanges,
             windowingStrategy);
       }
     };

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 93ab077..7cf3840 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -261,7 +261,6 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
                   delegateResult.getTransform(), delegateResult.getWatermarkHold())
               .withTimerUpdate(delegateResult.getTimerUpdate())
               .withState(delegateResult.getState())
-              .withAggregatorChanges(delegateResult.getAggregatorChanges())
               .withMetricUpdates(delegateResult.getLogicalMetricUpdates())
               .addOutput(Lists.newArrayList(delegateResult.getOutputBundles()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index fe3ae97..2a2ccab 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -54,7 +54,6 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
         getTransform(),
         getOutputBundles(),
         getUnprocessedElements(),
-        getAggregatorChanges(),
         metricUpdates,
         getWatermarkHold(),
         getState(),
@@ -72,7 +71,6 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
     private MetricUpdates metricUpdates;
     private CopyOnAccessInMemoryStateInternals state;
     private TimerUpdate timerUpdate;
-    private AggregatorContainer.Mutator aggregatorChanges;
     private final Set<OutputType> producedOutputs;
     private final Instant watermarkHold;
 
@@ -91,7 +89,6 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
           transform,
           bundlesBuilder.build(),
           unprocessedElementsBuilder.build(),
-          aggregatorChanges,
           metricUpdates,
           watermarkHold,
           state,
@@ -99,11 +96,6 @@ public abstract class StepTransformResult<InputT> implements TransformResult<Inp
           producedOutputs);
     }
 
-    public Builder<InputT> withAggregatorChanges(AggregatorContainer.Mutator aggregatorChanges) {
-      this.aggregatorChanges = aggregatorChanges;
-      return this;
-    }
-
     public Builder<InputT> withMetricUpdates(MetricUpdates metricUpdates) {
       this.metricUpdates = metricUpdates;
       return this;

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index bde44ca..3a95df7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -61,12 +61,6 @@ public interface TransformResult<InputT> {
   Iterable<? extends WindowedValue<InputT>> getUnprocessedElements();
 
   /**
-   * Returns the {@link AggregatorContainer.Mutator} used by this {@link PTransform}, or null if
-   * this transform did not use an {@link AggregatorContainer.Mutator}.
-   */
-  @Nullable AggregatorContainer.Mutator getAggregatorChanges();
-
-  /**
    * Returns the logical metric updates.
    */
   MetricUpdates getLogicalMetricUpdates();

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index e99e4bf..69dbc22 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -150,11 +150,6 @@ public class ParDoEvaluatorTest {
                 Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class)))
         .thenReturn(executionContext);
 
-    AggregatorContainer container = AggregatorContainer.create();
-    AggregatorContainer.Mutator mutator = container.createMutator();
-    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
-    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
-
     @SuppressWarnings("unchecked")
     AppliedPTransform<PCollection<Integer>, ?, ?> transform =
         (AppliedPTransform<PCollection<Integer>, ?, ?>) DirectGraphs.getProducer(output);

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index 1110a55..3e94a45 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.metrics.SparkMetricResults;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.sdk.Pipeline;
@@ -77,10 +76,6 @@ public abstract class SparkPipelineResult implements PipelineResult {
   protected abstract State awaitTermination(Duration duration)
       throws TimeoutException, ExecutionException, InterruptedException;
 
-  public <T> T getAggregatorValue(final String name, final Class<T> resultType) {
-    return SparkAggregators.valueOf(name, resultType);
-  }
-
   @Override
   public PipelineResult.State getState() {
     return state;

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
deleted file mode 100644
index 1da196b..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.aggregators;
-
-import com.google.common.collect.ImmutableList;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
-import org.apache.beam.sdk.AggregatorValues;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.spark.Accumulator;
-
-/**
- * A utility class for handling Beam {@link Aggregator}s.
- */
-public class SparkAggregators {
-
-  private static <T> AggregatorValues<T> valueOf(final Accumulator<NamedAggregators> accum,
-                                                 final Aggregator<?, T> aggregator) {
-    @SuppressWarnings("unchecked")
-    Class<T> valueType = (Class<T>) aggregator.getCombineFn().getOutputType().getRawType();
-    final T value = valueOf(accum, aggregator.getName(), valueType);
-
-    return new AggregatorValues<T>() {
-
-      @Override
-      public Collection<T> getValues() {
-        return ImmutableList.of(value);
-      }
-
-      @Override
-      public Map<String, T> getValuesAtSteps() {
-        throw new UnsupportedOperationException("getValuesAtSteps is not supported.");
-      }
-    };
-  }
-
-  private static <T> T valueOf(final Accumulator<NamedAggregators> accum,
-                               final String aggregatorName,
-                               final Class<T> typeClass) {
-    return accum.value().getValue(aggregatorName, typeClass);
-  }
-
-  /**
-   * Retrieves the value of an aggregator from a SparkContext instance.
-   *
-   * @param aggregator The aggregator whose value to retrieve
-   * @param <T> The type of the aggregator's output
-   * @return The value of the aggregator
-   */
-  public static <T> AggregatorValues<T> valueOf(final Aggregator<?, T> aggregator) {
-    return valueOf(AggregatorsAccumulator.getInstance(), aggregator);
-  }
-
-  /**
-   * Retrieves the value of an aggregator from a SparkContext instance.
-   *
-   * @param name Name of the aggregator to retrieve the value of.
-   * @param typeClass      Type class of value to be retrieved.
-   * @param <T>            Type of object to be returned.
-   * @return The value of the aggregator.
-   */
-  public static <T> T valueOf(final String name, final Class<T> typeClass) {
-    return valueOf(AggregatorsAccumulator.getInstance(), name, typeClass);
-  }
-
-  /**
-   * An implementation of {@link AggregatorFactory} for the SparkRunner.
-   */
-  public static class Factory implements AggregatorFactory {
-
-    private final SparkRuntimeContext runtimeContext;
-    private final Accumulator<NamedAggregators> accumulator;
-
-    public Factory(SparkRuntimeContext runtimeContext, Accumulator<NamedAggregators> accumulator) {
-      this.runtimeContext = runtimeContext;
-      this.accumulator = accumulator;
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-        Class<?> fnClass,
-        ExecutionContext.StepContext stepContext,
-        String aggregatorName,
-        Combine.CombineFn<InputT, AccumT, OutputT> combine) {
-
-      return runtimeContext.createAggregator(accumulator, aggregatorName, combine);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index c59e0e7..4a2851d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -43,9 +43,9 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.metrics.CounterCell;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -207,10 +207,13 @@ public class SparkGroupAlsoByWindowViaWindowSet {
             new OutputWindowedValueHolder<>();
         // use in memory Aggregators since Spark Accumulators are not resilient
         // in stateful operators, once done with this partition.
-        final InMemoryLongSumAggregator droppedDueToClosedWindow = new InMemoryLongSumAggregator(
-            GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
-        final InMemoryLongSumAggregator droppedDueToLateness = new InMemoryLongSumAggregator(
-            GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER);
+        final MetricsContainer cellProvider = new MetricsContainer("cellProvider");
+        final CounterCell droppedDueToClosedWindow = cellProvider.getCounter(
+            MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class,
+            GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER));
+        final CounterCell droppedDueToLateness = cellProvider.getCounter(
+            MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class,
+                GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER));
 
         AbstractIterator<
             Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>
@@ -315,15 +318,15 @@ public class SparkGroupAlsoByWindowViaWindowSet {
         };
 
         // log if there's something to log.
-        long lateDropped = droppedDueToLateness.getSum();
+        long lateDropped = droppedDueToLateness.getCumulative();
         if (lateDropped > 0) {
           LOG.info(String.format("Dropped %d elements due to lateness.", lateDropped));
-          droppedDueToLateness.zero();
+          droppedDueToLateness.inc(-droppedDueToLateness.getCumulative());
         }
-        long closedWindowDropped = droppedDueToClosedWindow.getSum();
+        long closedWindowDropped = droppedDueToClosedWindow.getCumulative();
         if (closedWindowDropped > 0) {
           LOG.info(String.format("Dropped %d elements due to closed window.", closedWindowDropped));
-          droppedDueToClosedWindow.zero();
+          droppedDueToClosedWindow.inc(-droppedDueToClosedWindow.getCumulative());
         }
 
         return scala.collection.JavaConversions.asScalaIterator(outIter);
@@ -421,36 +424,4 @@ public class SparkGroupAlsoByWindowViaWindowSet {
           "Tagged outputs are not allowed in GroupAlsoByWindow.");
     }
   }
-
-  private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
-    private final String name;
-    private long sum = 0;
-
-    public void zero() {
-      sum = 0;
-    }
-
-    public long getSum() {
-      return sum;
-    }
-
-    InMemoryLongSumAggregator(String name) {
-      this.name = name;
-    }
-
-    @Override
-    public void addValue(Long value) {
-      sum += value;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public Combine.CombineFn<Long, ?, Long> getCombineFn() {
-      return Sum.ofLongs();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 4cd1683..410b7de 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
@@ -104,7 +103,6 @@ public class MultiDoFnFunction<InputT, OutputT>
             mainOutputTag,
             Collections.<TupleTag<?>>emptyList(),
             new SparkProcessContext.NoOpStepContext(),
-            new SparkAggregators.Factory(runtimeContext, aggAccum),
             windowingStrategy);
 
     DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index 063feef..9ee52de 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaOutputBufferDoFn;
-import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
@@ -35,8 +34,6 @@ import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -59,8 +56,6 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
   private final StateInternalsFactory<K> stateInternalsFactory;
   private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn;
   private final SparkRuntimeContext runtimeContext;
-  private final Aggregator<Long, Long> droppedDueToClosedWindow;
-
 
   public SparkGroupAlsoByWindowViaOutputBufferFn(
       WindowingStrategy<?, W> windowingStrategy,
@@ -72,11 +67,6 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
     this.stateInternalsFactory = stateInternalsFactory;
     this.reduceFn = reduceFn;
     this.runtimeContext = runtimeContext;
-
-    droppedDueToClosedWindow = runtimeContext.createAggregator(
-        accumulator,
-        GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
-        Sum.ofLongs());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 6abab17..6bba863 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -22,19 +22,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.spark.Accumulator;
 
 /**
  * The SparkRuntimeContext allows us to define useful features on the client side before our
@@ -44,9 +36,6 @@ public class SparkRuntimeContext implements Serializable {
   private final String serializedPipelineOptions;
   private transient CoderRegistry coderRegistry;
 
-  // map for names to Beam aggregators.
-  private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>();
-
   SparkRuntimeContext(Pipeline pipeline) {
     this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions());
   }
@@ -71,45 +60,6 @@ public class SparkRuntimeContext implements Serializable {
     return PipelineOptionsHolder.getOrInit(serializedPipelineOptions);
   }
 
-  /**
-   * Creates and aggregator and associates it with the specified name.
-   *
-   * @param accum     Spark Accumulator.
-   * @param named     Name of aggregator.
-   * @param combineFn Combine function used in aggregation.
-   * @param <InputT>  Type of inputs to aggregator.
-   * @param <InterT>  Intermediate data type
-   * @param <OutputT> Type of aggregator outputs.
-   * @return Specified aggregator
-   */
-  public synchronized <InputT, InterT, OutputT> Aggregator<InputT, OutputT> createAggregator(
-      Accumulator<NamedAggregators> accum,
-      String named,
-      Combine.CombineFn<? super InputT, InterT, OutputT> combineFn) {
-    @SuppressWarnings("unchecked")
-    Aggregator<InputT, OutputT> aggregator = (Aggregator<InputT, OutputT>) aggregators.get(named);
-    try {
-      if (aggregator == null) {
-        @SuppressWarnings("unchecked")
-        final
-        NamedAggregators.CombineFunctionState<InputT, InterT, OutputT> state =
-            new NamedAggregators.CombineFunctionState<>(
-                (Combine.CombineFn<InputT, InterT, OutputT>) combineFn,
-                // hidden assumption: InputT == OutputT
-                (Coder<InputT>) getCoderRegistry().getCoder(combineFn.getOutputType()),
-                this);
-
-        accum.add(new NamedAggregators(named, state));
-        aggregator = new SparkAggregator<>(named, state);
-        aggregators.put(named, aggregator);
-      }
-      return aggregator;
-    } catch (CannotProvideCoderException e) {
-      throw new RuntimeException(String.format("Unable to create an aggregator named: [%s]", named),
-                                 e);
-    }
-  }
-
   public CoderRegistry getCoderRegistry() {
     if (coderRegistry == null) {
       coderRegistry = CoderRegistry.createDefault();
@@ -135,35 +85,4 @@ public class SparkRuntimeContext implements Serializable {
       return pipelineOptions;
     }
   }
-
-  /**
-   * Initialize spark aggregators exactly once.
-   *
-   * @param <InputT> Type of element fed in to aggregator.
-   */
-  private static class SparkAggregator<InputT, OutputT>
-      implements Aggregator<InputT, OutputT>, Serializable {
-    private final String name;
-    private final NamedAggregators.State<InputT, ?, OutputT> state;
-
-    SparkAggregator(String name, NamedAggregators.State<InputT, ?, OutputT> state) {
-      this.name = name;
-      this.state = state;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public void addValue(InputT elem) {
-      state.update(elem);
-    }
-
-    @Override
-    public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
-      return state.getCombineFn();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java
deleted file mode 100644
index 0b31acc..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/ClearAggregatorsRule.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.aggregators;
-
-import org.junit.rules.ExternalResource;
-
-
-/**
- * A rule that clears the {@link AggregatorsAccumulator}
- * which represents the Beam {@link org.apache.beam.sdk.transforms.Aggregator}s.
- */
-public class ClearAggregatorsRule extends ExternalResource {
-
-  @Override
-  protected void before() throws Throwable {
-    clearNamedAggregators();
-  }
-
-  public void clearNamedAggregators() {
-    AggregatorsAccumulator.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
deleted file mode 100644
index dbd8cac..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.aggregators.metrics.sink;
-
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableSet;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule;
-import org.apache.beam.runners.spark.aggregators.SparkAggregators;
-import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
-
-
-/**
- * A test for the NamedAggregators mechanism.
- */
-public class NamedAggregatorsTest {
-
-  @Rule
-  public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
-
-  @Rule
-  public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule();
-
-  @Rule
-  public final PipelineRule pipelineRule = PipelineRule.batch();
-
-  private Pipeline createSparkPipeline() {
-    pipelineRule.getOptions().setEnableSparkMetricSinks(true);
-    return pipelineRule.createPipeline();
-  }
-
-  private void runPipeline() {
-
-    final List<String> words =
-        Arrays.asList("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi");
-
-    final Set<String> expectedCounts =
-        ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
-
-    final Pipeline pipeline = createSparkPipeline();
-
-    final PCollection<String> output =
-        pipeline
-        .apply(Create.of(words).withCoder(StringUtf8Coder.of()))
-        .apply(new WordCount.CountWords())
-        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
-    PAssert.that(output).containsInAnyOrder(expectedCounts);
-
-    pipeline.run();
-  }
-
-  @Test
-  public void testNamedAggregators() throws Exception {
-    assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));
-
-    runPipeline();
-
-    assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d));
-  }
-
-  @Test
-  public void testNonExistingAggregatorName() throws Exception {
-    runPipeline();
-
-    final Long valueOf = SparkAggregators.valueOf("myMissingAggregator", Long.class);
-
-    assertThat(valueOf, is(nullValue()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
deleted file mode 100644
index eeb9b45..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.SetMultimap;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PValue;
-
-/**
- * Retrieves {@link Aggregator Aggregators} at each {@link ParDo} and returns a {@link Map} of
- * {@link Aggregator} to the {@link PTransform PTransforms} in which it is present.
- */
-@Deprecated
-class AggregatorPipelineExtractor {
-  private final Pipeline pipeline;
-
-  /**
-   * Creates an {@code AggregatorPipelineExtractor} for the given {@link Pipeline}.
-   */
-  public AggregatorPipelineExtractor(Pipeline pipeline) {
-    this.pipeline = pipeline;
-  }
-
-  /**
-   * Returns a {@link Map} between each {@link Aggregator} in the {@link Pipeline} to the {@link
-   * PTransform PTransforms} in which it is used.
-   */
-  public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() {
-    HashMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps = HashMultimap.create();
-    pipeline.traverseTopologically(new AggregatorVisitor(aggregatorSteps));
-    return aggregatorSteps.asMap();
-  }
-
-  private static class AggregatorVisitor extends PipelineVisitor.Defaults {
-    private final SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps;
-
-    public AggregatorVisitor(SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps) {
-      this.aggregatorSteps = aggregatorSteps;
-    }
-
-    @Override
-    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-      PTransform<?, ?> transform = node.getTransform();
-      addStepToAggregators(transform, getAggregators(transform));
-    }
-
-    private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) {
-      return Collections.emptyList();
-    }
-
-    private void addStepToAggregators(
-        PTransform<?, ?> transform, Collection<Aggregator<?, ?>> aggregators) {
-      for (Aggregator<?, ?> aggregator : aggregators) {
-        aggregatorSteps.put(aggregator, transform);
-      }
-    }
-
-    @Override
-    public void visitValue(PValue value, TransformHierarchy.Node producer) {}
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java
deleted file mode 100644
index 3040815..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-
-/**
- * Signals that an exception has occurred while retrieving {@link Aggregator}s.
- */
-public class AggregatorRetrievalException extends Exception {
-  /**
-   * Constructs a new {@code AggregatorRetrievalException} with the specified detail message and
-   * cause.
-   */
-  public AggregatorRetrievalException(String message, Throwable cause) {
-    super(message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java
deleted file mode 100644
index 1fd034a..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk;
-
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * A collection of values associated with an {@link Aggregator}. Aggregators declared in a
- * {@link DoFn} are emitted on a per-{@link DoFn}-application basis.
- *
- * @param <T> the output type of the aggregator
- */
-public abstract class AggregatorValues<T> {
-  /**
-   * Get the values of the {@link Aggregator} at all steps it was used.
-   */
-  public Collection<T> getValues() {
-    return getValuesAtSteps().values();
-  }
-
-  /**
-   * Get the values of the {@link Aggregator} by the user name at each step it was used.
-   */
-  public abstract Map<String, T> getValuesAtSteps();
-
-  /**
-   * Get the total value of this {@link Aggregator} by applying the specified {@link CombineFn}.
-   */
-  public T getTotalValue(CombineFn<T, ?, T> combineFn) {
-    return combineFn.apply(getValues());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index ab8906a..351e1b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -24,7 +24,6 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.SetMultimap;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +39,6 @@ import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -571,14 +569,6 @@ public class Pipeline {
   }
 
   /**
-   * Returns a {@link Map} from each {@link Aggregator} in the {@link Pipeline} to the {@link
-   * PTransform PTransforms} in which it is used.
-   */
-  public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() {
-    return new AggregatorPipelineExtractor(this).getAggregatorSteps();
-  }
-
-  /**
    * Builds a name from a "/"-delimited prefix and a name.
    */
   private String buildName(String namePrefix, String name) {

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index f720599..7255a01 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -63,9 +63,6 @@ public @interface Experimental {
     /** Trigger-related experimental APIs. */
     TRIGGER,
 
-    /** Aggregator-related experimental APIs. */
-    AGGREGATOR,
-
     /** Experimental APIs for Coder binary format identifiers. */
     CODER_ENCODING_ID,
 

http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index c957100..6c21b8c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -26,21 +26,9 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
  * @param <InputT> the type of input values
  * @param <OutputT> the type of output values
  */
+@Deprecated
 public interface Aggregator<InputT, OutputT> {
-
-  /**
-   * Adds a new value into the Aggregator.
-   */
   void addValue(InputT value);
-
-  /**
-   * Returns the name of the Aggregator.
-   */
   String getName();
-
-  /**
-   * Returns the {@link CombineFn}, which combines input elements in the
-   * aggregator.
-   */
   CombineFn<InputT, ?, OutputT> getCombineFn();
 }


Mime
View raw message