beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Add DoFnInvoker for OldDoFn, for migration ease
Date Fri, 07 Oct 2016 03:18:39 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 9b71f1636 -> 03b89c065


Add DoFnInvoker for OldDoFn, for migration ease

This allows any runner to use DoFnInvokers.invokerFor(Object) to be
agnostic as to whether they are running a DoFn or OldDoFn. Thus,
the migration of the runner can occur in advance of further changes
to the SDK and deployment can be independent. For example, a backend
need not know whether it is deserializing a DoFn or OldDoFn.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2a24f3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2a24f3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2a24f3c

Branch: refs/heads/master
Commit: e2a24f3c2668c7341b38cc56d331cefd3a69f27f
Parents: 8462acb
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Oct 4 13:56:13 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Oct 6 20:16:09 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/DoFn.java    |  40 ++++++
 .../beam/sdk/transforms/DoFnAdapters.java       | 142 ++++++++++++++++++-
 .../sdk/transforms/reflect/DoFnInvokers.java    |  86 +++++++++++
 .../transforms/reflect/DoFnInvokersTest.java    |  46 ++++++
 .../transforms/DoFnInvokersBenchmark.java       |   7 +
 5 files changed, 318 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 59c8323..fb7fbd4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -29,6 +29,8 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
@@ -37,6 +39,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -185,6 +188,23 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
      */
     public abstract <T> void sideOutputWithTimestamp(
         TupleTag<T> tag, T output, Instant timestamp);
+
+    /**
+     * Creates an {@link Aggregator} in the {@link DoFn} context with the specified name and
+     * aggregation logic specified by {@link CombineFn}. This is to be overridden by a particular
+     * runner context with an implementation that delivers the values as appropriate.
+     *
+     * <p>The aggregators declared on the {@link DoFn} will be wired up to aggregators allocated via
+     * this method.
+     *
+     * @param name the name of the aggregator
+     * @param combiner the {@link CombineFn} to use in the aggregator
+     * @return an aggregator for the provided name and {@link CombineFn} in this context
+     */
+    @Experimental(Kind.AGGREGATOR)
+    protected abstract <AggInputT, AggOutputT>
+        Aggregator<AggInputT, AggOutputT> createAggregator(
+            String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
   }
 
   /**
@@ -306,6 +326,21 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
      * A placeholder for testing purposes.
      */
     OutputReceiver<OutputT> outputReceiver();
+
+    /**
+     * For migration from {@link OldDoFn} to {@link DoFn}, provide
+     * a {@link WindowingInternals} so an {@link OldDoFn} can be run
+     * via {@link DoFnInvoker}.
+     *
+     * <p>This is <i>not</i> exposed via the reflective capabilities
+     * of {@link DoFn}.
+     *
+     * @deprecated Please port occurrences of {@link OldDoFn} to {@link DoFn}. If they require
+     * state and timers, they will need to wait for the arrival of those features. Do not introduce
+     * new uses of this method.
+     */
+    @Deprecated
+    WindowingInternals<InputT, OutputT> windowingInternals();
   }
 
   /** A placeholder for testing handling of output types during {@link DoFn} reflection. */
@@ -335,6 +370,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
     public OutputReceiver<OutputT> outputReceiver() {
       return null;
     }
+
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      return null; // placeholder factory: no WindowingInternals is provided
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 77a71e9..7b259aa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
 
 import java.io.IOException;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
