beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [5/6] beam git commit: Remove KeyedCombineFn
Date Mon, 01 May 2017 02:25:41 GMT
Remove KeyedCombineFn


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e04924e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e04924e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e04924e

Branch: refs/heads/master
Commit: 7e04924ee7b31e28326f761618173749a55789d0
Parents: a198f8d
Author: Kenneth Knowles <klk@google.com>
Authored: Fri Apr 21 14:04:02 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Sun Apr 30 18:17:42 2017 -0700

----------------------------------------------------------------------
 .../translation/utils/ApexStateInternals.java   |  38 +-
 .../runners/core/GlobalCombineFnRunner.java     |  78 +++
 .../runners/core/GlobalCombineFnRunners.java    | 193 ++++++
 .../runners/core/InMemoryStateInternals.java    |  50 +-
 .../runners/core/PerKeyCombineFnRunner.java     |  79 ---
 .../runners/core/PerKeyCombineFnRunners.java    | 161 -----
 .../org/apache/beam/runners/core/StateTag.java  |  18 +-
 .../org/apache/beam/runners/core/StateTags.java |  43 +-
 .../beam/runners/core/SystemReduceFn.java       |  15 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   |  36 +-
 .../beam/runners/core/ReduceFnTester.java       |  15 +-
 .../apache/beam/runners/core/StateTagTest.java  |  22 +-
 .../CopyOnAccessInMemoryStateInternals.java     |  66 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java |  34 -
 .../flink/FlinkBatchTransformTranslators.java   |   9 +-
 .../functions/AbstractFlinkCombineRunner.java   |  44 +-
 .../FlinkMergingNonShuffleReduceFunction.java   |  10 +-
 .../functions/FlinkPartialReduceFunction.java   |   6 +-
 .../functions/FlinkReduceFunction.java          |  10 +-
 .../functions/SortingFlinkCombineRunner.java    |   1 -
 .../state/FlinkBroadcastStateInternals.java     | 173 ++---
 .../state/FlinkKeyGroupStateInternals.java      | 119 ++--
 .../state/FlinkSplitStateInternals.java         | 119 ++--
 .../streaming/state/FlinkStateInternals.java    | 173 ++---
 .../spark/stateful/SparkStateInternals.java     |  40 +-
 .../spark/translation/SparkKeyedCombineFn.java  |  26 +-
 .../spark/translation/TransformTranslator.java  |  44 +-
 .../streaming/StreamingTransformTranslator.java |   4 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |   7 +-
 .../src/main/resources/beam/findbugs-filter.xml |   2 +-
 .../sdk/transforms/ApproximateQuantiles.java    |   8 +-
 .../beam/sdk/transforms/ApproximateUnique.java  |   3 +-
 .../org/apache/beam/sdk/transforms/Combine.java | 672 +++++--------------
 .../beam/sdk/transforms/CombineFnBase.java      | 136 ----
 .../apache/beam/sdk/transforms/CombineFns.java  | 448 +------------
 .../beam/sdk/transforms/CombineWithContext.java | 174 +----
 .../org/apache/beam/sdk/transforms/Top.java     |   6 +-
 .../org/apache/beam/sdk/transforms/View.java    |   2 +-
 .../apache/beam/sdk/util/AppliedCombineFn.java  |  35 +-
 .../org/apache/beam/sdk/util/CombineFnUtil.java | 123 ++--
 .../apache/beam/sdk/util/state/StateBinder.java |  19 +-
 .../apache/beam/sdk/util/state/StateSpecs.java  | 177 ++---
 .../beam/sdk/transforms/CombineFnsTest.java     | 114 ++--
 .../apache/beam/sdk/transforms/CombineTest.java | 213 +++---
 .../apache/beam/sdk/transforms/ParDoTest.java   |   2 +-
 .../apache/beam/sdk/transforms/ViewTest.java    |   2 +-
 .../apache/beam/sdk/util/CombineFnUtilTest.java |  18 +-
 47 files changed, 1178 insertions(+), 2609 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index ec8f666..e682894 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -42,8 +42,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -145,7 +144,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
           address,
           accumCoder,
           key,
-          combineFn.<K>asKeyedFn()
+          combineFn
           );
     }
 
