beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [5/6] incubator-beam git commit: Reject all timers in ParDo, for now
Date Thu, 20 Oct 2016 01:02:14 GMT
Reject all timers in ParDo, for now


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f9712f2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f9712f2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f9712f2b

Branch: refs/heads/master
Commit: f9712f2bacb9aac9d5df5c6021bb3cfb59758806
Parents: ccefc6f
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Oct 18 13:09:57 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed Oct 19 17:52:21 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/ParDo.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9712f2b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 776f768..8aa87e4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -563,12 +563,29 @@ public class ParDo {
               DoFn.class.getSimpleName()));
     }
 
+    // To be removed when the features are complete and runners have their own adequate
+    // rejection logic
+    if (!signature.timerDeclarations().isEmpty()) {
+      throw new UnsupportedOperationException(
+          String.format("Found %s annotations on %s, but %s cannot yet be used with timers.",
+              DoFn.TimerId.class.getSimpleName(),
+              fn.getClass().getName(),
+              DoFn.class.getSimpleName()));
+    }
+
     // State is semantically incompatible with splitting
     if (!signature.stateDeclarations().isEmpty() && signature.processElement().isSplittable()) {
       throw new UnsupportedOperationException(
           String.format("%s is splittable and uses state, but these are not compatible",
               fn.getClass().getName()));
     }
+
+    // Timers are semantically incompatible with splitting
+    if (!signature.timerDeclarations().isEmpty() && signature.processElement().isSplittable()) {
+      throw new UnsupportedOperationException(
+          String.format("%s is splittable and uses timers, but these are not compatible",
+              fn.getClass().getName()));
+    }
   }
 
   private static <InputT, OutputT> OldDoFn<InputT, OutputT> adapt(DoFn<InputT, OutputT> fn) {


Mime
View raw message