beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [10/50] incubator-beam git commit: Move DoFn.ArgumentProvider to DoFnInvoker.ArgumentProvider
Date Wed, 23 Nov 2016 06:52:06 GMT
Move DoFn.ArgumentProvider to DoFnInvoker.ArgumentProvider

The arguments provided as a single object are an aspect of the
DoFnInvoker, not the DoFn. The DoFn itself is a specification
that may have other ways of being invoked, depending on the
circumstance.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/33fb8c2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/33fb8c2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/33fb8c2d

Branch: refs/heads/gearpump-runner
Commit: 33fb8c2db8c64275f1b9b8ac6dfd12e92d7fb777
Parents: bb9c386
Author: Kenneth Knowles <klk@google.com>
Authored: Thu Nov 17 23:04:55 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Fri Nov 18 14:20:20 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     |   4 +-
 .../beam/runners/core/SplittableParDo.java      |   7 +-
 .../org/apache/beam/sdk/transforms/DoFn.java    | 122 -------------------
 .../beam/sdk/transforms/DoFnAdapters.java       |  10 +-
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |  41 +++----
 .../sdk/transforms/reflect/DoFnInvoker.java     | 121 +++++++++++++++++-
 .../sdk/transforms/reflect/DoFnInvokers.java    |   4 +-
 .../sdk/transforms/reflect/OnTimerInvoker.java  |   8 +-
 .../transforms/reflect/DoFnInvokersTest.java    |   5 +-
 .../transforms/reflect/OnTimerInvokersTest.java |   2 +-
 .../transforms/DoFnInvokersBenchmark.java       |   5 +-
 11 files changed, 161 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 76aae8f..841e412 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -183,7 +183,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
    * @param <OutputT> the type of the {@link DoFn} (main) output elements
    */
   private static class DoFnContext<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
