beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/5] incubator-beam git commit: Access to OnTimerContext via DoFnInvokers.ArgumentProvider
Date Thu, 08 Dec 2016 03:30:11 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 6807480a9 -> 772959447


Access to OnTimerContext via DoFnInvokers.ArgumentProvider


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2883062e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2883062e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2883062e

Branch: refs/heads/master
Commit: 2883062eebe8dba849ab89627f6aeb53266ac1a8
Parents: 42b506f
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Dec 6 20:10:21 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed Dec 7 19:22:43 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/runners/core/SimpleDoFnRunner.java | 13 +++++++++++++
 .../org/apache/beam/runners/core/SplittableParDo.java  |  5 +++++
 .../org/apache/beam/sdk/transforms/DoFnAdapters.java   | 12 ++++++++++++
 .../org/apache/beam/sdk/transforms/DoFnTester.java     |  7 +++++++
 .../beam/sdk/transforms/reflect/DoFnInvoker.java       |  8 ++++++++
 5 files changed, 45 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2883062e/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 68751f0..0d41a8d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
 import org.apache.beam.sdk.transforms.DoFn.InputProvider;
+import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -403,6 +404,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT,
Out
     }
 
     @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access OnTimerContext outside of @OnTimer methods.");
+    }
+
+    @Override
     public InputProvider<InputT> inputProvider() {
       throw new UnsupportedOperationException("InputProvider is for testing only.");
     }
@@ -589,6 +596,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT,
Out
     }
 
     @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access OnTimerContext outside of @OnTimer methods.");
+    }
+
+    @Override
     public InputProvider<InputT> inputProvider() {
       throw new UnsupportedOperationException("InputProvider parameters are not supported.");
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2883062e/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 78f373b..580e842 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -663,6 +663,11 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       }
 
       @Override
+      public DoFn.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+        throw new IllegalStateException("Unexpected extra context access on a splittable
DoFn");
+      }
+
+      @Override
       public DoFn.InputProvider<InputT> inputProvider() {
         // DoFnSignatures should have verified that this DoFn doesn't access extra context.
         throw new IllegalStateException("Unexpected extra context access on a splittable
DoFn");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2883062e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 6ee42e7..e15b08b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
+import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -345,6 +346,12 @@ public class DoFnAdapters {
     }
 
     @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Timers are not supported for OldDoFn");
+    }
+
+    @Override
     public WindowingInternals<InputT, OutputT> windowingInternals() {
       // The OldDoFn doesn't allow us to ask for these outside ProcessElements, so this
       // should be unreachable.
@@ -460,6 +467,11 @@ public class DoFnAdapters {
     }
 
     @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("Timers are not supported for OldDoFn");
+    }
+
+    @Override
     public WindowingInternals<InputT, OutputT> windowingInternals() {
       return context.windowingInternals();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2883062e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 43896c5..93b3f59 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ValueInSingleWindow;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
@@ -316,6 +317,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable
{
             }
 
             @Override
+            public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+              throw new UnsupportedOperationException(
+                  "DoFnTester doesn't support timers yet.");
+            }
+
+            @Override
             public DoFn.InputProvider<InputT> inputProvider() {
               throw new UnsupportedOperationException(
                   "Not expected to access InputProvider from DoFnTester");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2883062e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 5e61bdd..97ac9d3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -112,6 +112,9 @@ public interface DoFnInvoker<InputT, OutputT> {
     /** Provide a {@link DoFn.ProcessContext} to use with the given {@link DoFn}. */
     DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT>
doFn);
 
+    /** Provide a {@link DoFn.OnTimerContext} to use with the given {@link DoFn}. */
+    DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
doFn);
+
     /** A placeholder for testing purposes. */
     InputProvider<InputT> inputProvider();
 
@@ -162,6 +165,11 @@ public interface DoFnInvoker<InputT, OutputT> {
     }
 
     @Override
+    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
doFn) {
+      return null;
+    }
+
+    @Override
     public InputProvider<InputT> inputProvider() {
       return null;
     }


Mime
View raw message