@@ -26,6 +27,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -33,7 +35,7 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /**
- * Utility class containing adapters for running a {@link DoFn} as an {@link OldDoFn}.
+ * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
  *
  * @deprecated This class will go away when we start running {@link DoFn}'s directly (using
  * {@link DoFnInvoker}) rather than via {@link OldDoFn}.
@@ -65,6 +67,113 @@ public class DoFnAdapters {
     }
   }
 
+  /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */
+  public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext adaptProcessContext(
+      OldDoFn<InputT, OutputT> fn,
+      final DoFn<InputT, OutputT>.ProcessContext c,
+      final DoFn.ExtraContextFactory<InputT, OutputT> extra) {
+    return fn.new ProcessContext() {
+      @Override
+      public InputT element() {
+        return c.element();
+      }
+
+      @Override
+      public <T> T sideInput(PCollectionView<T> view) {
+        return c.sideInput(view);
+      }
+
+      @Override
+      public Instant timestamp() {
+        return c.timestamp();
+      }
+
+      @Override
+      public BoundedWindow window() {
+        return extra.window();
+      }
+
+      @Override
+      public PaneInfo pane() {
+        return c.pane();
+      }
+
+      @Override
+      public WindowingInternals<InputT, OutputT> windowingInternals() {
+        return extra.windowingInternals();
+      }
+
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        return c.getPipelineOptions();
+      }
+
+      @Override
+      public void output(OutputT output) {
+        c.output(output);
+      }
+
+      @Override
+      public void outputWithTimestamp(OutputT output, Instant timestamp) {
+        c.outputWithTimestamp(output, timestamp);
+      }
+
+      @Override
+      public <T> void sideOutput(TupleTag<T> tag, T output) {
+        c.sideOutput(tag, output);
+      }
+
+      @Override
+      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+        c.sideOutputWithTimestamp(tag, output, timestamp);
+      }
+
+      @Override
+      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+        return c.createAggregator(name, combiner);
+      }
+    };
+  }
+
+  /** Creates a {@link OldDoFn.Context} from a {@link DoFn.Context}. */
+  public static <InputT, OutputT> OldDoFn<InputT, OutputT>.Context adaptContext(
+      OldDoFn<InputT, OutputT> fn,
+      final DoFn<InputT, OutputT>.Context c) {
+    return fn.new Context() {
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        return c.getPipelineOptions();
+      }
+
+      @Override
+      public void output(OutputT output) {
+        c.output(output);
+      }
+
+      @Override
+      public void outputWithTimestamp(OutputT output, Instant timestamp) {
+        c.outputWithTimestamp(output, timestamp);
+      }
+
+      @Override
+      public <T> void sideOutput(TupleTag<T> tag, T output) {
+        c.sideOutput(tag, output);
+      }
+
+      @Override
+      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+        c.sideOutputWithTimestamp(tag, output, timestamp);
+      }
+
+      @Override
+      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+        return c.createAggregator(name, combiner);
+      }
+    };
+  }
+
   /**
    * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
    * OldDoFn}.
@@ -183,10 +292,26 @@ public class DoFnAdapters {
     }
 
     @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+        String name,
+        CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner);
+    }
+
+    @Override
     public BoundedWindow window() {
-      // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
+      // The OldDoFn doesn't allow us to ask for these outside processElement, so this
       // should be unreachable.
-      throw new UnsupportedOperationException("Can only get the window in ProcessElements");
+      throw new UnsupportedOperationException(
+          "Can only get the window in processElement; elsewhere there is no defined window.");
+    }
+
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      // The OldDoFn doesn't allow us to ask for these outside processElement, so this
+      // should be unreachable.
+      throw new UnsupportedOperationException(
+          "Can only get WindowingInternals in processElement");
     }
 
     @Override
@@ -247,6 +372,12 @@ public class DoFnAdapters {
     }
 
     @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner); // delegate to adapted context
+    }
+
+    @Override
     public InputT element() {
       return context.element();
     }
@@ -267,6 +398,11 @@ public class DoFnAdapters {
     }
 
     @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      return context.windowingInternals(); // delegate to adapted context
+    }
+
+    @Override
     public DoFn.InputProvider<InputT> inputProvider() {
       throw new UnsupportedOperationException("inputProvider() exists only for testing");
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index edc1dc0..041eb60 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -58,7 +58,10 @@ import net.bytebuddy.jar.asm.Opcodes;
 import net.bytebuddy.jar.asm.Type;
 import net.bytebuddy.matcher.ElementMatchers;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.UserCodeException;
 
 /** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
@@ -77,6 +80,89 @@ public class DoFnInvokers {
 
   private DoFnInvokers() {}
 
+  /**
+   * Creates a {@link DoFnInvoker} for the given {@link Object}, which should be either a
+   * {@link DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a user's
+   * function as an {@link Object} and then pass it to this method, so there is no need to
+   * statically specify what sort of object it is.
+   *
+   * @throws IllegalArgumentException if {@code deserializedFn} is null or neither kind of fn
+   * @deprecated this is to be used only as a migration path for decoupling upgrades
+   */
+  @Deprecated
+  public DoFnInvoker<?, ?> invokerFor(Object deserializedFn) {
+    if (deserializedFn instanceof DoFn) {
+      return newByteBuddyInvoker((DoFn<?, ?>) deserializedFn);
+    } else if (deserializedFn instanceof OldDoFn) {
+      return new OldDoFnInvoker<>((OldDoFn<?, ?>) deserializedFn);
+    } else {
+      throw new IllegalArgumentException(String.format(
+          "Cannot create a %s for %s; it should be either a %s or an %s.",
+          DoFnInvoker.class.getSimpleName(),
+          deserializedFn,
+          DoFn.class.getSimpleName(), OldDoFn.class.getSimpleName()));
+    }
+  }
+
+  /** Invokes an {@link OldDoFn}, adapting {@link DoFn} contexts via {@link DoFnAdapters}. */
+  static class OldDoFnInvoker<InputT, OutputT> implements DoFnInvoker<InputT, OutputT> {
+    private final OldDoFn<InputT, OutputT> fn;
+
+    public OldDoFnInvoker(OldDoFn<InputT, OutputT> fn) {
+      this.fn = fn;
+    }
+
+    @Override
+    public void invokeProcessElement(
+        DoFn<InputT, OutputT>.ProcessContext c, ExtraContextFactory<InputT, OutputT> extra) {
+      OldDoFn<InputT, OutputT>.ProcessContext oldCtx =
+          DoFnAdapters.adaptProcessContext(fn, c, extra);
+      try {
+        fn.processElement(oldCtx);
+      } catch (Throwable exc) {
+        throw UserCodeException.wrap(exc);
+      }
+    }
+
+    @Override
+    public void invokeStartBundle(DoFn<InputT, OutputT>.Context c) {
+      OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
+      try {
+        fn.startBundle(oldCtx);
+      } catch (Throwable exc) {
+        throw UserCodeException.wrap(exc);
+      }
+    }
+
+    @Override
+    public void invokeFinishBundle(DoFn<InputT, OutputT>.Context c) {
+      OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
+      try {
+        fn.finishBundle(oldCtx);
+      } catch (Throwable exc) {
+        throw UserCodeException.wrap(exc);
+      }
+    }
+
+    @Override
+    public void invokeSetup() {
+      try {
+        fn.setup();
+      } catch (Throwable exc) {
+        throw UserCodeException.wrap(exc);
+      }
+    }
+
+    @Override
+    public void invokeTeardown() {
+      try {
+        fn.teardown();
+      } catch (Throwable exc) {
+        throw UserCodeException.wrap(exc);
+      }
+    }
+  }
+
   /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
   public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
       DoFn<InputT, OutputT> fn) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index e59cce8..97d810c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -18,13 +18,16 @@
 package org.apache.beam.sdk.transforms.reflect;
 
 import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowingInternals;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -43,6 +46,9 @@ public class DoFnInvokersTest {
   @Mock private BoundedWindow mockWindow;
   @Mock private DoFn.InputProvider<String> mockInputProvider;
   @Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
+  @Mock private WindowingInternals<String, String> mockWindowingInternals;
+
+  @Mock private OldDoFn<String, String> mockOldDoFn;
 
   private DoFn.ExtraContextFactory<String, String> extraContextFactory;
 
@@ -65,6 +71,11 @@ public class DoFnInvokersTest {
           public DoFn.OutputReceiver<String> outputReceiver() {
             return mockOutputReceiver;
           }
+
+          @Override
+          public WindowingInternals<String, String> windowingInternals() {
+            return mockWindowingInternals;
+          }
         };
   }
 
