beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/6] beam git commit: Remove KeyedCombineFn
Date Mon, 01 May 2017 02:25:38 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java
index 30b302c..b16aadc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java
@@ -23,13 +23,12 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
- * A {@link KeyedCombineFnWithContext} with a fixed accumulator coder. This is created from a
- * specific application of the {@link KeyedCombineFnWithContext}.
+ * A {@link GlobalCombineFn} with a fixed accumulator coder. This is created from a
+ * specific application of the {@link GlobalCombineFn}.
  *
  *  <p>Because the {@code AccumT} may reference {@code InputT}, the specific {@code Coder<AccumT>}
  *  may depend on the {@code Coder<InputT>}.
@@ -41,14 +40,14 @@ import org.apache.beam.sdk.values.PCollectionView;
  */
 public class AppliedCombineFn<K, InputT, AccumT, OutputT> implements Serializable {
 
-  private final PerKeyCombineFn<K, InputT, AccumT, OutputT> fn;
+  private final GlobalCombineFn<InputT, AccumT, OutputT> fn;
   private final Coder<AccumT> accumulatorCoder;
 
   private final Iterable<PCollectionView<?>> sideInputViews;
   private final KvCoder<K, InputT> kvCoder;
   private final WindowingStrategy<?, ?> windowingStrategy;
 
-  private AppliedCombineFn(PerKeyCombineFn<K, InputT, AccumT, OutputT> fn,
+  private AppliedCombineFn(GlobalCombineFn<InputT, AccumT, OutputT> fn,
       Coder<AccumT> accumulatorCoder, Iterable<PCollectionView<?>> sideInputViews,
       KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
     this.fn = fn;
@@ -60,41 +59,41 @@ public class AppliedCombineFn<K, InputT, AccumT, OutputT> implements Serializabl
 
   public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
       withAccumulatorCoder(
-          PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
+          GlobalCombineFn<? super InputT, AccumT, OutputT> fn,
           Coder<AccumT> accumCoder) {
     return withAccumulatorCoder(fn, accumCoder, null, null, null);
   }
 
   public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
       withAccumulatorCoder(
-          PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
+          GlobalCombineFn<? super InputT, AccumT, OutputT> fn,
           Coder<AccumT> accumCoder, Iterable<PCollectionView<?>> sideInputViews,
           KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
     // Casting down the K and InputT is safe because they're only used as inputs.
     @SuppressWarnings("unchecked")
-    PerKeyCombineFn<K, InputT, AccumT, OutputT> clonedFn =
-        (PerKeyCombineFn<K, InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
+    GlobalCombineFn<InputT, AccumT, OutputT> clonedFn =
+        (GlobalCombineFn<InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
     return create(clonedFn, accumCoder, sideInputViews, kvCoder, windowingStrategy);
   }
 
   @VisibleForTesting
   public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
-      withInputCoder(PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
+      withInputCoder(GlobalCombineFn<? super InputT, AccumT, OutputT> fn,
           CoderRegistry registry, KvCoder<K, InputT> kvCoder) {
     return withInputCoder(fn, registry, kvCoder, null, null);
   }
 
   public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
-      withInputCoder(PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
+      withInputCoder(GlobalCombineFn<? super InputT, AccumT, OutputT> fn,
           CoderRegistry registry, KvCoder<K, InputT> kvCoder,
           Iterable<PCollectionView<?>> sideInputViews, WindowingStrategy<?, ?> windowingStrategy) {
     // Casting down the K and InputT is safe because they're only used as inputs.
     @SuppressWarnings("unchecked")
-    PerKeyCombineFn<K, InputT, AccumT, OutputT> clonedFn =
-        (PerKeyCombineFn<K, InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
+    GlobalCombineFn<InputT, AccumT, OutputT> clonedFn =
+        (GlobalCombineFn<InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
     try {
-      Coder<AccumT> accumulatorCoder = clonedFn.getAccumulatorCoder(
-          registry, kvCoder.getKeyCoder(), kvCoder.getValueCoder());
+      Coder<AccumT> accumulatorCoder =
+          clonedFn.getAccumulatorCoder(registry, kvCoder.getValueCoder());
       return create(clonedFn, accumulatorCoder, sideInputViews, kvCoder, windowingStrategy);
     } catch (CannotProvideCoderException e) {
       throw new IllegalStateException("Could not determine coder for accumulator", e);
@@ -102,14 +101,14 @@ public class AppliedCombineFn<K, InputT, AccumT, OutputT> implements Serializabl
   }
 
   private static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT> create(
-      PerKeyCombineFn<K, InputT, AccumT, OutputT> fn,
+      GlobalCombineFn<InputT, AccumT, OutputT> fn,
       Coder<AccumT> accumulatorCoder, Iterable<PCollectionView<?>> sideInputViews,
       KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
     return new AppliedCombineFn<>(
         fn, accumulatorCoder, sideInputViews, kvCoder, windowingStrategy);
   }
 
-  public PerKeyCombineFn<K, InputT, AccumT, OutputT> getFn() {
+  public GlobalCombineFn<InputT, AccumT, OutputT> getFn() {
     return fn;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
index a9a0178..a394180 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
@@ -24,12 +24,9 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.state.StateContext;
 
@@ -37,20 +34,21 @@ import org.apache.beam.sdk.util.state.StateContext;
  * Static utility methods that create combine function instances.
  */
 public class CombineFnUtil {
+
   /**
-   * Returns the partial application of the {@link KeyedCombineFnWithContext} to a specific
-   * context to produce a {@link KeyedCombineFn}.
+   * Returns the partial application of the {@link CombineFnWithContext} to a specific context
+   * to produce a {@link CombineFn}.
    *
-   * <p>The returned {@link KeyedCombineFn} cannot be serialized.
+   * <p>The returned {@link CombineFn} cannot be serialized.
    */
-  public static <K, InputT, AccumT, OutputT> KeyedCombineFn<K, InputT, AccumT, OutputT>
-  bindContext(
-      KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
+  public static <K, InputT, AccumT, OutputT> CombineFn<InputT, AccumT, OutputT> bindContext(
+      CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
       StateContext<?> stateContext) {
     Context context = CombineContextFactory.createFromStateContext(stateContext);
-    return new NonSerializableBoundedKeyedCombineFn<>(combineFn, context);
+    return new NonSerializableBoundedCombineFn<>(combineFn, context);
   }
 
+
   /**
    * Return a {@link CombineFnWithContext} from the given {@link GlobalCombineFn}.
    */
@@ -110,100 +108,55 @@ public class CombineFnUtil {
     }
   }
 
-  /**
-   * Return a {@link KeyedCombineFnWithContext} from the given {@link PerKeyCombineFn}.
-   */
-  public static <K, InputT, AccumT, OutputT> KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>
-  toFnWithContext(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
-    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
-      @SuppressWarnings("unchecked")
-      KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext =
-          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn;
-      return keyedCombineFnWithContext;
-    } else {
-      @SuppressWarnings("unchecked")
-      final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn =
-          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn;
-      return new KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
-        @Override
-        public AccumT createAccumulator(K key, Context c) {
-          return keyedCombineFn.createAccumulator(key);
-        }
-        @Override
-        public AccumT addInput(K key, AccumT accumulator, InputT value, Context c) {
-          return keyedCombineFn.addInput(key, accumulator, value);
-        }
-        @Override
-        public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) {
-          return keyedCombineFn.mergeAccumulators(key, accumulators);
-        }
-        @Override
-        public OutputT extractOutput(K key, AccumT accumulator, Context c) {
-          return keyedCombineFn.extractOutput(key, accumulator);
-        }
-        @Override
-        public AccumT compact(K key, AccumT accumulator, Context c) {
-          return keyedCombineFn.compact(key, accumulator);
-        }
-        @Override
-        public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
-            Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return keyedCombineFn.getAccumulatorCoder(registry, keyCoder, inputCoder);
-        }
-        @Override
-        public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
-            Coder<InputT> inputCoder) throws CannotProvideCoderException {
-          return keyedCombineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
-        }
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-          keyedCombineFn.populateDisplayData(builder);
-        }
-      };
-    }
-  }
-
-  private static class NonSerializableBoundedKeyedCombineFn<K, InputT, AccumT, OutputT>
-      extends KeyedCombineFn<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
+  private static class NonSerializableBoundedCombineFn<InputT, AccumT, OutputT>
+      extends CombineFn<InputT, AccumT, OutputT> {
+    private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
     private final Context context;
 
-    private NonSerializableBoundedKeyedCombineFn(
-        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
-        Context context) {
+    private NonSerializableBoundedCombineFn(
+        CombineFnWithContext<InputT, AccumT, OutputT> combineFn, Context context) {
       this.combineFn = combineFn;
       this.context = context;
     }
+
     @Override
-    public AccumT createAccumulator(K key) {
-      return combineFn.createAccumulator(key, context);
+    public AccumT createAccumulator() {
+      return combineFn.createAccumulator(context);
     }
+
     @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT value) {
-      return combineFn.addInput(key, accumulator, value, context);
+    public AccumT addInput(AccumT accumulator, InputT value) {
+      return combineFn.addInput(accumulator, value, context);
     }
+
     @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(key, accumulators, context);
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(accumulators, context);
     }
+
     @Override
-    public OutputT extractOutput(K key, AccumT accumulator) {
-      return combineFn.extractOutput(key, accumulator, context);
+    public OutputT extractOutput(AccumT accumulator) {
+      return combineFn.extractOutput(accumulator, context);
     }
+
     @Override
-    public AccumT compact(K key, AccumT accumulator) {
-      return combineFn.compact(key, accumulator, context);
+    public AccumT compact(AccumT accumulator) {
+      return combineFn.compact(accumulator, context);
     }
+
     @Override
-    public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder,
-        Coder<InputT> inputCoder) throws CannotProvideCoderException {
-      return combineFn.getAccumulatorCoder(registry, keyCoder, inputCoder);
+    public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
+        throws CannotProvideCoderException {
+      return combineFn.getAccumulatorCoder(registry, inputCoder);
     }
+
     @Override
-    public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder,
-        Coder<InputT> inputCoder) throws CannotProvideCoderException {
-      return combineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
+    public Coder<OutputT> getDefaultOutputCoder(
+        CoderRegistry registry, Coder<InputT> inputCoder)
+        throws CannotProvideCoderException {
+      return combineFn.getDefaultOutputCoder(registry, inputCoder);
     }
+
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       combineFn.populateDisplayData(builder);

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index f9ab115..6fe37a1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -45,20 +45,11 @@ public interface StateBinder<K> {
       Coder<AccumT> accumCoder,
       Combine.CombineFn<InputT, AccumT, OutputT> combineFn);
 
-  <InputT, AccumT, OutputT>
-  CombiningState<InputT, AccumT, OutputT> bindKeyedCombining(
-          String id,
-          StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
-          Coder<AccumT> accumCoder,
-          Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
-
-  <InputT, AccumT, OutputT>
-  CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext(
-          String id,
-          StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
-          Coder<AccumT> accumCoder,
-          CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
-              combineFn);
+  <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
+      String id,
+      StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec,
+      Coder<AccumT> accumCoder,
+      CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn);
 
   /**
    * Bind to a watermark {@link StateSpec}.

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index 8fa5bb0..a057a0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -27,8 +27,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 
@@ -77,52 +76,16 @@ public class StateSpecs {
   }
 
   /**
-   * Create a state spec for values that use a {@link KeyedCombineFn} to automatically merge
+   * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge
    * multiple {@code InputT}s into a single {@code OutputT}.
    */
-  public static <K, InputT, AccumT, OutputT>
-  StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining(
-      KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
-  }
-
-  /**
-   * Create a state spec for values that use a {@link KeyedCombineFn} to automatically merge
-   * multiple {@code InputT}s into a single {@code OutputT}.
-   */
-  public static <K, InputT, AccumT, OutputT>
-      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining(
-          Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
+  public static <InputT, AccumT, OutputT>
+      StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining(
+          Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
     checkArgument(accumCoder != null,
         "accumCoder should not be null. "
-            + "Consider using keyedCombining(KeyedCombineFn<> combineFn) instead.");
-    return keyedCombiningInternal(accumCoder, combineFn);
-  }
-
-  /**
-   * Create a state spec for values that use a {@link KeyedCombineFnWithContext} to automatically
-   * merge multiple {@code InputT}s into a single {@code OutputT}.
-   */
-  public static <K, InputT, AccumT, OutputT>
-  StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
-  keyedCombiningWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
-    return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn);
-  }
-
-  /**
-   * Create a state spec for values that use a {@link KeyedCombineFnWithContext} to automatically
-   * merge multiple {@code InputT}s into a single {@code OutputT}.
-   */
-  public static <K, InputT, AccumT, OutputT>
-      StateSpec<K, CombiningState<InputT, AccumT, OutputT>>
-  keyedCombiningWithContext(
-              Coder<AccumT> accumCoder,
-              KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
-    checkArgument(accumCoder != null,
-        "accumCoder should not be null. Consider using "
-            + "keyedCombiningWithContext(KeyedCombineFnWithContext<> combineFn) instead.");
-    return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(
-        accumCoder, combineFn);
+            + "Consider using combining(CombineFn<> combineFn) instead.");
+    return combiningInternal(accumCoder, combineFn);
   }
 
   /**
@@ -155,10 +118,10 @@ public class StateSpecs {
     return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
 
-  private static <K, InputT, AccumT, OutputT>
-      StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningInternal(
-          Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn);
+  private static <InputT, AccumT, OutputT>
+  StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal(
+      Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+    return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn);
   }
 
   /**
@@ -216,17 +179,17 @@ public class StateSpecs {
   public static <K, InputT, AccumT, OutputT>
       StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal(
           StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) {
-    if (combiningSpec instanceof KeyedCombiningStateSpec) {
+    if (combiningSpec instanceof CombiningStateSpec) {
       // Checked above; conversion to a bag spec depends on the provided spec being one of those
       // created via the factory methods in this class.
       @SuppressWarnings("unchecked")
-      KeyedCombiningStateSpec<K, InputT, AccumT, OutputT> typedSpec =
-          (KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+      CombiningStateSpec<InputT, AccumT, OutputT> typedSpec =
+          (CombiningStateSpec<InputT, AccumT, OutputT>) combiningSpec;
       return typedSpec.asBagSpec();
-    } else if (combiningSpec instanceof KeyedCombiningWithContextStateSpec) {
+    } else if (combiningSpec instanceof CombiningWithContextStateSpec) {
       @SuppressWarnings("unchecked")
-      KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec =
-          (KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec;
+      CombiningWithContextStateSpec<InputT, AccumT, OutputT> typedSpec =
+          (CombiningWithContextStateSpec<InputT, AccumT, OutputT>) combiningSpec;
       return typedSpec.asBagSpec();
     } else {
       throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec);
@@ -297,7 +260,6 @@ public class StateSpecs {
    * <p>Includes the {@link CombineFn} and the coder for the accumulator type.
    */
   private static class CombiningStateSpec<InputT, AccumT, OutputT>
-      extends KeyedCombiningStateSpec<Object, InputT, AccumT, OutputT>
       implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
 
     @Nullable
@@ -307,14 +269,14 @@ public class StateSpecs {
     private CombiningStateSpec(
         @Nullable Coder<AccumT> accumCoder,
         CombineFn<InputT, AccumT, OutputT> combineFn) {
-      super(accumCoder, combineFn.asKeyedFn());
       this.combineFn = combineFn;
       this.accumCoder = accumCoder;
     }
 
     @Override
-    protected Coder<AccumT> getAccumCoder() {
-      return accumCoder;
+    public CombiningState<InputT, AccumT, OutputT> bind(
+        String id, StateBinder<? extends Object> visitor) {
+      return visitor.bindCombining(id, this, accumCoder, combineFn);
     }
 
     @SuppressWarnings("unchecked")
@@ -326,51 +288,14 @@ public class StateSpecs {
         }
       }
     }
-  }
-
-  /**
-   * A specification for a state cell that is combined according to a
-   * {@link KeyedCombineFnWithContext}.
-   *
-   * <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type.
-   */
-  private static class KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>
-      implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
-
-    @Nullable
-    private Coder<AccumT> accumCoder;
-    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
-
-    protected KeyedCombiningWithContextStateSpec(
-        @Nullable Coder<AccumT> accumCoder,
-        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
-      this.combineFn = combineFn;
-      this.accumCoder = accumCoder;
-    }
-
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> bind(
-        String id, StateBinder<? extends K> visitor) {
-      return visitor.bindKeyedCombiningWithContext(id, this, accumCoder, combineFn);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void offerCoders(Coder[] coders) {
-      if (this.accumCoder == null) {
-        if (coders[2] != null) {
-          this.accumCoder = (Coder<AccumT>) coders[2];
-        }
-      }
-    }
 
     @Override public void finishSpecifying() {
       if (accumCoder == null) {
         throw new IllegalStateException("Unable to infer a coder for"
-            + " KeyedCombiningWithContextState and no Coder was specified."
+            + " CombiningState and no Coder was specified."
             + " Please set a coder by either invoking"
-            + " StateSpecs.keyedCombining(Coder<AccumT> accumCoder,"
-            + " KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn)"
+            + " StateSpecs.combining(Coder<AccumT> accumCoder,"
+            + " CombineFn<InputT, AccumT, OutputT> combineFn)"
             + " or by registering the coder in the Pipeline's CoderRegistry.");
       }
     }
@@ -381,12 +306,12 @@ public class StateSpecs {
         return true;
       }
 
-      if (!(obj instanceof KeyedCombiningWithContextStateSpec)) {
+      if (!(obj instanceof CombiningStateSpec)) {
         return false;
       }
 
-      KeyedCombiningWithContextStateSpec<?, ?, ?, ?> that =
-          (KeyedCombiningWithContextStateSpec<?, ?, ?, ?>) obj;
+      CombiningStateSpec<?, ?, ?> that =
+          (CombiningStateSpec<?, ?, ?>) obj;
       return Objects.equals(this.accumCoder, that.accumCoder);
     }
 
@@ -401,32 +326,28 @@ public class StateSpecs {
   }
 
   /**
-   * A specification for a state cell that is combined according to a {@link KeyedCombineFn}.
+   * A specification for a state cell that is combined according to a {@link
+   * CombineFnWithContext}.
    *
-   * <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type.
+   * <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type.
    */
-  private static class KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>
-      implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> {
+  private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT>
+      implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> {
 
-    @Nullable
-    private Coder<AccumT> accumCoder;
-    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
+    @Nullable private Coder<AccumT> accumCoder;
+    private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
 
-    protected KeyedCombiningStateSpec(
+    private CombiningWithContextStateSpec(
         @Nullable Coder<AccumT> accumCoder,
-        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
-      this.keyedCombineFn = keyedCombineFn;
+        CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+      this.combineFn = combineFn;
       this.accumCoder = accumCoder;
     }
 
-    protected Coder<AccumT> getAccumCoder() {
-      return accumCoder;
-    }
-
     @Override
     public CombiningState<InputT, AccumT, OutputT> bind(
-        String id, StateBinder<? extends K> visitor) {
-      return visitor.bindKeyedCombining(id, this, getAccumCoder(), keyedCombineFn);
+        String id, StateBinder<? extends Object> visitor) {
+      return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn);
     }
 
     @SuppressWarnings("unchecked")
@@ -439,13 +360,16 @@ public class StateSpecs {
       }
     }
 
-    @Override public void finishSpecifying() {
-      if (getAccumCoder() == null) {
-        throw new IllegalStateException("Unable to infer a coder for GroupingState and no"
-            + " Coder was specified. Please set a coder by either invoking"
-            + " StateSpecs.combining(Coder<AccumT> accumCoder,"
-            + " CombineFn<InputT, AccumT, OutputT> combineFn)"
-            + " or by registering the coder in the Pipeline's CoderRegistry.");
+    @Override
+    public void finishSpecifying() {
+      if (accumCoder == null) {
+        throw new IllegalStateException(
+            "Unable to infer a coder for"
+                + " CombiningWithContextState and no Coder was specified."
+                + " Please set a coder by either invoking"
+                + " StateSpecs.combiningWithcontext(Coder<AccumT> accumCoder,"
+                + " CombineFnWithContext<InputT, AccumT, OutputT> combineFn)"
+                + " or by registering the coder in the Pipeline's CoderRegistry.");
       }
     }
 
@@ -455,12 +379,11 @@ public class StateSpecs {
         return true;
       }
 
-      if (!(obj instanceof CombiningStateSpec)) {
+      if (!(obj instanceof CombiningWithContextStateSpec)) {
         return false;
       }
 
-      KeyedCombiningStateSpec<?, ?, ?, ?> that =
-          (KeyedCombiningStateSpec<?, ?, ?, ?>) obj;
+      CombiningWithContextStateSpec<?, ?, ?> that = (CombiningWithContextStateSpec<?, ?, ?>) obj;
       return Objects.equals(this.accumCoder, that.accumCoder);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 13c5f16..dcb8fdc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
 import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -79,7 +79,7 @@ public class  CombineFnsTest {
     expectedException.expectMessage("it is already present in the composition");
 
     TupleTag<Integer> tag = new TupleTag<Integer>();
-    CombineFns.composeKeyed()
+    CombineFns.compose()
       .with(new GetIntegerFunction(), Max.ofIntegers(), tag)
       .with(new GetIntegerFunction(), Min.ofIntegers(), tag);
   }
@@ -93,23 +93,6 @@ public class  CombineFnsTest {
     CombineFns.compose()
       .with(
           new GetUserStringFunction(),
-          new ConcatStringWithContext(null /* view */).forKey("G", StringUtf8Coder.of()),
-          tag)
-      .with(
-          new GetUserStringFunction(),
-          new ConcatStringWithContext(null /* view */).forKey("G", StringUtf8Coder.of()),
-          tag);
-  }
-
-  @Test
-  public void testDuplicatedTagsWithContextKeyed() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("it is already present in the composition");
-
-    TupleTag<UserString> tag = new TupleTag<UserString>();
-    CombineFns.composeKeyed()
-      .with(
-          new GetUserStringFunction(),
           new ConcatStringWithContext(null /* view */),
           tag)
       .with(
@@ -153,17 +136,15 @@ public class  CombineFnsTest {
         .apply(
             "ExtractGloballyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
 
-    PCollection<KV<String, KV<Integer, String>>> combinePerKey = perKeyInput
-        .apply(Combine.perKey(CombineFns.composeKeyed()
-            .with(
-                new GetIntegerFunction(),
-                Max.ofIntegers().<String>asKeyedFn(),
-                maxIntTag)
-            .with(
-                new GetUserStringFunction(),
-                new ConcatString().<String>asKeyedFn(),
-                concatStringTag)))
-        .apply("ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
+    PCollection<KV<String, KV<Integer, String>>> combinePerKey =
+        perKeyInput
+            .apply(
+                Combine.<String, KV<Integer, UserString>, CoCombineResult>perKey(
+                    CombineFns.compose()
+                        .with(new GetIntegerFunction(), Max.ofIntegers(), maxIntTag)
+                        .with(new GetUserStringFunction(), new ConcatString(), concatStringTag)))
+            .apply(
+                "ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
     PAssert.that(combineGlobally).containsInAnyOrder(
         KV.of("global", KV.of(13, "111134")));
     PAssert.that(combinePerKey).containsInAnyOrder(
@@ -205,7 +186,7 @@ public class  CombineFnsTest {
                 maxIntTag)
             .with(
                 new GetUserStringFunction(),
-                new ConcatStringWithContext(view).forKey("G", StringUtf8Coder.of()),
+                new ConcatStringWithContext(view),
                 concatStringTag))
             .withoutDefaults()
             .withSideInputs(ImmutableList.of(view)))
@@ -213,23 +194,24 @@ public class  CombineFnsTest {
         .apply(
             "ExtractGloballyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
 
-    PCollection<KV<String, KV<Integer, String>>> combinePerKey = perKeyInput
-        .apply(Combine.perKey(CombineFns.composeKeyed()
-            .with(
-                new GetIntegerFunction(),
-                Max.ofIntegers().<String>asKeyedFn(),
-                maxIntTag)
-            .with(
-                new GetUserStringFunction(),
-                new ConcatStringWithContext(view),
-                concatStringTag))
-            .withSideInputs(ImmutableList.of(view)))
-        .apply("ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
+    PCollection<KV<String, KV<Integer, String>>> combinePerKey =
+        perKeyInput
+            .apply(
+                Combine.<String, KV<Integer, UserString>, CoCombineResult>perKey(
+                        CombineFns.compose()
+                            .with(new GetIntegerFunction(), Max.ofIntegers(), maxIntTag)
+                            .with(
+                                new GetUserStringFunction(),
+                                new ConcatStringWithContext(view),
+                                concatStringTag))
+                    .withSideInputs(ImmutableList.of(view)))
+            .apply(
+                "ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
     PAssert.that(combineGlobally).containsInAnyOrder(
-        KV.of("global", KV.of(13, "111134GI")));
+        KV.of("global", KV.of(13, "111134I")));
     PAssert.that(combinePerKey).containsInAnyOrder(
-        KV.of("a", KV.of(4, "114Ia")),
-        KV.of("b", KV.of(13, "113Ib")));
+        KV.of("a", KV.of(4, "114I")),
+        KV.of("b", KV.of(13, "113I")));
     p.run();
   }
 
@@ -256,17 +238,16 @@ public class  CombineFnsTest {
     TupleTag<Integer> maxIntTag = new TupleTag<Integer>();
     TupleTag<UserString> concatStringTag = new TupleTag<UserString>();
 
-    PCollection<KV<String, KV<Integer, String>>> combinePerKey = perKeyInput
-        .apply(Combine.perKey(CombineFns.composeKeyed()
-            .with(
-                new GetIntegerFunction(),
-                Max.ofIntegers().<String>asKeyedFn(),
-                maxIntTag)
-            .with(
-                new GetUserStringFunction(),
-                new OutputNullString().<String>asKeyedFn(),
-                concatStringTag)))
-        .apply("ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
+    PCollection<KV<String, KV<Integer, String>>> combinePerKey =
+        perKeyInput
+            .apply(
+                Combine.<String, KV<Integer, UserString>, CoCombineResult>perKey(
+                    CombineFns.compose()
+                        .with(new GetIntegerFunction(), Max.ofIntegers(), maxIntTag)
+                        .with(
+                            new GetUserStringFunction(), new OutputNullString(), concatStringTag)))
+            .apply(
+                "ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
     PAssert.that(combinePerKey).containsInAnyOrder(
         KV.of("a", KV.of(4, (String) null)),
         KV.of("b", KV.of(13, (String) null)));
@@ -407,7 +388,7 @@ public class  CombineFnsTest {
   }
 
   private static class ConcatStringWithContext
-      extends KeyedCombineFnWithContext<String, UserString, UserString, UserString> {
+      extends CombineFnWithContext<UserString, UserString, UserString> {
     private final PCollectionView<String> view;
 
     private ConcatStringWithContext(PCollectionView<String> view) {
@@ -415,22 +396,22 @@ public class  CombineFnsTest {
     }
 
     @Override
-    public UserString createAccumulator(String key, CombineWithContext.Context c) {
-      return UserString.of(key + c.sideInput(view));
+    public UserString createAccumulator(CombineWithContext.Context c) {
+      return UserString.of(c.sideInput(view));
     }
 
     @Override
     public UserString addInput(
-        String key, UserString accumulator, UserString input, CombineWithContext.Context c) {
-      assertThat(accumulator.strValue, Matchers.startsWith(key + c.sideInput(view)));
+        UserString accumulator, UserString input, CombineWithContext.Context c) {
+      assertThat(accumulator.strValue, Matchers.startsWith(c.sideInput(view)));
       accumulator.strValue += input.strValue;
       return accumulator;
     }
 
     @Override
     public UserString mergeAccumulators(
-        String key, Iterable<UserString> accumulators, CombineWithContext.Context c) {
-      String keyPrefix = key + c.sideInput(view);
+        Iterable<UserString> accumulators, CombineWithContext.Context c) {
+      String keyPrefix = c.sideInput(view);
       String all = keyPrefix;
       for (UserString accumulator : accumulators) {
         assertThat(accumulator.strValue, Matchers.startsWith(keyPrefix));
@@ -441,9 +422,8 @@ public class  CombineFnsTest {
     }
 
     @Override
-    public UserString extractOutput(
-        String key, UserString accumulator, CombineWithContext.Context c) {
-      assertThat(accumulator.strValue, Matchers.startsWith(key + c.sideInput(view)));
+    public UserString extractOutput(UserString accumulator, CombineWithContext.Context c) {
+      assertThat(accumulator.strValue, Matchers.startsWith(c.sideInput(view)));
       char[] chars = accumulator.strValue.toCharArray();
       Arrays.sort(chars);
       return UserString.of(new String(chars));

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index a5f3df2..82c2504 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.TestUtils.checkCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
@@ -58,9 +57,10 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.CombineTest.TestCombineFn.Accumulator;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
@@ -127,7 +127,7 @@ public class CombineTest implements Serializable {
 
     // Java 8 will infer.
     PCollection<KV<String, String>> sumPerKey = input
-        .apply(Combine.perKey(new TestKeyedCombineFn()));
+        .apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));
 
     PAssert.that(sum).containsInAnyOrder(globalSum);
     PAssert.that(sumPerKey).containsInAnyOrder(perKeyCombines);
@@ -147,13 +147,13 @@ public class CombineTest implements Serializable {
     PCollectionView<Integer> globallySumView = sum.apply(View.<Integer>asSingleton());
 
     // Java 8 will infer.
-    PCollection<KV<String, String>> combinePerKey = perKeyInput
-        .apply(Combine.perKey(new TestKeyedCombineFnWithContext(globallySumView))
-            .withSideInputs(Arrays.asList(globallySumView)));
+    PCollection<KV<String, String>> combinePerKey =
+        perKeyInput.apply(
+            Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
+                .withSideInputs(Arrays.asList(globallySumView)));
 
     PCollection<String> combineGlobally = globallyInput
-        .apply(Combine.globally(new TestKeyedCombineFnWithContext(globallySumView)
-            .forKey("G", StringUtf8Coder.of()))
+        .apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
             .withoutDefaults()
             .withSideInputs(Arrays.asList(globallySumView)));
 
@@ -168,7 +168,7 @@ public class CombineTest implements Serializable {
   @Category(ValidatesRunner.class)
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testSimpleCombine() {
-    runTestSimpleCombine(TABLE, 20, Arrays.asList(KV.of("a", "114a"), KV.of("b", "113b")));
+    runTestSimpleCombine(TABLE, 20, Arrays.asList(KV.of("a", "114"), KV.of("b", "113")));
   }
 
   @Test
@@ -176,8 +176,8 @@ public class CombineTest implements Serializable {
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testSimpleCombineWithContext() {
     runTestSimpleCombineWithContext(TABLE, 20,
-        Arrays.asList(KV.of("a", "01124a"), KV.of("b", "01123b")),
-        new String[] {"01111234G"});
+        Arrays.asList(KV.of("a", "01124"), KV.of("b", "01123")),
+        new String[] {"01111234"});
   }
 
   @Test
@@ -260,14 +260,14 @@ public class CombineTest implements Serializable {
         .apply(Combine.globally(new SumInts()).withoutDefaults());
 
     PCollection<KV<String, String>> sumPerKey = input
-        .apply(Combine.perKey(new TestKeyedCombineFn()));
+        .apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));
 
     PAssert.that(sum).containsInAnyOrder(2, 5, 13);
     PAssert.that(sumPerKey).containsInAnyOrder(
-        KV.of("a", "11a"),
-        KV.of("a", "4a"),
-        KV.of("b", "1b"),
-        KV.of("b", "13b"));
+        KV.of("a", "11"),
+        KV.of("a", "4"),
+        KV.of("b", "1"),
+        KV.of("b", "13"));
     pipeline.run();
   }
 
@@ -286,23 +286,23 @@ public class CombineTest implements Serializable {
 
     PCollectionView<Integer> globallySumView = sum.apply(View.<Integer>asSingleton());
 
-    PCollection<KV<String, String>> combinePerKeyWithContext = perKeyInput
-        .apply(Combine.perKey(new TestKeyedCombineFnWithContext(globallySumView))
-            .withSideInputs(Arrays.asList(globallySumView)));
+    PCollection<KV<String, String>> combinePerKeyWithContext =
+        perKeyInput.apply(
+            Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
+                .withSideInputs(Arrays.asList(globallySumView)));
 
     PCollection<String> combineGloballyWithContext = globallyInput
-        .apply(Combine.globally(new TestKeyedCombineFnWithContext(globallySumView)
-            .forKey("G", StringUtf8Coder.of()))
+        .apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
             .withoutDefaults()
             .withSideInputs(Arrays.asList(globallySumView)));
 
     PAssert.that(sum).containsInAnyOrder(2, 5, 13);
     PAssert.that(combinePerKeyWithContext).containsInAnyOrder(
-        KV.of("a", "112a"),
-        KV.of("a", "45a"),
-        KV.of("b", "15b"),
-        KV.of("b", "1133b"));
-    PAssert.that(combineGloballyWithContext).containsInAnyOrder("112G", "145G", "1133G");
+        KV.of("a", "112"),
+        KV.of("a", "45"),
+        KV.of("b", "15"),
+        KV.of("b", "1133"));
+    PAssert.that(combineGloballyWithContext).containsInAnyOrder("112", "145", "1133");
     pipeline.run();
   }
 
@@ -321,28 +321,28 @@ public class CombineTest implements Serializable {
 
     PCollectionView<Integer> globallySumView = sum.apply(View.<Integer>asSingleton());
 
-    PCollection<KV<String, String>> combinePerKeyWithContext = perKeyInput
-        .apply(Combine.perKey(new TestKeyedCombineFnWithContext(globallySumView))
-            .withSideInputs(Arrays.asList(globallySumView)));
+    PCollection<KV<String, String>> combinePerKeyWithContext =
+        perKeyInput.apply(
+            Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
+                .withSideInputs(Arrays.asList(globallySumView)));
 
     PCollection<String> combineGloballyWithContext = globallyInput
-        .apply(Combine.globally(new TestKeyedCombineFnWithContext(globallySumView)
-            .forKey("G", StringUtf8Coder.of()))
+        .apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
             .withoutDefaults()
             .withSideInputs(Arrays.asList(globallySumView)));
 
     PAssert.that(sum).containsInAnyOrder(1, 2, 1, 4, 5, 14, 13);
     PAssert.that(combinePerKeyWithContext).containsInAnyOrder(
-        KV.of("a", "11a"),
-        KV.of("a", "112a"),
-        KV.of("a", "11a"),
-        KV.of("a", "44a"),
-        KV.of("a", "45a"),
-        KV.of("b", "15b"),
-        KV.of("b", "11134b"),
-        KV.of("b", "1133b"));
+        KV.of("a", "11"),
+        KV.of("a", "112"),
+        KV.of("a", "11"),
+        KV.of("a", "44"),
+        KV.of("a", "45"),
+        KV.of("b", "15"),
+        KV.of("b", "11134"),
+        KV.of("b", "1133"));
     PAssert.that(combineGloballyWithContext).containsInAnyOrder(
-      "11G", "112G", "11G", "44G", "145G", "11134G", "1133G");
+      "11", "112", "11", "44", "145", "11134", "1133");
     pipeline.run();
   }
 
@@ -392,13 +392,13 @@ public class CombineTest implements Serializable {
         .apply(Combine.globally(new SumInts()).withoutDefaults());
 
     PCollection<KV<String, String>> sumPerKey = input
-        .apply(Combine.perKey(new TestKeyedCombineFn()));
+        .apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));
 
     PAssert.that(sum).containsInAnyOrder(7, 13);
     PAssert.that(sumPerKey).containsInAnyOrder(
-        KV.of("a", "114a"),
-        KV.of("b", "1b"),
-        KV.of("b", "13b"));
+        KV.of("a", "114"),
+        KV.of("b", "1"),
+        KV.of("b", "13"));
     pipeline.run();
   }
 
@@ -419,26 +419,29 @@ public class CombineTest implements Serializable {
     PCollectionView<Integer> globallyFixedWindowsView =
         fixedWindowsSum.apply(View.<Integer>asSingleton().withDefaultValue(0));
 
-    PCollection<KV<String, String>> sessionsCombinePerKey = perKeyInput
-        .apply("PerKey Input Sessions",
-            Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5))))
-        .apply(Combine.perKey(new TestKeyedCombineFnWithContext(globallyFixedWindowsView))
-            .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
+    PCollection<KV<String, String>> sessionsCombinePerKey =
+        perKeyInput
+            .apply(
+                "PerKey Input Sessions",
+                Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5))))
+            .apply(
+                Combine.<String, Integer, String>perKey(
+                        new TestCombineFnWithContext(globallyFixedWindowsView))
+                    .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
 
     PCollection<String> sessionsCombineGlobally = globallyInput
         .apply("Globally Input Sessions",
             Window.<Integer>into(Sessions.withGapDuration(Duration.millis(5))))
-        .apply(Combine.globally(new TestKeyedCombineFnWithContext(globallyFixedWindowsView)
-            .forKey("G", StringUtf8Coder.of()))
+        .apply(Combine.globally(new TestCombineFnWithContext(globallyFixedWindowsView))
             .withoutDefaults()
             .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
 
     PAssert.that(fixedWindowsSum).containsInAnyOrder(2, 4, 1, 13);
     PAssert.that(sessionsCombinePerKey).containsInAnyOrder(
-        KV.of("a", "1114a"),
-        KV.of("b", "11b"),
-        KV.of("b", "013b"));
-    PAssert.that(sessionsCombineGlobally).containsInAnyOrder("11114G", "013G");
+        KV.of("a", "1114"),
+        KV.of("b", "11"),
+        KV.of("b", "013"));
+    PAssert.that(sessionsCombineGlobally).containsInAnyOrder("11114", "013");
     pipeline.run();
   }
 
@@ -502,16 +505,15 @@ public class CombineTest implements Serializable {
   public void testHotKeyCombining() {
     PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 10);
 
-    KeyedCombineFn<String, Integer, ?, Double> mean =
-        new MeanInts().<String>asKeyedFn();
+    CombineFn<Integer, ?, Double> mean = new MeanInts();
     PCollection<KV<String, Double>> coldMean = input.apply("ColdMean",
-        Combine.perKey(mean).withHotKeyFanout(0));
+        Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(0));
     PCollection<KV<String, Double>> warmMean = input.apply("WarmMean",
-        Combine.perKey(mean).withHotKeyFanout(hotKeyFanout));
+        Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(hotKeyFanout));
     PCollection<KV<String, Double>> hotMean = input.apply("HotMean",
-        Combine.perKey(mean).withHotKeyFanout(5));
+        Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(5));
     PCollection<KV<String, Double>> splitMean = input.apply("SplitMean",
-        Combine.perKey(mean).withHotKeyFanout(splitHotKeyFanout));
+        Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(splitHotKeyFanout));
 
     List<KV<String, Double>> expected = Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0));
     PAssert.that(coldMean).containsInAnyOrder(expected);
@@ -678,10 +680,10 @@ public class CombineTest implements Serializable {
     assertEquals(
         "Combine.GloballyAsSingletonView",
         Combine.globally(new SumInts()).asSingletonView().getName());
-    assertEquals("Combine.perKey(TestKeyed)", Combine.perKey(new TestKeyedCombineFn()).getName());
+    assertEquals("Combine.perKey(Test)", Combine.perKey(new TestCombineFn()).getName());
     assertEquals(
-        "Combine.perKeyWithFanout(TestKeyed)",
-        Combine.perKey(new TestKeyedCombineFn()).withHotKeyFanout(10).getName());
+        "Combine.perKeyWithFanout(Test)",
+        Combine.perKey(new TestCombineFn()).withHotKeyFanout(10).getName());
   }
 
   @Test
@@ -908,14 +910,10 @@ public class CombineTest implements Serializable {
   }
 
   /**
-   * A KeyedCombineFn that exercises the full generality of [Keyed]CombineFn.
-   *
-   * <p>The net result of applying this CombineFn is a sorted list of all
-   * characters occurring in the key and the decimal representations of
-   * each value.
+   * A {@link CombineFn} that results in a sorted list of all characters occurring in the key and
+   * the decimal representations of each value.
    */
-  public static class TestKeyedCombineFn
-      extends KeyedCombineFn<String, Integer, TestKeyedCombineFn.Accumulator, String> {
+  public static class TestCombineFn extends CombineFn<Integer, TestCombineFn.Accumulator, String> {
 
     // Not serializable.
     static class Accumulator {
@@ -943,20 +941,18 @@ public class CombineTest implements Serializable {
 
     @Override
     public Coder<Accumulator> getAccumulatorCoder(
-        CoderRegistry registry, Coder<String> keyCoder, Coder<Integer> inputCoder) {
+        CoderRegistry registry, Coder<Integer> inputCoder) {
       return Accumulator.getCoder();
     }
 
     @Override
-    public Accumulator createAccumulator(String key) {
-      return new Accumulator(key);
+    public Accumulator createAccumulator() {
+      return new Accumulator("");
     }
 
     @Override
-    public Accumulator addInput(String key, Accumulator accumulator, Integer value) {
-      checkNotNull(key);
+    public Accumulator addInput(Accumulator accumulator, Integer value) {
       try {
-        assertThat(accumulator.value, Matchers.startsWith(key));
         return new Accumulator(accumulator.value + String.valueOf(value));
       } finally {
         accumulator.value = "cleared in addInput";
@@ -964,19 +960,17 @@ public class CombineTest implements Serializable {
     }
 
     @Override
-    public Accumulator mergeAccumulators(String key, Iterable<Accumulator> accumulators) {
-      String all = key;
+    public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators) {
+      String all = "";
       for (Accumulator accumulator : accumulators) {
-        assertThat(accumulator.value, Matchers.startsWith(key));
-        all += accumulator.value.substring(key.length());
+        all += accumulator.value;
         accumulator.value = "cleared in mergeAccumulators";
       }
       return new Accumulator(all);
     }
 
     @Override
-    public String extractOutput(String key, Accumulator accumulator) {
-      assertThat(accumulator.value, Matchers.startsWith(key));
+    public String extractOutput(Accumulator accumulator) {
       char[] chars = accumulator.value.toCharArray();
       Arrays.sort(chars);
       return new String(chars);
@@ -984,38 +978,33 @@ public class CombineTest implements Serializable {
   }
 
   /**
-   * A {@link KeyedCombineFnWithContext} that exercises the full generality
-   * of [Keyed]CombineFnWithContext.
-   *
-   * <p>The net result of applying this CombineFn is a sorted list of all
-   * characters occurring in the key and the decimal representations of
-   * main and side inputs values.
+   * A {@link CombineFnWithContext} that produces a sorted list of all characters occurring in the
+   * key and the decimal representations of main and side inputs values.
    */
-  public class TestKeyedCombineFnWithContext
-      extends KeyedCombineFnWithContext<String, Integer, TestKeyedCombineFn.Accumulator, String> {
+  public class TestCombineFnWithContext extends CombineFnWithContext<Integer, Accumulator, String> {
     private final PCollectionView<Integer> view;
 
-    public TestKeyedCombineFnWithContext(PCollectionView<Integer> view) {
+    public TestCombineFnWithContext(PCollectionView<Integer> view) {
       this.view = view;
     }
 
     @Override
-    public Coder<TestKeyedCombineFn.Accumulator> getAccumulatorCoder(
-        CoderRegistry registry, Coder<String> keyCoder, Coder<Integer> inputCoder) {
-      return TestKeyedCombineFn.Accumulator.getCoder();
+    public Coder<TestCombineFn.Accumulator> getAccumulatorCoder(
+        CoderRegistry registry, Coder<Integer> inputCoder) {
+      return TestCombineFn.Accumulator.getCoder();
     }
 
     @Override
-    public TestKeyedCombineFn.Accumulator createAccumulator(String key, Context c) {
-      return new TestKeyedCombineFn.Accumulator(key + c.sideInput(view).toString());
+    public TestCombineFn.Accumulator createAccumulator(Context c) {
+      return new TestCombineFn.Accumulator(c.sideInput(view).toString());
     }
 
     @Override
-    public TestKeyedCombineFn.Accumulator addInput(
-        String key, TestKeyedCombineFn.Accumulator accumulator, Integer value, Context c) {
+    public TestCombineFn.Accumulator addInput(
+        TestCombineFn.Accumulator accumulator, Integer value, Context c) {
       try {
-        assertThat(accumulator.value, Matchers.startsWith(key + c.sideInput(view).toString()));
-        return new TestKeyedCombineFn.Accumulator(accumulator.value + String.valueOf(value));
+        assertThat(accumulator.value, Matchers.startsWith(c.sideInput(view).toString()));
+        return new TestCombineFn.Accumulator(accumulator.value + String.valueOf(value));
       } finally {
         accumulator.value = "cleared in addInput";
       }
@@ -1023,21 +1012,21 @@ public class CombineTest implements Serializable {
     }
 
     @Override
-    public TestKeyedCombineFn.Accumulator mergeAccumulators(
-        String key, Iterable<TestKeyedCombineFn.Accumulator> accumulators, Context c) {
-      String keyPrefix = key + c.sideInput(view).toString();
-      String all = keyPrefix;
-      for (TestKeyedCombineFn.Accumulator accumulator : accumulators) {
-        assertThat(accumulator.value, Matchers.startsWith(keyPrefix));
-        all += accumulator.value.substring(keyPrefix.length());
+    public TestCombineFn.Accumulator mergeAccumulators(
+        Iterable<TestCombineFn.Accumulator> accumulators, Context c) {
+      String prefix = c.sideInput(view).toString();
+      String all = prefix;
+      for (TestCombineFn.Accumulator accumulator : accumulators) {
+        assertThat(accumulator.value, Matchers.startsWith(prefix));
+        all += accumulator.value.substring(prefix.length());
         accumulator.value = "cleared in mergeAccumulators";
       }
-      return new TestKeyedCombineFn.Accumulator(all);
+      return new TestCombineFn.Accumulator(all);
     }
 
     @Override
-    public String extractOutput(String key, TestKeyedCombineFn.Accumulator accumulator, Context c) {
-      assertThat(accumulator.value, Matchers.startsWith(key + c.sideInput(view).toString()));
+    public String extractOutput(TestCombineFn.Accumulator accumulator, Context c) {
+      assertThat(accumulator.value, Matchers.startsWith(c.sideInput(view).toString()));
       char[] chars = accumulator.value.toCharArray();
       Arrays.sort(chars);
       return new String(chars);

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 1a976f2..52b2f5e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2501,7 +2501,7 @@ public class ParDoTest implements Serializable {
         };
 
     thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Unable to infer a coder for GroupingState and no Coder was specified.");
+    thrown.expectMessage("Unable to infer a coder for CombiningState and no Coder was specified.");
 
     pipeline
         .apply(Create.of(KV.of("hello", 3), KV.of("hello", 6), KV.of("hello", 7)))

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 867fe0a..b3fa2c6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -1122,7 +1122,7 @@ public class ViewTest implements Serializable {
 
     final PCollectionView<Map<String, Integer>> view =
         pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 20), KV.of("b", 3)))
-            .apply("SumIntegers", Combine.perKey(Sum.ofIntegers().<String>asKeyedFn()))
+            .apply("SumIntegers", Combine.<String, Integer, Integer>perKey(Sum.ofIntegers()))
             .apply(View.<String, Integer>asMap());
 
     PCollection<KV<String, Integer>> output =

http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
index 36a90e9..798e8dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java
@@ -29,7 +29,6 @@ import java.io.ObjectOutputStream;
 import java.util.List;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.util.state.StateContexts;
 import org.junit.Before;
@@ -48,12 +47,12 @@ public class CombineFnUtilTest {
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
-  KeyedCombineFnWithContext<Integer, Integer, Integer, Integer> mockCombineFn;
+  CombineFnWithContext<Integer, Integer, Integer> mockCombineFn;
 
   @SuppressWarnings("unchecked")
   @Before
   public void setUp() {
-    mockCombineFn = mock(KeyedCombineFnWithContext.class, withSettings().serializable());
+    mockCombineFn = mock(CombineFnWithContext.class, withSettings().serializable());
   }
 
   @Test
@@ -72,10 +71,6 @@ public class CombineFnUtilTest {
     CombineFnWithContext<Integer, int[], Integer> fnWithContext =
         CombineFnUtil.toFnWithContext(Sum.ofIntegers());
     assertTrue(fnWithContext == CombineFnUtil.toFnWithContext(fnWithContext));
-
-    KeyedCombineFnWithContext<Object, Integer, int[], Integer> keyedFnWithContext =
-        CombineFnUtil.toFnWithContext(Sum.ofIntegers().asKeyedFn());
-    assertTrue(keyedFnWithContext == CombineFnUtil.toFnWithContext(keyedFnWithContext));
   }
 
   @Test
@@ -89,14 +84,5 @@ public class CombineFnUtilTest {
       accum = fnWithContext.addInput(accum, i, nullContext);
     }
     assertEquals(10, fnWithContext.extractOutput(accum, nullContext).intValue());
-
-    KeyedCombineFnWithContext<String, Integer, int[], Integer> keyedFnWithContext =
-        CombineFnUtil.toFnWithContext(Sum.ofIntegers().<String>asKeyedFn());
-    String key = "key";
-    accum = keyedFnWithContext.createAccumulator(key, nullContext);
-    for (Integer i : inputs) {
-      accum = keyedFnWithContext.addInput(key, accum, i, nullContext);
-    }
-    assertEquals(10, keyedFnWithContext.extractOutput(key, accum, nullContext).intValue());
   }
 }


Mime
View raw message