beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/2] incubator-beam git commit: Implement GetDefaultOutputCoder in DirectGroupByKey
Date Wed, 14 Dec 2016 19:30:10 GMT
Implement GetDefaultOutputCoder in DirectGroupByKey

This uses the standard Coder Inference path to set coders, rather than
explicitly setting the output coders for intermediate PCollections.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4cbccee8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4cbccee8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4cbccee8

Branch: refs/heads/master
Commit: 4cbccee8ee9a3b4235c6338fe49efc1f8a079812
Parents: 5a51ace
Author: Thomas Groh <tgroh@google.com>
Authored: Mon Dec 12 13:55:49 2016 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Wed Dec 14 11:29:29 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectGroupByKey.java   | 36 +++++++++++---------
 1 file changed, 20 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4cbccee8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 405d913..6c10bd2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -46,9 +47,6 @@ class DirectGroupByKey<K, V>
 
   @Override
   public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
-
     // This operation groups by the combination of key and window,
     // merging windows as needed, using the windows assigned to the
     // key/value input elements and the window merge operation of the
@@ -61,19 +59,11 @@ class DirectGroupByKey<K, V>
     // By default, implement GroupByKey via a series of lower-level operations.
     return input
         .apply(new DirectGroupByKeyOnly<K, V>())
-        .setCoder(
-            KeyedWorkItemCoder.of(
-                inputCoder.getKeyCoder(),
-                inputCoder.getValueCoder(),
-                inputWindowingStrategy.getWindowFn().windowCoder()))
 
         // Group each key's values by window, merging windows as needed.
         .apply(
             "GroupAlsoByWindow",
-            new DirectGroupAlsoByWindow<K, V>(inputWindowingStrategy, outputWindowingStrategy))
-
-        .setCoder(
-            KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
+            new DirectGroupAlsoByWindow<K, V>(inputWindowingStrategy, outputWindowingStrategy));
   }
 
   static final class DirectGroupByKeyOnly<K, V>
@@ -85,6 +75,16 @@ class DirectGroupByKey<K, V>
     }
 
     DirectGroupByKeyOnly() {}
+
+    @Override
+    protected Coder<?> getDefaultOutputCoder(
+        @SuppressWarnings("unused") PCollection<KV<K, V>> input)
+        throws CannotProvideCoderException {
+      return KeyedWorkItemCoder.of(
+          GroupByKey.getKeyCoder(input.getCoder()),
+          GroupByKey.getInputValueCoder(input.getCoder()),
+          input.getWindowingStrategy().getWindowFn().windowCoder());
+    }
   }
 
   static final class DirectGroupAlsoByWindow<K, V>
@@ -117,15 +117,19 @@ class DirectGroupByKey<K, V>
       return kvCoder;
     }
 
-    public Coder<K> getKeyCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
-      return getKeyedWorkItemCoder(inputCoder).getKeyCoder();
-    }
-
     public Coder<V> getValueCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
       return getKeyedWorkItemCoder(inputCoder).getElementCoder();
     }
 
     @Override
+    protected Coder<?> getDefaultOutputCoder(
+        @SuppressWarnings("unused") PCollection<KeyedWorkItem<K, V>> input)
+        throws CannotProvideCoderException {
+      KeyedWorkItemCoder<K, V> inputCoder = getKeyedWorkItemCoder(input.getCoder());
+      return KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder()));
+    }
+
+    @Override
     public PCollection<KV<K, Iterable<V>>> expand(PCollection<KeyedWorkItem<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(), outputWindowingStrategy, input.isBounded());


Mime
View raw message