@@ -326,4 +337,39 @@ public class DoFnInvokersTest {
     thrown.expectMessage("bogus");
     invoker.invokeFinishBundle(null);
   }
+
+  /** Returns a new {@link DoFnInvoker} wrapping {@link #mockOldDoFn}. */
+  private DoFnInvoker<String, String> oldDoFnInvoker() {
+    return new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn);
+  }
+
+  @Test
+  public void testOldDoFnProcessElement() throws Exception {
+    oldDoFnInvoker().invokeProcessElement(mockContext, extraContextFactory);
+    verify(mockOldDoFn).processElement(any(OldDoFn.ProcessContext.class));
+  }
+
+  @Test
+  public void testOldDoFnStartBundle() throws Exception {
+    oldDoFnInvoker().invokeStartBundle(mockContext);
+    verify(mockOldDoFn).startBundle(any(OldDoFn.Context.class));
+  }
+
+  @Test
+  public void testOldDoFnFinishBundle() throws Exception {
+    oldDoFnInvoker().invokeFinishBundle(mockContext);
+    verify(mockOldDoFn).finishBundle(any(OldDoFn.Context.class));
+  }
+
+  @Test
+  public void testOldDoFnSetup() throws Exception {
+    oldDoFnInvoker().invokeSetup();
+    verify(mockOldDoFn).setup();
+  }
+
+  @Test
+  public void testOldDoFnTeardown() throws Exception {
+    oldDoFnInvoker().invokeTeardown();
+    verify(mockOldDoFn).teardown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
index a574ed8..80324b9 100644
--- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
@@ -220,5 +220,12 @@ public class DoFnInvokersBenchmark {
 
     @Override
     public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {}
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+        String name,
+        CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return null; // benchmark stub: no aggregator is provided
+    }
   }
 }


Mime
View raw message