beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] incubator-beam git commit: Replace WindowAssignment OldDoFn by FlatMap in Flink Runner
Date Thu, 24 Nov 2016 14:21:28 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 3e4b2fd0d -> 8d1214a3b


Replace WindowAssignment OldDoFn by FlatMap in Flink Runner

The streaming runner had an OldDoFn that was used for assigning windows
using a WindowFn. This is now done with a FlatMap.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a097729
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a097729
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a097729

Branch: refs/heads/master
Commit: 4a097729ac9fc65283f4f11f85812188589c8df3
Parents: 3e4b2fd
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Nov 8 11:03:21 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Nov 24 11:39:30 2016 +0100

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 63 +++-----------------
 1 file changed, 9 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a097729/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 40dfbb9..47935eb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.flink.translation;
 
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.nio.ByteBuffer;
@@ -31,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.FlinkRunner;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.FlinkCoder;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -637,64 +636,20 @@ public class FlinkStreamingTransformTranslators {
       TypeInformation<WindowedValue<T>> typeInfo =
           context.getTypeInfo(context.getOutput(transform));
 
-      OldDoFn<T, T> windowAssignerDoFn =
-          createWindowAssigner(windowingStrategy.getWindowFn());
-
-      @SuppressWarnings("unchecked")
-      PCollection<T> inputPCollection = context.getInput(transform);
-
-      TypeInformation<WindowedValue<T>> inputTypeInfo =
-          context.getTypeInfo(inputPCollection);
-
-      DoFnOperator<T, T, WindowedValue<T>> doFnOperator = new DoFnOperator<>(
-          windowAssignerDoFn,
-          inputTypeInfo,
-          new TupleTag<T>("main output"),
-          Collections.<TupleTag<?>>emptyList(),
-          new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<T>>(),
-          windowingStrategy,
-          new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
-          Collections.<PCollectionView<?>>emptyList(), /* side inputs */
-          context.getPipelineOptions());
-
       DataStream<WindowedValue<T>> inputDataStream =
           context.getInputDataStream(context.getInput(transform));
 
-      SingleOutputStreamOperator<WindowedValue<T>> outDataStream = inputDataStream
-          .transform(transform.getName(), typeInfo, doFnOperator);
-
-      context.setOutputDataStream(context.getOutput(transform), outDataStream);
-    }
+      WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
 
-    private static <T, W extends BoundedWindow> OldDoFn<T, T> createWindowAssigner(
-        final WindowFn<T, W> windowFn) {
+      FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
+          new FlinkAssignWindows<>(windowFn);
 
-      return new OldDoFn<T, T>() {
+      SingleOutputStreamOperator<WindowedValue<T>> outputDataStream = inputDataStream
+          .flatMap(assignWindowsFunction)
+          .name(context.getOutput(transform).getName())
+          .returns(typeInfo);
 
-        @Override
-        public void processElement(final ProcessContext c) throws Exception {
-          Collection<W> windows = windowFn.assignWindows(
-              windowFn.new AssignContext() {
-                @Override
-                public T element() {
-                  return c.element();
-                }
-
-                @Override
-                public Instant timestamp() {
-                  return c.timestamp();
-                }
-
-                @Override
-                public BoundedWindow window() {
-                  return Iterables.getOnlyElement(c.windowingInternals().windows());
-                }
-              });
-
-          c.windowingInternals().outputWindowedValue(
-              c.element(), c.timestamp(), windows, c.pane());
-        }
-      };
+      context.setOutputDataStream(context.getOutput(transform), outputDataStream);
     }
   }
 


Mime
View raw message