beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [43/50] incubator-beam git commit: Reject stateful DoFn in FlinkRunner
Date Wed, 23 Nov 2016 06:52:39 GMT
Reject stateful DoFn in FlinkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9c2d5da7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9c2d5da7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9c2d5da7

Branch: refs/heads/gearpump-runner
Commit: 9c2d5da7c659a2603d37c492ff44f4a9cda387fe
Parents: 7949b70
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Nov 15 21:33:28 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Tue Nov 22 10:52:39 2016 -0800

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    |  1 +
 .../FlinkBatchTransformTranslators.java         | 34 +++++++++++++++++---
 .../FlinkStreamingTransformTranslators.java     | 25 +++++++++++++-
 3 files changed, 55 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c2d5da7/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index adcb3de..c060c25 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -53,6 +53,7 @@
                 </goals>
                 <configuration>
                   <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+                  <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>
                   <dependenciesToScan>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c2d5da7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 935a9ac..474d4e3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
@@ -46,6 +47,7 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
@@ -54,6 +56,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -487,11 +490,23 @@ class FlinkBatchTransformTranslators {
     @Override
     public void translateNode(
         ParDo.Bound<InputT, OutputT> transform,
+
         FlinkBatchTranslationContext context) {
+      DoFn<InputT, OutputT> doFn = transform.getNewFn();
+      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                DoFn.StateId.class.getSimpleName(),
+                doFn.getClass().getName(),
+                DoFn.class.getSimpleName(),
+                FlinkRunner.class.getSimpleName()));
+      }
+
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      final OldDoFn<InputT, OutputT> doFn = transform.getFn();
+      final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
 
       TypeInformation<WindowedValue<OutputT>> typeInformation =
           context.getTypeInfo(context.getOutput(transform));
@@ -507,7 +522,7 @@ class FlinkBatchTransformTranslators {
 
       FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
           new FlinkDoFnFunction<>(
-              doFn,
+              oldDoFn,
               context.getOutput(transform).getWindowingStrategy(),
               sideInputStrategies,
               context.getPipelineOptions());
@@ -533,10 +548,21 @@ class FlinkBatchTransformTranslators {
     public void translateNode(
         ParDo.BoundMulti<InputT, OutputT> transform,
         FlinkBatchTranslationContext context) {
+      DoFn<InputT, OutputT> doFn = transform.getNewFn();
+      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                DoFn.StateId.class.getSimpleName(),
+                doFn.getClass().getName(),
+                DoFn.class.getSimpleName(),
+                FlinkRunner.class.getSimpleName()));
+      }
+
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      final OldDoFn<InputT, OutputT> doFn = transform.getFn();
+      final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
 
       Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
 
@@ -584,7 +610,7 @@ class FlinkBatchTransformTranslators {
       @SuppressWarnings("unchecked")
       FlinkMultiOutputDoFnFunction<InputT, OutputT> doFnWrapper =
           new FlinkMultiOutputDoFnFunction(
-              doFn,
+              oldDoFn,
               windowingStrategy,
               sideInputStrategies,
               context.getPipelineOptions(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c2d5da7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 687e9c8..40dfbb9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.flink.translation;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -51,6 +50,7 @@ import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
@@ -58,6 +58,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -311,6 +312,17 @@ public class FlinkStreamingTransformTranslators {
         ParDo.Bound<InputT, OutputT> transform,
         FlinkStreamingTranslationContext context) {
 
+      DoFn<InputT, OutputT> doFn = transform.getNewFn();
+      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                DoFn.StateId.class.getSimpleName(),
+                doFn.getClass().getName(),
+                DoFn.class.getSimpleName(),
+                FlinkRunner.class.getSimpleName()));
+      }
+
       WindowingStrategy<?, ?> windowingStrategy =
           context.getOutput(transform).getWindowingStrategy();
 
@@ -460,6 +472,17 @@ public class FlinkStreamingTransformTranslators {
         ParDo.BoundMulti<InputT, OutputT> transform,
         FlinkStreamingTranslationContext context) {
 
+      DoFn<InputT, OutputT> doFn = transform.getNewFn();
+      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                DoFn.StateId.class.getSimpleName(),
+                doFn.getClass().getName(),
+                DoFn.class.getSimpleName(),
+                FlinkRunner.class.getSimpleName()));
+      }
+
       // we assume that the transformation does not change the windowing strategy.
       WindowingStrategy<?, ?> windowingStrategy =
           context.getInput(transform).getWindowingStrategy();


Mime
View raw message