beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [10/13] incubator-beam git commit: Reject timers for ParDo in SparkRunner
Date Thu, 08 Dec 2016 18:09:47 GMT
Reject timers for ParDo in SparkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/29f3af30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/29f3af30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/29f3af30

Branch: refs/heads/master
Commit: 29f3af30a4b871244e14998d670b8ca26bd8de94
Parents: 69e0ea2
Author: Kenneth Knowles <klk@google.com>
Authored: Wed Dec 7 20:35:08 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 .../spark/translation/TransformTranslator.java  | 46 ++++++++++++--------
 1 file changed, 27 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29f3af30/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index e033ab1..8170366 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -58,6 +58,7 @@ import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -81,7 +82,6 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFunction;
-
 import scala.Tuple2;
 
 
@@ -228,20 +228,36 @@ public final class TransformTranslator {
     };
   }
 
+  private static void rejectStateAndTimers(DoFn<?, ?> doFn) {
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+    if (signature.stateDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              SparkRunner.class.getSimpleName()));
+    }
+
+    if (signature.timerDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+              DoFn.TimerId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              SparkRunner.class.getSimpleName()));
+    }
+  }
+
   private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
     return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
         DoFn<InputT, OutputT> doFn = transform.getNewFn();
-        if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
-          throw new UnsupportedOperationException(
-              String.format(
-                  "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-                  DoFn.StateId.class.getSimpleName(),
-                  doFn.getClass().getName(),
-                  DoFn.class.getSimpleName(),
-                  SparkRunner.class.getSimpleName()));
-        }
+        rejectStateAndTimers(doFn);
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<InputT>> inRDD =
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
@@ -265,15 +281,7 @@ public final class TransformTranslator {
       @Override
       public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
         DoFn<InputT, OutputT> doFn = transform.getNewFn();
-        if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
-          throw new UnsupportedOperationException(
-              String.format(
-                  "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-                  DoFn.StateId.class.getSimpleName(),
-                  doFn.getClass().getName(),
-                  DoFn.class.getSimpleName(),
-                  SparkRunner.class.getSimpleName()));
-        }
+        rejectStateAndTimers(doFn);
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<InputT>> inRDD =
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();


Mime
View raw message