beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [05/13] incubator-beam git commit: Reject timers for ParDo in ApexRunner
Date Thu, 08 Dec 2016 18:09:42 GMT
Reject timers for ParDo in ApexRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/18db3ace
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/18db3ace
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/18db3ace

Branch: refs/heads/master
Commit: 18db3ace77e89203d7ec3f342fe6ce24a2119226
Parents: 50ffc7b
Author: Kenneth Knowles <klk@google.com>
Authored: Wed Dec 7 20:34:34 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 .../apex/translation/ParDoBoundMultiTranslator.java | 16 +++++++++++++++-
 .../apex/translation/ParDoBoundTranslator.java      | 16 +++++++++++++++-
 2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18db3ace/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index fed5f4b..706482a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
@@ -54,7 +55,9 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
   @Override
   public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
     DoFn<InputT, OutputT> doFn = transform.getNewFn();
-    if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+    if (signature.stateDeclarations().size() > 0) {
       throw new UnsupportedOperationException(
           String.format(
               "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
@@ -63,6 +66,17 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
               DoFn.class.getSimpleName(),
               ApexRunner.class.getSimpleName()));
     }
+
+    if (signature.timerDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+              DoFn.TimerId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              ApexRunner.class.getSimpleName()));
+    }
+
     OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
     PCollectionTuple output = context.getOutput();
     PCollection<InputT> input = context.getInput();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18db3ace/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
index 7a918a7..b5a50f6 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
@@ -41,7 +42,9 @@ class ParDoBoundTranslator<InputT, OutputT>
   @Override
   public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
     DoFn<InputT, OutputT> doFn = transform.getNewFn();
-    if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+    if (signature.stateDeclarations().size() > 0) {
       throw new UnsupportedOperationException(
           String.format(
               "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
@@ -50,6 +53,17 @@ class ParDoBoundTranslator<InputT, OutputT>
               DoFn.class.getSimpleName(),
               ApexRunner.class.getSimpleName()));
     }
+
+    if (signature.timerDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+              DoFn.TimerId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              ApexRunner.class.getSimpleName()));
+    }
+
     OldDoFn<InputT, OutputT> oldDoFn = transform.getOldFn();
     PCollection<OutputT> output = context.getOutput();
     PCollection<InputT> input = context.getInput();


Mime
View raw message