beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [2/3] beam git commit: Reintroduces DoFn.ProcessContinuation (Dataflow worker compatibility part)
Date Tue, 27 Jun 2017 00:34:18 GMT
Reintroduces DoFn.ProcessContinuation (Dataflow worker compatibility part)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bec32fe9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bec32fe9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bec32fe9

Branch: refs/heads/master
Commit: bec32fe93c6b5c16563d7ea4b877a2dee3352fee
Parents: 1ea1de4
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Fri Jun 16 14:56:07 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Mon Jun 26 17:25:04 2017 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/transforms/DoFn.java     | 3 +++
 .../sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java    | 6 ++++++
 .../org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java    | 4 +++-
 3 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bec32fe9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index e711ac2..fb6d0ee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -677,6 +677,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable,
HasDisplayD
   @Experimental(Kind.SPLITTABLE_DO_FN)
   public @interface UnboundedPerElement {}
 
+  /** Temporary, do not use. See https://issues.apache.org/jira/browse/BEAM-1904 */
+  public class ProcessContinuation {}
+
   /**
    * Finalize the {@link DoFn} construction to prepare for processing.
    * This method should be called by runners before any processing methods.

http://git-wip-us.apache.org/repos/asf/beam/blob/bec32fe9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 5d5887a..4f67db4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -49,6 +49,7 @@ import net.bytebuddy.implementation.bytecode.Throw;
 import net.bytebuddy.implementation.bytecode.assign.Assigner;
 import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
 import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.constant.NullConstant;
 import net.bytebuddy.implementation.bytecode.constant.TextConstant;
 import net.bytebuddy.implementation.bytecode.member.FieldAccess;
 import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
@@ -667,6 +668,11 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory
{
       }
       return new StackManipulation.Compound(pushParameters);
     }
+
+    @Override
+    protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
+      return new StackManipulation.Compound(NullConstant.INSTANCE, MethodReturn.REFERENCE);
+    }
   }
 
   private static class UserCodeMethodInvocation implements StackManipulation {

http://git-wip-us.apache.org/repos/asf/beam/blob/bec32fe9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 6fd4052..ed81f42 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -53,8 +53,10 @@ public interface DoFnInvoker<InputT, OutputT> {
    * Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
    *
    * @param extra Factory for producing extra parameter objects (such as window), if necessary.
+   * @return {@code null} - see <a href="https://issues.apache.org/jira/browse/BEAM-1904">JIRA</a>
+   *     tracking the complete removal of {@link DoFn.ProcessContinuation}.
    */
-  void invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
+  DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
 
   /** Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link DoFn}. */
   void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments);


Mime
View raw message