@@ -158,24 +157,11 @@ public class ApexStateInternals<K> implements StateInternals<K> {
 
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
-        bindKeyedCombiningValue(
+        bindCombiningValueWithContext(
             StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
-            KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      return new ApexCombiningState<>(
-          namespace,
-          address,
-          accumCoder,
-          key, combineFn);
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
-        bindKeyedCombiningValueWithContext(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-      return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
+            CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+      return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
     }
   }
 
@@ -323,12 +309,12 @@ public class ApexStateInternals<K> implements StateInternals<K> {
       extends AbstractState<AccumT>
       implements CombiningState<InputT, AccumT, OutputT> {
     private final K key;
-    private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+    private final CombineFn<InputT, AccumT, OutputT> combineFn;
 
     private ApexCombiningState(StateNamespace namespace,
         StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
         Coder<AccumT> coder,
-        K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+        K key, CombineFn<InputT, AccumT, OutputT> combineFn) {
       super(namespace, address, coder);
       this.key = key;
       this.combineFn = combineFn;
@@ -341,13 +327,13 @@ public class ApexStateInternals<K> implements StateInternals<K> {
 
     @Override
     public OutputT read() {
-      return combineFn.extractOutput(key, getAccum());
+      return combineFn.extractOutput(getAccum());
     }
 
     @Override
     public void add(InputT input) {
       AccumT accum = getAccum();
-      combineFn.addInput(key, accum, input);
+      combineFn.addInput(accum, input);
       writeValue(accum);
     }
 
@@ -355,7 +341,7 @@ public class ApexStateInternals<K> implements StateInternals<K> {
     public AccumT getAccum() {
       AccumT accum = readValue();
       if (accum == null) {
-        accum = combineFn.createAccumulator(key);
+        accum = combineFn.createAccumulator();
       }
       return accum;
     }
@@ -376,13 +362,13 @@ public class ApexStateInternals<K> implements StateInternals<K> {
 
     @Override
     public void addAccum(AccumT accum) {
-      accum = combineFn.mergeAccumulators(key, Arrays.asList(getAccum(), accum));
+      accum = combineFn.mergeAccumulators(Arrays.asList(getAccum(), accum));
       writeValue(accum);
     }
 
     @Override
     public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(key, accumulators);
+      return combineFn.mergeAccumulators(accumulators);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java
new file mode 100644
index 0000000..5325ba6
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.Serializable;
+import java.util.Collection;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+
+/**
+ * An interface that runs a {@link GlobalCombineFn} with unified APIs.
+ *
+ * <p>Different combine functions have their own implementations. For example, the implementation
+ * can skip allocating {@code CombineWithContext.Context} if the combine function doesn't use it.
+ */
+public interface GlobalCombineFnRunner<InputT, AccumT, OutputT> extends Serializable {
+  /**
+   * Forwards the call to a {@link GlobalCombineFn} to create the accumulator.
+   *
+   * <p>It constructs a {@code CombineWithContext.Context} from
+   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
+   */
+  AccumT createAccumulator(PipelineOptions options,
+      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+
+  /**
+   * Forwards the call to a {@link GlobalCombineFn} to add the input.
+   *
+   * <p>It constructs a {@code CombineWithContext.Context} from
+   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
+   */
+  AccumT addInput(AccumT accumulator, InputT value, PipelineOptions options,
+      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+
+  /**
+   * Forwards the call to a {@link GlobalCombineFn} to merge accumulators.
+   *
+   * <p>It constructs a {@code CombineWithContext.Context} from
+   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
+   */
+  AccumT mergeAccumulators(Iterable<AccumT> accumulators, PipelineOptions options,
+      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+
+  /**
+   * Forwards the call to a {@link GlobalCombineFn} to extract the output.
+   *
+   * <p>It constructs a {@code CombineWithContext.Context} from
+   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
+   */
+  OutputT extractOutput(AccumT accumulator, PipelineOptions options,
+      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+
+  /**
+   * Forwards the call to a {@link GlobalCombineFn} to compact the accumulator.
+   *
+   * <p>It constructs a {@code CombineWithContext.Context} from
+   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
+   */
+  AccumT compact(AccumT accumulator, PipelineOptions options,
+      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java
new file mode 100644
index 0000000..d45b503
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.SideInputReader;
+
+/**
+ * Static utility methods that provide {@link GlobalCombineFnRunner} implementations for different
+ * combine functions.
+ */
+public class GlobalCombineFnRunners {
+  /** Returns a {@link GlobalCombineFnRunner} from a {@link GlobalCombineFn}. */
+  public static <InputT, AccumT, OutputT> GlobalCombineFnRunner<InputT, AccumT, OutputT> create(
+      GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn) {
+    if (globalCombineFn instanceof CombineFnWithContext) {
+      return new CombineFnWithContextRunner<>(
+          (CombineFnWithContext<InputT, AccumT, OutputT>) globalCombineFn);
+    } else if (globalCombineFn instanceof CombineFn) {
+      return new CombineFnRunner<>((CombineFn<InputT, AccumT, OutputT>) globalCombineFn);
+    } else {
+      throw new IllegalStateException(
+          String.format("Unknown type of CombineFn: %s", globalCombineFn.getClass()));
+    }
+  }
+
+  /**
+   * An implementation of {@link GlobalCombineFnRunner} with {@link CombineFn}.
+   *
+   * <p>It forwards function calls to the {@link CombineFn}.
+   */
+  private static class CombineFnRunner<InputT, AccumT, OutputT>
+      implements org.apache.beam.runners.core.GlobalCombineFnRunner<InputT, AccumT, OutputT> {
+    private final CombineFn<InputT, AccumT, OutputT> combineFn;
+
+    private CombineFnRunner(CombineFn<InputT, AccumT, OutputT> combineFn) {
+      this.combineFn = combineFn;
+    }
+
+    @Override
+    public String toString() {
+      return combineFn.toString();
+    }
+
+    @Override
+    public AccumT createAccumulator(
+        PipelineOptions options,
+        SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return combineFn.createAccumulator();
+    }
+
+    @Override
+    public AccumT addInput(
+        AccumT accumulator,
+        InputT input,
+        PipelineOptions options,
+        SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return combineFn.addInput(accumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(
+        Iterable<AccumT> accumulators,
+        PipelineOptions options,
+        SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return combineFn.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(
+        AccumT accumulator,
+        PipelineOptions options,
+        SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return combineFn.extractOutput(accumulator);
+    }
+
+    @Override
+    public AccumT compact(
+        AccumT accumulator,
+        PipelineOptions options,
+        SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return combineFn.compact(accumulator);
+    }
+  }
+
+  /**
+   * An implementation of {@link org.apache.beam.runners.core.GlobalCombineFnRunner} with {@link
+   * CombineFnWithContext}.
+   *
+   * <p>It forwards function calls to the {@link CombineFnWithContext}.
+   */
+  private static class CombineFnWithContextRunner<InputT, AccumT, OutputT>
+      implements org.apache.beam.runners.core.GlobalCombineFnRunner<InputT, AccumT, OutputT> {
+    private final CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext;
+
+    private CombineFnWithContextRunner(
+        CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
+      this.combineFnWithContext = combineFnWithContext;
+    }
+
+    @Override
+    public String toString() {
+      return combineFnWithContext.toString();
+    }
+
+    @Override
+    public AccumT createAccumulator(
+        PipelineOptions options,
+        SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return combineFnWithContext.createAccumulator(
+          CombineContextFactory.createFromComponents(
+              options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT addInput(
+        AccumT accumulator,
+        InputT input,
+        PipelineOptions options,
+        SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return combineFnWithContext.addInput(
+          accumulator,
+          input,
+          CombineContextFactory.createFromComponents(
+              options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT mergeAccumulators(
+        Iterable<AccumT> accumulators,
+        PipelineOptions options,
+        SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return combineFnWithContext.mergeAccumulators(
+          accumulators,
+          CombineContextFactory.createFromComponents(
+              options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public OutputT extractOutput(
+        AccumT accumulator,
+        PipelineOptions options,
+        SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return combineFnWithContext.extractOutput(
+          accumulator,
+          CombineContextFactory.createFromComponents(
+              options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT compact(
+        AccumT accumulator,
+        PipelineOptions options,
+        SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return combineFnWithContext.compact(
+          accumulator,
+          CombineContextFactory.createFromComponents(
+              options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 9fb8e3f..2c02282 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -31,8 +31,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CombineFnUtil;
@@ -152,7 +151,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
             StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
             final CombineFn<InputT, AccumT, OutputT> combineFn) {
-      return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn());
+      return new InMemoryCombiningState<>(combineFn);
     }
 
     @Override
@@ -164,20 +163,11 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
 
     @Override
     public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
-        bindKeyedCombiningValue(
+        bindCombiningValueWithContext(
             StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
             Coder<AccumT> accumCoder,
-            KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn);
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
-        bindKeyedCombiningValueWithContext(
-            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-      return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
+            CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+      return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
     }
   }
 
@@ -310,23 +300,21 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
   /**
    * An {@link InMemoryState} implementation of {@link CombiningState}.
    */
-  public static final class InMemoryCombiningState<K, InputT, AccumT, OutputT>
+  public static final class InMemoryCombiningState<InputT, AccumT, OutputT>
       implements CombiningState<InputT, AccumT, OutputT>,
-          InMemoryState<InMemoryCombiningState<K, InputT, AccumT, OutputT>> {
-    private final K key;
+          InMemoryState<InMemoryCombiningState<InputT, AccumT, OutputT>> {
     private boolean isCleared = true;
-    private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+    private final CombineFn<InputT, AccumT, OutputT> combineFn;
     private AccumT accum;
 
     public InMemoryCombiningState(
-        K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      this.key = key;
+        CombineFn<InputT, AccumT, OutputT> combineFn) {
       this.combineFn = combineFn;
-      accum = combineFn.createAccumulator(key);
+      accum = combineFn.createAccumulator();
     }
 
     @Override
-    public InMemoryCombiningState<K, InputT, AccumT, OutputT> readLater() {
+    public InMemoryCombiningState<InputT, AccumT, OutputT> readLater() {
       return this;
     }
 
@@ -334,19 +322,19 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     public void clear() {
       // Even though we're clearing we can't remove this from the in-memory state map, since
       // other users may already have a handle on this CombiningValue.
-      accum = combineFn.createAccumulator(key);
+      accum = combineFn.createAccumulator();
       isCleared = true;
     }
 
     @Override
     public OutputT read() {
-      return combineFn.extractOutput(key, accum);
+      return combineFn.extractOutput(accum);
     }
 
     @Override
     public void add(InputT input) {
       isCleared = false;
-      accum = combineFn.addInput(key, accum, input);
+      accum = combineFn.addInput(accum, input);
     }
 
     @Override
@@ -371,12 +359,12 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     @Override
     public void addAccum(AccumT accum) {
       isCleared = false;
-      this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum));
+      this.accum = combineFn.mergeAccumulators(Arrays.asList(this.accum, accum));
     }
 
     @Override
     public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(key, accumulators);
+      return combineFn.mergeAccumulators(accumulators);
     }
 
     @Override
@@ -385,9 +373,9 @@ public class InMemoryStateInternals<K> implements StateInternals<K> {
     }
 
     @Override
-    public InMemoryCombiningState<K, InputT, AccumT, OutputT> copy() {
-      InMemoryCombiningState<K, InputT, AccumT, OutputT> that =
-          new InMemoryCombiningState<>(key, combineFn);
+    public InMemoryCombiningState<InputT, AccumT, OutputT> copy() {
+      InMemoryCombiningState<InputT, AccumT, OutputT> that =
+          new InMemoryCombiningState<>(combineFn);
       if (!this.isCleared) {
         that.isCleared = this.isCleared;
         that.addAccum(accum);

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
deleted file mode 100644
index a6608a7..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.io.Serializable;
-import java.util.Collection;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-
-/**
- * An interface that runs a {@link PerKeyCombineFn} with unified APIs.
- *
- * <p>Different keyed combine functions have their own implementations.
- * For example, the implementation can skip allocating {@code Combine.Context},
- * if the keyed combine function doesn't use it.
- */
-public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable {
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from
-   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
-   */
-  AccumT createAccumulator(K key, PipelineOptions options,
-      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to add the input.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from
-   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
-   */
-  AccumT addInput(K key, AccumT accumulator, InputT value, PipelineOptions options,
-      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from
-   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
-   */
-  AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to extract the output.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from
-   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
-   */
-  OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from
-   * {@link PipelineOptions} and {@link SideInputReader} if it is required.
-   */
-  AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-      SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
deleted file mode 100644
index 7736758..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.SideInputReader;
-
-/**
- * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
- * for different keyed combine functions.
- */
-public class PerKeyCombineFnRunners {
-  /**
-   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
-   */
-  public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
-  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
-    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
-      return new KeyedCombineFnWithContextRunner<>(
-          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    } else if (perKeyCombineFn instanceof KeyedCombineFn) {
-      return new KeyedCombineFnRunner<>(
-          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    } else {
-      throw new IllegalStateException(
-          String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFn}.
-   */
-  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
-    private KeyedCombineFnRunner(
-        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
-      this.keyedCombineFn = keyedCombineFn;
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFn.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFn.compact(key, accumulator);
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
-   */
-  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
-
-    private KeyedCombineFnWithContextRunner(
-        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
-      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFnWithContext.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
-        Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, input,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.compact(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
index a5d262a..aaeecf0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -23,8 +23,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
@@ -100,17 +99,10 @@ public interface StateTag<K, StateT extends State> extends Serializable {
         CombineFn<InputT, AccumT, OutputT> combineFn);
 
     <InputT, AccumT, OutputT>
-    CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
-        Coder<AccumT> accumCoder,
-        KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
-
-    <InputT, AccumT, OutputT>
-    CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
-        Coder<AccumT> accumCoder,
-        KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
-            combineFn);
+        CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
+            StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+            Coder<AccumT> accumCoder,
+            CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
 
     /**
      * Bind to a watermark {@link StateSpec}.

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index 2b3f4b8..fe99f27 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -26,8 +26,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.state.BagState;
@@ -90,22 +89,12 @@ public class StateTags {
 
       @Override
       public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombining(
+      CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
               String id,
               StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
               Coder<AccumT> accumCoder,
-              KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-        return binder.bindKeyedCombiningValue(tagForSpec(id, spec), accumCoder, combineFn);
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext(
-              String id,
-              StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
-              Coder<AccumT> accumCoder,
-              KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-        return binder.bindKeyedCombiningValueWithContext(
+              CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+        return binder.bindCombiningValueWithContext(
             tagForSpec(id, spec), accumCoder, combineFn);
       }
 
@@ -162,29 +151,17 @@ public class StateTags {
   }
 
   /**
-   * Create a state tag for values that use a {@link KeyedCombineFn} to automatically merge
-   * multiple {@code InputT}s into a single {@code OutputT}.
-   */
-  public static <K, InputT, AccumT,
-      OutputT> StateTag<K, CombiningState<InputT, AccumT, OutputT>>
-      keyedCombiningValue(String id, Coder<AccumT> accumCoder,
-          KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.keyedCombining(accumCoder, combineFn));
-  }
-
-  /**
-   * Create a state tag for values that use a {@link KeyedCombineFnWithContext} to automatically
+   * Create a state tag for values that use a {@link CombineFnWithContext} to automatically
    * merge multiple {@code InputT}s into a single {@code OutputT}.
    */
-  public static <K, InputT, AccumT, OutputT>
-      StateTag<K, CombiningState<InputT, AccumT, OutputT>>
-      keyedCombiningValueWithContext(
+  public static <InputT, AccumT, OutputT>
+      StateTag<Object, CombiningState<InputT, AccumT, OutputT>>
+      combiningValueWithContext(
           String id,
           Coder<AccumT> accumCoder,
-          KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
+          CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
     return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.keyedCombiningWithContext(accumCoder, combineFn));
+        new StructuredId(id), StateSpecs.combining(accumCoder, combineFn));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index f618d88..86a7fd7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -20,8 +20,7 @@ package org.apache.beam.runners.core;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.AppliedCombineFn;
@@ -71,18 +70,18 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
       AccumT, OutputT, W>
       combining(
           final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    final StateTag<K, CombiningState<InputT, AccumT, OutputT>> bufferTag;
-    if (combineFn.getFn() instanceof KeyedCombineFnWithContext) {
+    final StateTag<Object, CombiningState<InputT, AccumT, OutputT>> bufferTag;
+    if (combineFn.getFn() instanceof CombineFnWithContext) {
       bufferTag = StateTags.makeSystemTagInternal(
-          StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext(
+          StateTags.<InputT, AccumT, OutputT>combiningValueWithContext(
               BUFFER_NAME, combineFn.getAccumulatorCoder(),
-              (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) combineFn.getFn()));
+              (CombineFnWithContext<InputT, AccumT, OutputT>) combineFn.getFn()));
 
     } else {
       bufferTag = StateTags.makeSystemTagInternal(
-            StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValue(
+            StateTags.<InputT, AccumT, OutputT>combiningValue(
                 BUFFER_NAME, combineFn.getAccumulatorCoder(),
-                (KeyedCombineFn<K, InputT, AccumT, OutputT>) combineFn.getFn()));
+                (CombineFn<InputT, AccumT, OutputT>) combineFn.getFn()));
     }
     return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag) {
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 44bc538..ec2e7a3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -218,7 +218,7 @@ public class ReduceFnRunnerTest {
         ReduceFnTester.combining(
             strategy,
             mockTriggerStateMachine,
-            Sum.ofIntegers().<String>asKeyedFn(),
+            Sum.ofIntegers(),
             VarIntCoder.of());
 
     injectElement(tester, 2);
@@ -291,7 +291,7 @@ public class ReduceFnRunnerTest {
 
     ReduceFnTester<Integer, Integer, IntervalWindow> tester =
         ReduceFnTester
-            .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
+            .combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
 
     tester.injectElements(TimestampedValue.of(13, elementTimestamp));
 
@@ -323,7 +323,7 @@ public class ReduceFnRunnerTest {
         ReduceFnTester.combining(
             strategy,
             mockTriggerStateMachine,
-            Sum.ofIntegers().<String>asKeyedFn(),
+            Sum.ofIntegers(),
             VarIntCoder.of());
 
     injectElement(tester, 1);
@@ -387,9 +387,14 @@ public class ReduceFnRunnerTest {
             });
 
     SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue);
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
-        mainInputWindowingStrategy, mockTriggerStateMachine, combineFn.<String>asKeyedFn(),
-        VarIntCoder.of(), options, mockSideInputReader);
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(
+            mainInputWindowingStrategy,
+            mockTriggerStateMachine,
+            combineFn,
+            VarIntCoder.of(),
+            options,
+            mockSideInputReader);
 
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     for (int i = 0; i < 8; ++i) {
@@ -1062,12 +1067,13 @@ public class ReduceFnRunnerTest {
    */
   @Test
   public void testDropDataMultipleWindowsFinishedTrigger() throws Exception {
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
-        WindowingStrategy.of(
-            SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
-        .withTrigger(AfterWatermark.pastEndOfWindow())
-        .withAllowedLateness(Duration.millis(1000)),
-        Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(
+            WindowingStrategy.of(SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
+                .withTrigger(AfterWatermark.pastEndOfWindow())
+                .withAllowedLateness(Duration.millis(1000)),
+            Sum.ofIntegers(),
+            VarIntCoder.of());
 
     tester.injectElements(
         // assigned to [-60, 40), [-30, 70), [0, 100)
@@ -1209,8 +1215,7 @@ public class ReduceFnRunnerTest {
             .withAllowedLateness(Duration.millis(100));
 
     ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester
-            .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
 
     tester.advanceInputWatermark(new Instant(0));
     tester.advanceProcessingTime(new Instant(0));
@@ -1265,8 +1270,7 @@ public class ReduceFnRunnerTest {
             .withAllowedLateness(Duration.millis(100));
 
     ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester
-            .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of());
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
 
     tester.advanceInputWatermark(new Instant(0));
     tester.advanceProcessingTime(new Instant(0));

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index b5b5492..dfb769f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -53,8 +53,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -170,13 +169,13 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
   /**
    * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} and
-   * {@link KeyedCombineFn}, creating a {@link TriggerStateMachine} from the
+   * {@link CombineFn}, creating a {@link TriggerStateMachine} from the
    * {@link Trigger} in the {@link WindowingStrategy}.
    */
   public static <W extends BoundedWindow, AccumT, OutputT>
       ReduceFnTester<Integer, OutputT, W> combining(
           WindowingStrategy<?, W> strategy,
-          KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn,
+          CombineFn<Integer, AccumT, OutputT> combineFn,
           Coder<OutputT> outputCoder)
           throws Exception {
 
@@ -194,7 +193,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
   /**
    * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy},
-   * {@link KeyedCombineFn}, and {@link TriggerStateMachine}, for mocking the interaction
+   * {@link CombineFn}, and {@link TriggerStateMachine}, for mocking the interaction
    * between {@link ReduceFnRunner} and the {@link TriggerStateMachine}.
    * Ignores the {@link Trigger} in the {@link WindowingStrategy}.
    */
@@ -202,7 +201,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   ReduceFnTester<Integer, OutputT, W> combining(
       WindowingStrategy<?, W> strategy,
       TriggerStateMachine triggerStateMachine,
-      KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn,
+      CombineFn<Integer, AccumT, OutputT> combineFn,
       Coder<OutputT> outputCoder)
       throws Exception {
 
@@ -223,7 +222,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   public static <W extends BoundedWindow, AccumT, OutputT>
       ReduceFnTester<Integer, OutputT, W> combining(
           WindowingStrategy<?, W> strategy,
-          KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn,
+          CombineFnWithContext<Integer, AccumT, OutputT> combineFn,
           Coder<OutputT> outputCoder,
           PipelineOptions options,
           SideInputReader sideInputReader)
@@ -246,7 +245,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   ReduceFnTester<Integer, OutputT, W> combining(
       WindowingStrategy<?, W> strategy,
       TriggerStateMachine triggerStateMachine,
-      KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn,
+      CombineFnWithContext<Integer, AccumT, OutputT> combineFn,
       Coder<OutputT> outputCoder,
       PipelineOptions options,
       SideInputReader sideInputReader)

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
index 10dcb62..9a8b75c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -162,17 +162,17 @@ public class StateTagTest {
     Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of());
     Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of());
 
-    StateTag<?, ?> fooCoder1Max1 = StateTags.keyedCombiningValueWithContext(
-            "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).<String>asKeyedFn());
-    StateTag<?, ?> fooCoder1Max2 = StateTags.keyedCombiningValueWithContext(
-        "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn());
-    StateTag<?, ?> fooCoder1Min = StateTags.keyedCombiningValueWithContext(
-        "foo", accum1, CombineFnUtil.toFnWithContext(minFn).asKeyedFn());
-
-    StateTag<?, ?> fooCoder2Max = StateTags.keyedCombiningValueWithContext(
-        "foo", accum2, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn());
-    StateTag<?, ?> barCoder1Max = StateTags.keyedCombiningValueWithContext(
-        "bar", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn());
+    StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueWithContext(
+            "foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
+    StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueWithContext(
+        "foo", accum1, CombineFnUtil.toFnWithContext(maxFn));
+    StateTag<?, ?> fooCoder1Min = StateTags.combiningValueWithContext(
+        "foo", accum1, CombineFnUtil.toFnWithContext(minFn));
+
+    StateTag<?, ?> fooCoder2Max = StateTags.combiningValueWithContext(
+        "foo", accum2, CombineFnUtil.toFnWithContext(maxFn));
+    StateTag<?, ?> barCoder1Max = StateTags.combiningValueWithContext(
+        "bar", accum1, CombineFnUtil.toFnWithContext(maxFn));
 
     // Same name, coder and combineFn
     assertEquals(fooCoder1Max1, fooCoder1Max2);

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index 068b37f..92d87b5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -40,8 +40,7 @@ import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTag.StateBinder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CombineFnUtil;
@@ -283,11 +282,10 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
               @SuppressWarnings("unchecked")
               InMemoryState<? extends WatermarkHoldState> existingState =
                   (InMemoryState<? extends WatermarkHoldState>)
-                  underlying.get().get(namespace, address, c);
+                      underlying.get().get(namespace, address, c);
               return existingState.copy();
             } else {
-              return new InMemoryWatermarkHold<>(
-                  timestampCombiner);
+              return new InMemoryWatermarkHold<>(timestampCombiner);
             }
           }
 
@@ -298,7 +296,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
               @SuppressWarnings("unchecked")
               InMemoryState<? extends ValueState<T>> existingState =
                   (InMemoryState<? extends ValueState<T>>)
-                  underlying.get().get(namespace, address, c);
+                      underlying.get().get(namespace, address, c);
               return existingState.copy();
             } else {
               return new InMemoryValue<>();
@@ -306,10 +304,11 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
           }
 
           @Override
-          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
-              bindCombiningValue(
+          public <InputT, AccumT, OutputT>
+              CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
                   StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-                  Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+                  Coder<AccumT> accumCoder,
+                  CombineFn<InputT, AccumT, OutputT> combineFn) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
               InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState =
@@ -317,8 +316,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
                       underlying.get().get(namespace, address, c);
               return existingState.copy();
             } else {
-              return new InMemoryCombiningState<>(
-                  key, combineFn.asKeyedFn());
+              return new InMemoryCombiningState<>(combineFn);
             }
           }
 
@@ -329,7 +327,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
               @SuppressWarnings("unchecked")
               InMemoryState<? extends BagState<T>> existingState =
                   (InMemoryState<? extends BagState<T>>)
-                  underlying.get().get(namespace, address, c);
+                      underlying.get().get(namespace, address, c);
               return existingState.copy();
             } else {
               return new InMemoryBag<>();
@@ -353,7 +351,8 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
           @Override
           public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
               StateTag<? super K, MapState<KeyT, ValueT>> address,
-              Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
+              Coder<KeyT> mapKeyCoder,
+              Coder<ValueT> mapValueCoder) {
             if (containedInUnderlying(namespace, address)) {
               @SuppressWarnings("unchecked")
               InMemoryState<? extends MapState<KeyT, ValueT>> existingState =
@@ -366,30 +365,12 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
           }
 
           @Override
-          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
-              bindKeyedCombiningValue(
+          public <InputT, AccumT, OutputT>
+              CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
                   StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
-                  KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-            if (containedInUnderlying(namespace, address)) {
-              @SuppressWarnings("unchecked")
-              InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState =
-                  (InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>>)
-                      underlying.get().get(namespace, address, c);
-              return existingState.copy();
-            } else {
-              return new InMemoryCombiningState<>(key, combineFn);
-            }
-          }
-
-          @Override
-          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
-          bindKeyedCombiningValueWithContext(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-                  Coder<AccumT> accumCoder,
-                  KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-            return bindKeyedCombiningValue(
-                address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
+                  CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+            return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
           }
         };
       }
@@ -475,20 +456,11 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K>
 
           @Override
           public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
-              bindKeyedCombiningValue(
-                  StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
-                  Coder<AccumT> accumCoder,
-                  KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-            return underlying.get(namespace, address, c);
-          }
-
-          @Override
-          public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
-          bindKeyedCombiningValueWithContext(
+          bindCombiningValueWithContext(
                   StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
                   Coder<AccumT> accumCoder,
-                  KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-            return bindKeyedCombiningValue(
+                  CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+            return bindCombiningValue(
                 address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
           }
         };

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index f0aeece..4d04745 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -40,7 +40,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
@@ -251,39 +250,6 @@ public class CopyOnAccessInMemoryStateInternalsTest {
   }
 
   @Test
-  public void testKeyedAccumulatorCombiningStateWithUnderlying() throws Exception {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    KeyedCombineFn<String, Long, long[], Long> sumLongFn = Sum.ofLongs().asKeyedFn();
-
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    CoderRegistry reg = pipeline.getCoderRegistry();
-    StateTag<String, CombiningState<Long, long[], Long>> stateTag =
-        StateTags.keyedCombiningValue(
-            "summer",
-            sumLongFn.getAccumulatorCoder(
-                reg, StringUtf8Coder.of(), reg.getDefaultCoder(Long.class)),
-            sumLongFn);
-    GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
-    assertThat(underlyingValue.read(), equalTo(0L));
-
-    underlyingValue.add(1L);
-    assertThat(underlyingValue.read(), equalTo(1L));
-
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-    GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
-    assertThat(copyOnAccessState.read(), equalTo(1L));
-
-    copyOnAccessState.add(4L);
-    assertThat(copyOnAccessState.read(), equalTo(5L));
-    assertThat(underlyingValue.read(), equalTo(1L));
-
-    GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
-    assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
-  }
-
-  @Test
   public void testWatermarkHoldStateWithUnderlying() {
     CopyOnAccessInMemoryStateInternals<String> underlying =
         CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 99de5be..6a7689a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -188,8 +188,7 @@ class FlinkBatchTransformTranslators {
       DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn =
-          new Concatenate<InputT>().asKeyedFn();
+      Combine.CombineFn<InputT, List<InputT>, List<InputT>> combineFn = new Concatenate<>();
 
       KvCoder<K, InputT> inputCoder =
           (KvCoder<K, InputT>) context.getInput(transform).getCoder();
@@ -200,7 +199,6 @@ class FlinkBatchTransformTranslators {
         accumulatorCoder =
             combineFn.getAccumulatorCoder(
                 context.getInput(transform).getPipeline().getCoderRegistry(),
-                inputCoder.getKeyCoder(),
                 inputCoder.getValueCoder());
       } catch (CannotProvideCoderException e) {
         throw new RuntimeException(e);
@@ -337,8 +335,8 @@ class FlinkBatchTransformTranslators {
       DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn =
-          (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
+      CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn =
+          (CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT>) transform.getFn();
 
       KvCoder<K, InputT> inputCoder =
           (KvCoder<K, InputT>) context.getInput(transform).getCoder();
@@ -349,7 +347,6 @@ class FlinkBatchTransformTranslators {
         accumulatorCoder =
             combineFn.getAccumulatorCoder(
                 context.getInput(transform).getPipeline().getCoderRegistry(),
-                inputCoder.getKeyCoder(),
                 inputCoder.getValueCoder());
       } catch (CannotProvideCoderException e) {
         throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
index 83ff70d..6e27057 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import com.google.common.collect.ImmutableList;
 import java.util.Collection;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -53,7 +53,7 @@ public abstract class AbstractFlinkCombineRunner<
       Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception;
 
   /**
-   * Adapter interface that allows using a {@link CombineFnBase.PerKeyCombineFn} to either produce
+   * Adapter interface that allows using a {@link CombineFnBase.GlobalCombineFn} to either produce
    * the {@code AccumT} as output or to combine several accumulators into an {@code OutputT}.
    * The former would be used for a partial combine while the latter is used for the final merging
    * of accumulators.
@@ -72,17 +72,17 @@ public abstract class AbstractFlinkCombineRunner<
   }
 
   /**
-   * A straight wrapper of {@link CombineFnBase.PerKeyCombineFn} that takes in {@code InputT}
+   * A straight wrapper of {@link CombineFnBase.GlobalCombineFn} that takes in {@code InputT}
    * and emits {@code OutputT}.
    */
   public static class CompleteFlinkCombiner<K, InputT, AccumT, OutputT> implements
       FlinkCombiner<K, InputT, AccumT, OutputT> {
 
-    private final PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner;
+    private final GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFnRunner;
 
     public CompleteFlinkCombiner(
-        CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-      combineFnRunner = PerKeyCombineFnRunners.create(combineFn);
+        CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn) {
+      combineFnRunner = GlobalCombineFnRunners.create(combineFn);
     }
 
     @Override
@@ -90,22 +90,22 @@ public abstract class AbstractFlinkCombineRunner<
         K key, InputT value, PipelineOptions options, SideInputReader sideInputReader,
         Collection<? extends BoundedWindow> windows) {
       AccumT accumulator =
-          combineFnRunner.createAccumulator(key, options, sideInputReader, windows);
-      return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows);
+          combineFnRunner.createAccumulator(options, sideInputReader, windows);
+      return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
     }
 
     @Override
     public AccumT addInput(
         K key, AccumT accumulator, InputT value, PipelineOptions options,
         SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows);
+      return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
     }
 
     @Override
     public OutputT extractOutput(
         K key, AccumT accumulator, PipelineOptions options, SideInputReader sideInputReader,
         Collection<? extends BoundedWindow> windows) {
-      return combineFnRunner.extractOutput(key, accumulator, options, sideInputReader, windows);
+      return combineFnRunner.extractOutput(accumulator, options, sideInputReader, windows);
     }
   }
 
@@ -115,10 +115,10 @@ public abstract class AbstractFlinkCombineRunner<
   public static class PartialFlinkCombiner<K, InputT, AccumT> implements
       FlinkCombiner<K, InputT, AccumT, AccumT> {
 
-    private final PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner;
+    private final GlobalCombineFnRunner<InputT, AccumT, ?> combineFnRunner;
 
-    public PartialFlinkCombiner(CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn) {
-      combineFnRunner = PerKeyCombineFnRunners.create(combineFn);
+    public PartialFlinkCombiner(CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn) {
+      combineFnRunner = GlobalCombineFnRunners.create(combineFn);
     }
 
     @Override
@@ -126,15 +126,15 @@ public abstract class AbstractFlinkCombineRunner<
         K key, InputT value, PipelineOptions options, SideInputReader sideInputReader,
         Collection<? extends BoundedWindow> windows) {
       AccumT accumulator =
-          combineFnRunner.createAccumulator(key, options, sideInputReader, windows);
-      return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows);
+          combineFnRunner.createAccumulator(options, sideInputReader, windows);
+      return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
     }
 
     @Override
     public AccumT addInput(
         K key, AccumT accumulator, InputT value, PipelineOptions options,
         SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
-      return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows);
+      return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows);
     }
 
     @Override
@@ -151,10 +151,10 @@ public abstract class AbstractFlinkCombineRunner<
   public static class FinalFlinkCombiner<K, AccumT, OutputT> implements
       FlinkCombiner<K, AccumT, AccumT, OutputT> {
 
-    private final PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner;
+    private final GlobalCombineFnRunner<?, AccumT, OutputT> combineFnRunner;
 
-    public FinalFlinkCombiner(CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn) {
-      combineFnRunner = PerKeyCombineFnRunners.create(combineFn);
+    public FinalFlinkCombiner(CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn) {
+      combineFnRunner = GlobalCombineFnRunners.create(combineFn);
     }
 
     @Override
@@ -169,14 +169,14 @@ public abstract class AbstractFlinkCombineRunner<
         K key, AccumT accumulator, AccumT value, PipelineOptions options,
         SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
       return combineFnRunner.mergeAccumulators(
-          key, ImmutableList.of(accumulator, value), options, sideInputReader, windows);
+          ImmutableList.of(accumulator, value), options, sideInputReader, windows);
     }
 
     @Override
     public OutputT extractOutput(
         K key, AccumT accumulator, PipelineOptions options, SideInputReader sideInputReader,
         Collection<? extends BoundedWindow> windows) {
-      return combineFnRunner.extractOutput(key, accumulator, options, sideInputReader, windows);
+      return combineFnRunner.extractOutput(accumulator, options, sideInputReader, windows);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 3712598..9ccf079 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -41,7 +41,7 @@ public class FlinkMergingNonShuffleReduceFunction<
     K, InputT, AccumT, OutputT, W extends BoundedWindow>
     extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> {
 
-  private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn;
+  private final CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn;
 
   private final WindowingStrategy<Object, W> windowingStrategy;
 
@@ -50,12 +50,12 @@ public class FlinkMergingNonShuffleReduceFunction<
   private final SerializedPipelineOptions serializedOptions;
 
   public FlinkMergingNonShuffleReduceFunction(
-      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn,
+      CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn,
       WindowingStrategy<Object, W> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions pipelineOptions) {
 
-    this.combineFn = keyedCombineFn;
+    this.combineFn = combineFn;
 
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
@@ -75,7 +75,6 @@ public class FlinkMergingNonShuffleReduceFunction<
         new FlinkSideInputReader(sideInputs, getRuntimeContext());
 
     AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> reduceRunner;
-
     if (windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
       reduceRunner = new SortingFlinkCombineRunner<>();
     } else {
@@ -83,13 +82,12 @@ public class FlinkMergingNonShuffleReduceFunction<
     }
 
     reduceRunner.combine(
-        new AbstractFlinkCombineRunner.CompleteFlinkCombiner<>(combineFn),
+        new AbstractFlinkCombineRunner.CompleteFlinkCombiner<K, InputT, AccumT, OutputT>(combineFn),
         windowingStrategy,
         sideInputReader,
         options,
         elements,
         out);
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index 9a44840..4099f52 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.Collector;
 public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWindow>
     extends RichGroupCombineFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, AccumT>>> {
 
-  protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn;
+  protected final CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn;
 
   protected final WindowingStrategy<Object, W> windowingStrategy;
 
@@ -51,7 +51,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
   protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
   public FlinkPartialReduceFunction(
-      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
+      CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn,
       WindowingStrategy<Object, W> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions pipelineOptions) {
@@ -83,7 +83,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
     }
 
     reduceRunner.combine(
-        new AbstractFlinkCombineRunner.PartialFlinkCombiner<>(combineFn),
+        new AbstractFlinkCombineRunner.PartialFlinkCombiner<K, InputT, AccumT>(combineFn),
         windowingStrategy,
         sideInputReader,
         options,

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 6c1a2e4..90dcbff 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.Collector;
 public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
     extends RichGroupReduceFunction<WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> {
 
-  protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn;
+  protected final CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn;
 
   protected final WindowingStrategy<Object, W> windowingStrategy;
 
@@ -51,12 +51,12 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
   protected final SerializedPipelineOptions serializedOptions;
 
   public FlinkReduceFunction(
-      CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
+      CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn,
       WindowingStrategy<Object, W> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions pipelineOptions) {
 
-    this.combineFn = keyedCombineFn;
+    this.combineFn = combineFn;
 
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
@@ -83,15 +83,13 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
     } else {
       reduceRunner = new SortingFlinkCombineRunner<>();
     }
-
     reduceRunner.combine(
-        new AbstractFlinkCombineRunner.FinalFlinkCombiner<>(combineFn),
+        new AbstractFlinkCombineRunner.FinalFlinkCombiner<K, AccumT, OutputT>(combineFn),
         windowingStrategy,
         sideInputReader,
         options,
         elements,
         out);
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
index eac465c..4aacb4a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java
@@ -43,7 +43,6 @@ import org.joda.time.Instant;
 public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends BoundedWindow>
     extends AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> {
 
-
   @Override
   public void combine(
       FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner,


Mime
View raw message