Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 40578200BC3 for ; Fri, 18 Nov 2016 23:20:49 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 3F42D160B04; Fri, 18 Nov 2016 22:20:49 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BA208160B03 for ; Fri, 18 Nov 2016 23:20:47 +0100 (CET) Received: (qmail 73857 invoked by uid 500); 18 Nov 2016 22:20:47 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 73848 invoked by uid 99); 18 Nov 2016 22:20:46 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Nov 2016 22:20:46 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 8506AC07D4 for ; Fri, 18 Nov 2016 22:20:46 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id D7_tOuqcRExN for ; Fri, 18 Nov 2016 22:20:44 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 7286B5F47D for ; Fri, 18 Nov 2016 22:20:42 +0000 (UTC) Received: (qmail 73683 invoked by uid 99); 18 Nov 2016 22:20:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Nov 2016 22:20:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7B7EAE0230; Fri, 18 Nov 2016 22:20:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Fri, 18 Nov 2016 22:20:41 -0000 Message-Id: <279dde6ad988474d9a49c297ec2a5afc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-beam git commit: Move DoFn.ArgumentProvider to DoFnInvoker.ArgumentProvider archived-at: Fri, 18 Nov 2016 22:20:49 -0000 Repository: incubator-beam Updated Branches: refs/heads/master bb9c38664 -> 2a7169b6f Move DoFn.ArgumentProvider to DoFnInvoker.ArgumentProvider The arguments provided as a single object are an aspect of the DoFnInvoker, not the DoFn. The DoFn itself is a specification that may have other ways of being invoked, depending on the circumstance. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/33fb8c2d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/33fb8c2d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/33fb8c2d Branch: refs/heads/master Commit: 33fb8c2db8c64275f1b9b8ac6dfd12e92d7fb777 Parents: bb9c386 Author: Kenneth Knowles Authored: Thu Nov 17 23:04:55 2016 -0800 Committer: Kenneth Knowles Committed: Fri Nov 18 14:20:20 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/core/SimpleDoFnRunner.java | 4 +- .../beam/runners/core/SplittableParDo.java | 7 +- .../org/apache/beam/sdk/transforms/DoFn.java | 122 ------------------- .../beam/sdk/transforms/DoFnAdapters.java | 10 +- .../reflect/ByteBuddyDoFnInvokerFactory.java | 41 +++---- .../sdk/transforms/reflect/DoFnInvoker.java | 121 +++++++++++++++++- .../sdk/transforms/reflect/DoFnInvokers.java | 4 +- .../sdk/transforms/reflect/OnTimerInvoker.java | 8 +- .../transforms/reflect/DoFnInvokersTest.java | 5 +- .../transforms/reflect/OnTimerInvokersTest.java | 2 +- .../transforms/DoFnInvokersBenchmark.java | 5 +- 11 files changed, 161 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 76aae8f..841e412 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -183,7 +183,7 @@ public class SimpleDoFnRunner implements DoFnRunner the type of the {@link DoFn} (main) output elements */ private static class DoFnContext extends DoFn.Context - implements DoFn.ArgumentProvider { + implements DoFnInvoker.ArgumentProvider { private static final int MAX_SIDE_OUTPUTS = 1000; final PipelineOptions options; @@ -424,7 +424,7 @@ public class SimpleDoFnRunner implements DoFnRunner the type of the {@link DoFn} (main) output elements */ private class DoFnProcessContext extends DoFn.ProcessContext - implements DoFn.ArgumentProvider { + implements DoFnInvoker.ArgumentProvider { final DoFn fn; final DoFnContext context; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 3003984..c38ab2f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -392,10 +392,11 @@ public class SplittableParDo< } /** - * Creates an {@link DoFn.ArgumentProvider} that provides the given tracker as well as the given + * Creates an {@link DoFnInvoker.ArgumentProvider} that provides the given tracker as well as + * the given * {@link ProcessContext} (which is also provided when a {@link Context} is requested. */ - private DoFn.ArgumentProvider wrapTracker( + private DoFnInvoker.ArgumentProvider wrapTracker( TrackerT tracker, DoFn.ProcessContext processContext) { return new ArgumentProviderForTracker<>(tracker, processContext); @@ -403,7 +404,7 @@ public class SplittableParDo< private static class ArgumentProviderForTracker< InputT, OutputT, TrackerT extends RestrictionTracker> - implements DoFn.ArgumentProvider { + implements DoFnInvoker.ArgumentProvider { private final TrackerT tracker; private final DoFn.ProcessContext processContext; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index bf0631b..9978ef4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -38,13 +38,11 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.values.PCollection; @@ -331,78 +329,6 @@ public abstract class DoFn implements Serializable, HasDisplayD return new TypeDescriptor(getClass()) {}; } - /** - * Interface for runner implementors to provide implementations of extra context information. - * - *

The methods on this interface are called by {@link DoFnInvoker} before invoking an - * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that - * has indicated it needs the given extra context. - * - *

In the case of {@link ProcessElement} it is called once per invocation of - * {@link ProcessElement}. - */ - public interface ArgumentProvider { - /** - * Construct the {@link BoundedWindow} to use within a {@link DoFn} that - * needs it. This is called if the {@link ProcessElement} method has a parameter of type - * {@link BoundedWindow}. - * - * @return {@link BoundedWindow} of the element currently being processed. - */ - BoundedWindow window(); - - /** - * Provide a {@link DoFn.Context} to use with the given {@link DoFn}. - */ - DoFn.Context context(DoFn doFn); - - /** - * Provide a {@link DoFn.ProcessContext} to use with the given {@link DoFn}. - */ - DoFn.ProcessContext processContext(DoFn doFn); - - /** - * A placeholder for testing purposes. - */ - InputProvider inputProvider(); - - /** - * A placeholder for testing purposes. - */ - OutputReceiver outputReceiver(); - - /** - * For migration from {@link OldDoFn} to {@link DoFn}, provide - * a {@link WindowingInternals} so an {@link OldDoFn} can be run - * via {@link DoFnInvoker}. - * - *

This is not exposed via the reflective capabilities - * of {@link DoFn}. - * - * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require - * state and timers, they will need to wait for the arrival of those features. Do not introduce - * new uses of this method. - */ - @Deprecated - WindowingInternals windowingInternals(); - - /** - * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with - * the current {@link ProcessElement} call. - */ - RestrictionTracker restrictionTracker(); - - /** - * Returns the state cell for the given {@link StateId}. - */ - State state(String stateId); - - /** - * Returns the timer for the given {@link TimerId}. - */ - Timer timer(String timerId); - } - /** Receives values of the given type. */ public interface OutputReceiver { void output(T output); @@ -413,54 +339,6 @@ public abstract class DoFn implements Serializable, HasDisplayD T get(); } - /** For testing only, this {@link ArgumentProvider} returns {@code null} for all parameters. */ - public static class FakeArgumentProvider - implements ArgumentProvider { - @Override - public DoFn.ProcessContext processContext(DoFn doFn) { - return null; - } - - @Override - public BoundedWindow window() { - return null; - } - - @Override - public DoFn.Context context(DoFn doFn) { - return null; - } - - @Override - public InputProvider inputProvider() { - return null; - } - - @Override - public OutputReceiver outputReceiver() { - return null; - } - - @Override - public WindowingInternals windowingInternals() { - return null; - } - - @Override - public State state(String stateId) { - return null; - } - - @Override - public Timer timer(String timerId) { - return null; - } - - public RestrictionTracker restrictionTracker() { - return null; - } - } - ///////////////////////////////////////////////////////////////////////////// /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index 71a6d1d..a3466bb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -77,7 +77,7 @@ public class DoFnAdapters { public static OldDoFn.ProcessContext adaptProcessContext( OldDoFn fn, final DoFn.ProcessContext c, - final DoFn.ArgumentProvider extra) { + final DoFnInvoker.ArgumentProvider extra) { return fn.new ProcessContext() { @Override public InputT element() { @@ -270,12 +270,12 @@ public class DoFnAdapters { } /** - * Wraps an {@link OldDoFn.Context} as a {@link DoFn.ArgumentProvider} inside a {@link + * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is * unavailable. */ private static class ContextAdapter extends DoFn.Context - implements DoFn.ArgumentProvider { + implements DoFnInvoker.ArgumentProvider { private OldDoFn.Context context; @@ -371,11 +371,11 @@ public class DoFnAdapters { } /** - * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFn.ArgumentProvider} method. + * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method. */ private static class ProcessContextAdapter extends DoFn.ProcessContext - implements DoFn.ArgumentProvider { + implements DoFnInvoker.ArgumentProvider { private OldDoFn.ProcessContext context; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index bc6d8c9..9998c9d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -101,7 +101,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { /** * Creates a {@link DoFnInvoker} for the given {@link DoFn} by generating bytecode that directly - * invokes its methods with arguments extracted from the {@link DoFn.ArgumentProvider}. + * invokes its methods with arguments extracted from the {@link DoFnInvoker.ArgumentProvider}. */ @Override public DoFnInvoker invokerFor(DoFn fn) { @@ -149,19 +149,19 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { /** * Associates the given timer ID with the given {@link OnTimerInvoker}. * - *

ByteBuddy does not like to generate conditional code, so we use a map + lookup - * of the timer ID rather than a generated conditional branch to choose which - * OnTimerInvoker to invoke. + *

ByteBuddy does not like to generate conditional code, so we use a map + lookup of the + * timer ID rather than a generated conditional branch to choose which OnTimerInvoker to invoke. * - *

This method has package level access as it is intended only for assembly of the - * {@link DoFnInvokerBase} not by any subclass. + *

This method has package level access as it is intended only for assembly of the {@link + * DoFnInvokerBase} not by any subclass. */ void addOnTimerInvoker(String timerId, OnTimerInvoker onTimerInvoker) { this.onTimerInvokers.put(timerId, onTimerInvoker); } @Override - public void invokeOnTimer(String timerId, DoFn.ArgumentProvider arguments) { + public void invokeOnTimer( + String timerId, DoFnInvoker.ArgumentProvider arguments) { @Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerId); if (onTimerInvoker != null) { @@ -193,8 +193,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { getByteBuddyInvokerConstructor(signature).newInstance(fn); for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) { - invoker.addOnTimerInvoker(onTimerMethod.id(), - OnTimerInvokers.forTimer(fn, onTimerMethod.id())); + invoker.addOnTimerInvoker( + onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id())); } return invoker; @@ -326,8 +326,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { new DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT())); } else { return new DowncastingParametersMethodDelegation( - doFnType, - signature.getRestrictionCoder().targetMethod()); + doFnType, signature.getRestrictionCoder().targetMethod()); } } else { return ExceptionMethod.throwing(UnsupportedOperationException.class); @@ -345,8 +344,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { } /** Delegates to the given method if available, or does nothing. */ - private static Implementation delegateOrNoop(TypeDescription doFnType, DoFnSignature.DoFnMethod - method) { + private static Implementation delegateOrNoop( + TypeDescription doFnType, DoFnSignature.DoFnMethod method) { return (method == null) ? FixedValue.originType() : new DoFnMethodDelegation(doFnType, method.targetMethod()); @@ -504,19 +503,19 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { String methodName, Class... parameterTypes) { try { return new MethodDescription.ForLoadedMethod( - DoFn.ArgumentProvider.class.getMethod(methodName, parameterTypes)); + DoFnInvoker.ArgumentProvider.class.getMethod(methodName, parameterTypes)); } catch (Exception e) { throw new IllegalStateException( String.format( "Failed to locate required method %s.%s", - DoFn.ArgumentProvider.class.getSimpleName(), methodName), + DoFnInvoker.ArgumentProvider.class.getSimpleName(), methodName), e); } } /** - * Calls a zero-parameter getter on the {@link DoFn.ArgumentProvider}, which must be on top of the - * stack. + * Calls a zero-parameter getter on the {@link DoFnInvoker.ArgumentProvider}, which must be on top + * of the stack. */ private static StackManipulation simpleExtraContextParameter(String methodName) { return new StackManipulation.Compound( @@ -565,7 +564,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { @Override public StackManipulation dispatch(RestrictionTrackerParameter p) { - // DoFn.ArgumentProvider.restrictionTracker() returns a RestrictionTracker, + // DoFnInvoker.ArgumentProvider.restrictionTracker() returns a RestrictionTracker, // but the @ProcessElement method expects a concrete subtype of it. // Insert a downcast. return new StackManipulation.Compound( @@ -613,8 +612,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { private final DoFnSignature.ProcessElementMethod signature; /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */ - private ProcessElementDelegation(TypeDescription doFnType, DoFnSignature.ProcessElementMethod - signature) { + private ProcessElementDelegation( + TypeDescription doFnType, DoFnSignature.ProcessElementMethod signature) { super(doFnType, signature.targetMethod()); this.signature = signature; } @@ -622,7 +621,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { @Override protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) { // Parameters of the wrapper invoker method: - // DoFn.ArgumentProvider + // DoFnInvoker.ArgumentProvider // Parameters of the wrapped DoFn method: // [DoFn.ProcessContext, BoundedWindow, InputProvider, OutputReceiver] in any order ArrayList pushParameters = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 2ae7920..d899207 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -20,7 +20,19 @@ package org.apache.beam.sdk.transforms.reflect; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.FinishBundle; +import org.apache.beam.sdk.transforms.DoFn.InputProvider; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.DoFn.StartBundle; +import org.apache.beam.sdk.transforms.DoFn.StateId; +import org.apache.beam.sdk.transforms.DoFn.TimerId; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.state.State; /** * Interface for invoking the {@code DoFn} processing methods. @@ -48,11 +60,10 @@ public interface DoFnInvoker { * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link * DoFn.ProcessContinuation#stop()} if it returns {@code void}. */ - DoFn.ProcessContinuation invokeProcessElement(DoFn.ArgumentProvider extra); + DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider extra); /** Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link DoFn}. */ - void invokeOnTimer( - String timerId, DoFn.ArgumentProvider arguments); + void invokeOnTimer(String timerId, ArgumentProvider arguments); /** Invoke the {@link DoFn.GetInitialRestriction} method on the bound {@link DoFn}. */ RestrictionT invokeGetInitialRestriction(InputT element); @@ -72,4 +83,108 @@ public interface DoFnInvoker { /** Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}. */ > TrackerT invokeNewTracker( RestrictionT restriction); + + /** + * Interface for runner implementors to provide implementations of extra context information. + * + *

The methods on this interface are called by {@link DoFnInvoker} before invoking an annotated + * {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that has indicated + * it needs the given extra context. + * + *

In the case of {@link ProcessElement} it is called once per invocation of {@link + * ProcessElement}. + */ + interface ArgumentProvider { + /** + * Construct the {@link BoundedWindow} to use within a {@link DoFn} that needs it. This is + * called if the {@link ProcessElement} method has a parameter of type {@link BoundedWindow}. + * + * @return {@link BoundedWindow} of the element currently being processed. + */ + BoundedWindow window(); + + /** Provide a {@link DoFn.Context} to use with the given {@link DoFn}. */ + DoFn.Context context(DoFn doFn); + + /** Provide a {@link DoFn.ProcessContext} to use with the given {@link DoFn}. */ + DoFn.ProcessContext processContext(DoFn doFn); + + /** A placeholder for testing purposes. */ + InputProvider inputProvider(); + + /** A placeholder for testing purposes. */ + OutputReceiver outputReceiver(); + + /** + * For migration from {@link OldDoFn} to {@link DoFn}, provide a {@link WindowingInternals} so + * an {@link OldDoFn} can be run via {@link DoFnInvoker}. + * + *

This is not exposed via the reflective capabilities of {@link DoFn}. + * + * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require state + * and timers, they will need to wait for the arrival of those features. Do not introduce + * new uses of this method. + */ + @Deprecated + WindowingInternals windowingInternals(); + + /** + * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with + * the current {@link ProcessElement} call. + */ + RestrictionTracker restrictionTracker(); + + /** Returns the state cell for the given {@link StateId}. */ + State state(String stateId); + + /** Returns the timer for the given {@link TimerId}. */ + Timer timer(String timerId); + } + + /** For testing only, this {@link ArgumentProvider} returns {@code null} for all parameters. */ + class FakeArgumentProvider implements ArgumentProvider { + @Override + public DoFn.ProcessContext processContext(DoFn doFn) { + return null; + } + + @Override + public BoundedWindow window() { + return null; + } + + @Override + public DoFn.Context context(DoFn doFn) { + return null; + } + + @Override + public InputProvider inputProvider() { + return null; + } + + @Override + public OutputReceiver outputReceiver() { + return null; + } + + @Override + public WindowingInternals windowingInternals() { + return null; + } + + @Override + public State state(String stateId) { + return null; + } + + @Override + public Timer timer(String timerId) { + return null; + } + + public RestrictionTracker restrictionTracker() { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index 7eccaab..15ba198 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -100,7 +100,7 @@ public class DoFnInvokers { @Override public DoFn.ProcessContinuation invokeProcessElement( - DoFn.ArgumentProvider extra) { + ArgumentProvider extra) { // The outer DoFn is immaterial - it exists only to avoid typing InputT and OutputT repeatedly DoFn.ProcessContext newCtx = extra.processContext(new DoFn() {}); @@ -115,7 +115,7 @@ public class DoFnInvokers { } @Override - public void invokeOnTimer(String timerId, DoFn.ArgumentProvider arguments) { + public void invokeOnTimer(String timerId, ArgumentProvider arguments) { throw new UnsupportedOperationException( String.format("Timers are not supported for %s", OldDoFn.class.getSimpleName())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java index bfcafd0..3fbad0f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java @@ -17,11 +17,11 @@ */ package org.apache.beam.sdk.transforms.reflect; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.OnTimer; -/** Interface for invoking the {@link DoFn.OnTimer} method for a particular timer. */ +/** Interface for invoking the {@link OnTimer} method for a particular timer. */ public interface OnTimerInvoker { - /** Invoke the {@link DoFn.OnTimer} method in the provided context. */ - void invokeOnTimer(DoFn.ArgumentProvider extra); + /** Invoke the {@link OnTimer} method in the provided context. */ + void invokeOnTimer(DoFnInvoker.ArgumentProvider extra); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 3d9e3ec..456a6eb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -40,10 +40,9 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.ArgumentProvider; -import org.apache.beam.sdk.transforms.DoFn.FakeArgumentProvider; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -80,7 +79,7 @@ public class DoFnInvokersTest { @Mock private DoFn.InputProvider mockInputProvider; @Mock private DoFn.OutputReceiver mockOutputReceiver; @Mock private WindowingInternals mockWindowingInternals; - @Mock private ArgumentProvider mockArgumentProvider; + @Mock private DoFnInvoker.ArgumentProvider mockArgumentProvider; @Mock private OldDoFn mockOldDoFn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java index d51e9cc..177f15f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java @@ -43,7 +43,7 @@ public class OnTimerInvokersTest { @Mock private BoundedWindow mockWindow; - @Mock private DoFn.ArgumentProvider mockArgumentProvider; + @Mock private DoFnInvoker.ArgumentProvider mockArgumentProvider; @Before public void setUp() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java index e0fdac6..442bdec 100644 --- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java +++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java @@ -21,10 +21,11 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.FakeArgumentProvider; import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.ArgumentProvider; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -56,7 +57,7 @@ public class DoFnInvokersBenchmark { private StubOldDoFnProcessContext stubOldDoFnContext = new StubOldDoFnProcessContext(oldDoFn, ELEMENT); private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT); - private DoFn.ArgumentProvider argumentProvider = + private ArgumentProvider argumentProvider = new FakeArgumentProvider<>(); private OldDoFn adaptedDoFnWithContext;