beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-80] Enable combiner lifting for combine with contexts
Date Thu, 17 Mar 2016 02:24:00 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 6ba288d67 -> c199f0854


[BEAM-80] Enable combiner lifting for combine with contexts


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/06b18fde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/06b18fde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/06b18fde

Branch: refs/heads/master
Commit: 06b18fde6ec2d092a9733e5bfcfa63de3cf00833
Parents: b2b5f42
Author: Pei He <peihe0@gmail.com>
Authored: Thu Mar 10 14:17:36 2016 -0800
Committer: Pei He <peihe0@gmail.com>
Committed: Thu Mar 10 14:17:36 2016 -0800

----------------------------------------------------------------------
 .../sdk/runners/DataflowPipelineTranslator.java   |  3 +++
 .../cloud/dataflow/sdk/transforms/Combine.java    | 18 +++---------------
 .../cloud/dataflow/sdk/util/PropertyNames.java    |  1 +
 3 files changed, 7 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06b18fde/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
index d0cc4e5..0feae95 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
@@ -952,6 +952,9 @@ public class DataflowPipelineTranslator {
             context.addInput(
                 PropertyNames.SERIALIZED_FN,
                 byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
+            context.addInput(
+                PropertyNames.IS_MERGING_WINDOW_FN,
+                !windowingStrategy.getWindowFn().isNonMerging());
           }
         });
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06b18fde/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
index cc0347a..b8d20e3 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
@@ -1690,21 +1690,9 @@ public class Combine {
 
     @Override
     public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
-      if (fn instanceof RequiresContextInternal) {
-        return input
-            .apply(GroupByKey.<K, InputT>create(fewKeys))
-            .apply(ParDo.of(new DoFn<KV<K, Iterable<InputT>>, KV<K, Iterable<InputT>>>() {
-              @Override
-              public void processElement(ProcessContext c) throws Exception {
-                c.output(c.element());
-              }
-            }))
-            .apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs));
-      } else {
-        return input
-            .apply(GroupByKey.<K, InputT>create(fewKeys))
-            .apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs));
-      }
+      return input
+          .apply(GroupByKey.<K, InputT>create(fewKeys))
+          .apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06b18fde/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
index 5611fab..ec65189 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
@@ -65,6 +65,7 @@ public class PropertyNames {
   public static final String INPUTS = "inputs";
   public static final String INPUT_CODER = "input_coder";
   public static final String IS_GENERATED = "is_generated";
+  public static final String IS_MERGING_WINDOW_FN = "is_merging_window_fn";
   public static final String IS_PAIR_LIKE = "is_pair_like";
   public static final String IS_STREAM_LIKE = "is_stream_like";
   public static final String IS_WRAPPER = "is_wrapper";


Mime
View raw message