-      implements DoFn.ArgumentProvider<InputT, OutputT> {
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
     private static final int MAX_SIDE_OUTPUTS = 1000;
 
     final PipelineOptions options;
@@ -424,7 +424,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
    * @param <OutputT> the type of the {@link DoFn} (main) output elements
    */
   private class DoFnProcessContext<InputT, OutputT> extends DoFn<InputT, OutputT>.ProcessContext
-      implements DoFn.ArgumentProvider<InputT, OutputT> {
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
 
     final DoFn<InputT, OutputT> fn;
     final DoFnContext<InputT, OutputT> context;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 3003984..c38ab2f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -392,10 +392,11 @@ public class SplittableParDo<
     }
 
     /**
-     * Creates an {@link DoFn.ArgumentProvider} that provides the given tracker as well as the given
+     * Creates an {@link DoFnInvoker.ArgumentProvider} that provides the given tracker as well as
+     * the given
      * {@link ProcessContext} (which is also provided when a {@link Context} is requested.
      */
-    private DoFn.ArgumentProvider<InputT, OutputT> wrapTracker(
+    private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapTracker(
         TrackerT tracker, DoFn<InputT, OutputT>.ProcessContext processContext) {
 
       return new ArgumentProviderForTracker<>(tracker, processContext);
@@ -403,7 +404,7 @@ public class SplittableParDo<
 
     private static class ArgumentProviderForTracker<
             InputT, OutputT, TrackerT extends RestrictionTracker<?>>
-        implements DoFn.ArgumentProvider<InputT, OutputT> {
+        implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
       private final TrackerT tracker;
       private final DoFn<InputT, OutputT>.ProcessContext processContext;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index bf0631b..9978ef4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -38,13 +38,11 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.values.PCollection;
@@ -331,78 +329,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
     return new TypeDescriptor<OutputT>(getClass()) {};
   }
 
-  /**
-   * Interface for runner implementors to provide implementations of extra context information.
-   *
-   * <p>The methods on this interface are called by {@link DoFnInvoker} before invoking an
-   * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that
-   * has indicated it needs the given extra context.
-   *
-   * <p>In the case of {@link ProcessElement} it is called once per invocation of
-   * {@link ProcessElement}.
-   */
-  public interface ArgumentProvider<InputT, OutputT> {
-    /**
-     * Construct the {@link BoundedWindow} to use within a {@link DoFn} that
-     * needs it. This is called if the {@link ProcessElement} method has a parameter of type
-     * {@link BoundedWindow}.
-     *
-     * @return {@link BoundedWindow} of the element currently being processed.
-     */
-    BoundedWindow window();
-
-    /**
-     * Provide a {@link DoFn.Context} to use with the given {@link DoFn}.
-     */
-    DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn);
-
-    /**
-     * Provide a {@link DoFn.ProcessContext} to use with the given {@link DoFn}.
-     */
-    DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn);
-
-    /**
-     * A placeholder for testing purposes.
-     */
-    InputProvider<InputT> inputProvider();
-
-    /**
-     * A placeholder for testing purposes.
-     */
-    OutputReceiver<OutputT> outputReceiver();
-
-    /**
-     * For migration from {@link OldDoFn} to {@link DoFn}, provide
-     * a {@link WindowingInternals} so an {@link OldDoFn} can be run
-     * via {@link DoFnInvoker}.
-     *
-     * <p>This is <i>not</i> exposed via the reflective capabilities
-     * of {@link DoFn}.
-     *
-     * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require
-     * state and timers, they will need to wait for the arrival of those features. Do not introduce
-     * new uses of this method.
-     */
-    @Deprecated
-    WindowingInternals<InputT, OutputT> windowingInternals();
-
-    /**
-     * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with
-     * the current {@link ProcessElement} call.
-     */
-    <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker();
-
-    /**
-     * Returns the state cell for the given {@link StateId}.
-     */
-    State state(String stateId);
-
-    /**
-     * Returns the timer for the given {@link TimerId}.
-     */
-    Timer timer(String timerId);
-  }
-
   /** Receives values of the given type. */
   public interface OutputReceiver<T> {
     void output(T output);
@@ -413,54 +339,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
     T get();
   }
 
-  /** For testing only, this {@link ArgumentProvider} returns {@code null} for all parameters. */
-  public static class FakeArgumentProvider<InputT, OutputT>
-      implements ArgumentProvider<InputT, OutputT> {
-    @Override
-    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      return null;
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return null;
-    }
-
-    @Override
-    public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) {
-      return null;
-    }
-
-    @Override
-    public InputProvider<InputT> inputProvider() {
-      return null;
-    }
-
-    @Override
-    public OutputReceiver<OutputT> outputReceiver() {
-      return null;
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return null;
-    }
-
-    @Override
-    public State state(String stateId) {
-      return null;
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      return null;
-    }
-
-    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
-      return null;
-    }
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 71a6d1d..a3466bb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -77,7 +77,7 @@ public class DoFnAdapters {
   public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext adaptProcessContext(
       OldDoFn<InputT, OutputT> fn,
       final DoFn<InputT, OutputT>.ProcessContext c,
-      final DoFn.ArgumentProvider<InputT, OutputT> extra) {
+      final DoFnInvoker.ArgumentProvider<InputT, OutputT> extra) {
     return fn.new ProcessContext() {
       @Override
       public InputT element() {
@@ -270,12 +270,12 @@ public class DoFnAdapters {
   }
 
   /**
-   * Wraps an {@link OldDoFn.Context} as a {@link DoFn.ArgumentProvider} inside a {@link
+   * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
    * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
    * unavailable.
    */
   private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
-      implements DoFn.ArgumentProvider<InputT, OutputT> {
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
 
     private OldDoFn<InputT, OutputT>.Context context;
 
@@ -371,11 +371,11 @@ public class DoFnAdapters {
   }
 
   /**
-   * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFn.ArgumentProvider} method.
+   * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method.
    */
   private static class ProcessContextAdapter<InputT, OutputT>
       extends DoFn<InputT, OutputT>.ProcessContext
-      implements DoFn.ArgumentProvider<InputT, OutputT> {
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
 
     private OldDoFn<InputT, OutputT>.ProcessContext context;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index bc6d8c9..9998c9d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -101,7 +101,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
 
   /**
    * Creates a {@link DoFnInvoker} for the given {@link DoFn} by generating bytecode that directly
-   * invokes its methods with arguments extracted from the {@link DoFn.ArgumentProvider}.
+   * invokes its methods with arguments extracted from the {@link DoFnInvoker.ArgumentProvider}.
    */
   @Override
   public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(DoFn<InputT, OutputT> fn) {
@@ -149,19 +149,19 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
     /**
      * Associates the given timer ID with the given {@link OnTimerInvoker}.
      *
-     * <p>ByteBuddy does not like to generate conditional code, so we use a map + lookup
-     * of the timer ID rather than a generated conditional branch to choose which
-     * OnTimerInvoker to invoke.
+     * <p>ByteBuddy does not like to generate conditional code, so we use a map + lookup of the
+     * timer ID rather than a generated conditional branch to choose which OnTimerInvoker to invoke.
      *
-     * <p>This method has package level access as it is intended only for assembly of the
-     * {@link DoFnInvokerBase} not by any subclass.
+     * <p>This method has package level access as it is intended only for assembly of the {@link
+     * DoFnInvokerBase} not by any subclass.
      */
     void addOnTimerInvoker(String timerId, OnTimerInvoker onTimerInvoker) {
       this.onTimerInvokers.put(timerId, onTimerInvoker);
     }
 
     @Override
-    public void invokeOnTimer(String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments) {
+    public void invokeOnTimer(
+        String timerId, DoFnInvoker.ArgumentProvider<InputT, OutputT> arguments) {
       @Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerId);
 
       if (onTimerInvoker != null) {
@@ -193,8 +193,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
               getByteBuddyInvokerConstructor(signature).newInstance(fn);
 
       for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) {
-        invoker.addOnTimerInvoker(onTimerMethod.id(),
-            OnTimerInvokers.forTimer(fn, onTimerMethod.id()));
+        invoker.addOnTimerInvoker(
+            onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id()));
       }
 
       return invoker;
@@ -326,8 +326,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
             new DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT()));
       } else {
         return new DowncastingParametersMethodDelegation(
-            doFnType,
-            signature.getRestrictionCoder().targetMethod());
+            doFnType, signature.getRestrictionCoder().targetMethod());
       }
     } else {
       return ExceptionMethod.throwing(UnsupportedOperationException.class);
@@ -345,8 +344,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
   }
 
   /** Delegates to the given method if available, or does nothing. */
-  private static Implementation delegateOrNoop(TypeDescription doFnType, DoFnSignature.DoFnMethod
-      method) {
+  private static Implementation delegateOrNoop(
+      TypeDescription doFnType, DoFnSignature.DoFnMethod method) {
     return (method == null)
         ? FixedValue.originType()
         : new DoFnMethodDelegation(doFnType, method.targetMethod());
@@ -504,19 +503,19 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
       String methodName, Class<?>... parameterTypes) {
     try {
       return new MethodDescription.ForLoadedMethod(
-          DoFn.ArgumentProvider.class.getMethod(methodName, parameterTypes));
+          DoFnInvoker.ArgumentProvider.class.getMethod(methodName, parameterTypes));
     } catch (Exception e) {
       throw new IllegalStateException(
           String.format(
               "Failed to locate required method %s.%s",
-              DoFn.ArgumentProvider.class.getSimpleName(), methodName),
+              DoFnInvoker.ArgumentProvider.class.getSimpleName(), methodName),
           e);
     }
   }
 
   /**
-   * Calls a zero-parameter getter on the {@link DoFn.ArgumentProvider}, which must be on top of the
-   * stack.
+   * Calls a zero-parameter getter on the {@link DoFnInvoker.ArgumentProvider}, which must be on top
+   * of the stack.
    */
   private static StackManipulation simpleExtraContextParameter(String methodName) {
     return new StackManipulation.Compound(
@@ -565,7 +564,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
 
           @Override
           public StackManipulation dispatch(RestrictionTrackerParameter p) {
-            // DoFn.ArgumentProvider.restrictionTracker() returns a RestrictionTracker,
+            // DoFnInvoker.ArgumentProvider.restrictionTracker() returns a RestrictionTracker,
             // but the @ProcessElement method expects a concrete subtype of it.
             // Insert a downcast.
             return new StackManipulation.Compound(
@@ -613,8 +612,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
     private final DoFnSignature.ProcessElementMethod signature;
 
     /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
-    private ProcessElementDelegation(TypeDescription doFnType, DoFnSignature.ProcessElementMethod
-        signature) {
+    private ProcessElementDelegation(
+        TypeDescription doFnType, DoFnSignature.ProcessElementMethod signature) {
       super(doFnType, signature.targetMethod());
       this.signature = signature;
     }
@@ -622,7 +621,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
     @Override
     protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
       // Parameters of the wrapper invoker method:
-      //   DoFn.ArgumentProvider
+      //   DoFnInvoker.ArgumentProvider
       // Parameters of the wrapped DoFn method:
       //   [DoFn.ProcessContext, BoundedWindow, InputProvider, OutputReceiver] in any order
       ArrayList<StackManipulation> pushParameters = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 2ae7920..d899207 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -20,7 +20,19 @@ package org.apache.beam.sdk.transforms.reflect;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
+import org.apache.beam.sdk.transforms.DoFn.InputProvider;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.DoFn.StartBundle;
+import org.apache.beam.sdk.transforms.DoFn.StateId;
+import org.apache.beam.sdk.transforms.DoFn.TimerId;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.state.State;
 
 /**
  * Interface for invoking the {@code DoFn} processing methods.
@@ -48,11 +60,10 @@ public interface DoFnInvoker<InputT, OutputT> {
    * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link
    *     DoFn.ProcessContinuation#stop()} if it returns {@code void}.
    */
-  DoFn.ProcessContinuation invokeProcessElement(DoFn.ArgumentProvider<InputT, OutputT> extra);
+  DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
 
   /** Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link DoFn}. */
-  void invokeOnTimer(
-      String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments);
+  void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments);
 
   /** Invoke the {@link DoFn.GetInitialRestriction} method on the bound {@link DoFn}. */
   <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element);
@@ -72,4 +83,108 @@ public interface DoFnInvoker<InputT, OutputT> {
   /** Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}. */
   <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> TrackerT invokeNewTracker(
       RestrictionT restriction);
+
+  /**
+   * Interface for runner implementors to provide implementations of extra context information.
+   *
+   * <p>The methods on this interface are called by {@link DoFnInvoker} before invoking an annotated
+   * {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that has indicated
+   * it needs the given extra context.
+   *
+   * <p>In the case of {@link ProcessElement} it is called once per invocation of {@link
+   * ProcessElement}.
+   */
+  interface ArgumentProvider<InputT, OutputT> {
+    /**
+     * Construct the {@link BoundedWindow} to use within a {@link DoFn} that needs it. This is
+     * called if the {@link ProcessElement} method has a parameter of type {@link BoundedWindow}.
+     *
+     * @return {@link BoundedWindow} of the element currently being processed.
+     */
+    BoundedWindow window();
+
+    /** Provide a {@link DoFn.Context} to use with the given {@link DoFn}. */
+    DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn);
+
+    /** Provide a {@link DoFn.ProcessContext} to use with the given {@link DoFn}. */
+    DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn);
+
+    /** A placeholder for testing purposes. */
+    InputProvider<InputT> inputProvider();
+
+    /** A placeholder for testing purposes. */
+    OutputReceiver<OutputT> outputReceiver();
+
+    /**
+     * For migration from {@link OldDoFn} to {@link DoFn}, provide a {@link WindowingInternals} so
+     * an {@link OldDoFn} can be run via {@link DoFnInvoker}.
+     *
+     * <p>This is <i>not</i> exposed via the reflective capabilities of {@link DoFn}.
+     *
+     * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require state
+     *     and timers, they will need to wait for the arrival of those features. Do not introduce
+     *     new uses of this method.
+     */
+    @Deprecated
+    WindowingInternals<InputT, OutputT> windowingInternals();
+
+    /**
+     * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with
+     * the current {@link ProcessElement} call.
+     */
+    <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker();
+
+    /** Returns the state cell for the given {@link StateId}. */
+    State state(String stateId);
+
+    /** Returns the timer for the given {@link TimerId}. */
+    Timer timer(String timerId);
+  }
+
+  /** For testing only, this {@link ArgumentProvider} returns {@code null} for all parameters. */
+  class FakeArgumentProvider<InputT, OutputT> implements ArgumentProvider<InputT, OutputT> {
+    @Override
+    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      return null;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return null;
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) {
+      return null;
+    }
+
+    @Override
+    public InputProvider<InputT> inputProvider() {
+      return null;
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver() {
+      return null;
+    }
+
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      return null;
+    }
+
+    @Override
+    public State state(String stateId) {
+      return null;
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      return null;
+    }
+
+    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 7eccaab..15ba198 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -100,7 +100,7 @@ public class DoFnInvokers {
 
     @Override
     public DoFn.ProcessContinuation invokeProcessElement(
-        DoFn.ArgumentProvider<InputT, OutputT> extra) {
+        ArgumentProvider<InputT, OutputT> extra) {
       // The outer DoFn is immaterial - it exists only to avoid typing InputT and OutputT repeatedly
       DoFn<InputT, OutputT>.ProcessContext newCtx =
           extra.processContext(new DoFn<InputT, OutputT>() {});
@@ -115,7 +115,7 @@ public class DoFnInvokers {
     }
 
     @Override
-    public void invokeOnTimer(String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments) {
+    public void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments) {
       throw new UnsupportedOperationException(
           String.format("Timers are not supported for %s", OldDoFn.class.getSimpleName()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
index bfcafd0..3fbad0f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java
@@ -17,11 +17,11 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OnTimer;
 
-/** Interface for invoking the {@link DoFn.OnTimer} method for a particular timer. */
+/** Interface for invoking the {@link OnTimer} method for a particular timer. */
 public interface OnTimerInvoker<InputT, OutputT> {
 
-  /** Invoke the {@link DoFn.OnTimer} method in the provided context. */
-  void invokeOnTimer(DoFn.ArgumentProvider<InputT, OutputT> extra);
+  /** Invoke the {@link OnTimer} method in the provided context. */
+  void invokeOnTimer(DoFnInvoker.ArgumentProvider<InputT, OutputT> extra);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 3d9e3ec..456a6eb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -40,10 +40,9 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ArgumentProvider;
-import org.apache.beam.sdk.transforms.DoFn.FakeArgumentProvider;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -80,7 +79,7 @@ public class DoFnInvokersTest {
   @Mock private DoFn.InputProvider<String> mockInputProvider;
   @Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
   @Mock private WindowingInternals<String, String> mockWindowingInternals;
-  @Mock private ArgumentProvider<String, String> mockArgumentProvider;
+  @Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider;
 
   @Mock private OldDoFn<String, String> mockOldDoFn;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
index d51e9cc..177f15f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
@@ -43,7 +43,7 @@ public class OnTimerInvokersTest {
 
   @Mock private BoundedWindow mockWindow;
 
-  @Mock private DoFn.ArgumentProvider<String, String> mockArgumentProvider;
+  @Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider;
 
   @Before
   public void setUp() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
index e0fdac6..442bdec 100644
--- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
@@ -21,10 +21,11 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.FakeArgumentProvider;
 import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.ArgumentProvider;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -56,7 +57,7 @@ public class DoFnInvokersBenchmark {
   private StubOldDoFnProcessContext stubOldDoFnContext =
       new StubOldDoFnProcessContext(oldDoFn, ELEMENT);
   private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT);
-  private DoFn.ArgumentProvider<String, String> argumentProvider =
+  private ArgumentProvider<String, String> argumentProvider =
       new FakeArgumentProvider<>();
 
   private OldDoFn<String, String> adaptedDoFnWithContext;


Mime
View raw message