beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [5/7] beam git commit: Removes some OldDoFn code from DoFnRunners
Date Tue, 17 Jan 2017 17:53:11 GMT
Removes some OldDoFn code from DoFnRunners

DoFnRunners.createDefault() can be replaced with simpleRunner()
at the existing call sites, since it is never called with a
ReduceFnExecutor at those call sites.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b26ec89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b26ec89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b26ec89

Branch: refs/heads/master
Commit: 2b26ec8934725a600954ced9c4063766a582396a
Parents: 149d52b
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Thu Jan 12 13:10:40 2017 -0800
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Fri Jan 13 14:34:23 2017 -0800

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            |   2 +-
 .../apache/beam/runners/core/DoFnRunners.java   | 137 +------------------
 .../beam/runners/direct/ParDoEvaluator.java     |   9 +-
 .../wrappers/streaming/DoFnOperator.java        |   2 +-
 .../beam/runners/dataflow/util/DoFnInfo.java    |  62 ++++-----
 .../runners/spark/translation/DoFnFunction.java |  11 +-
 .../spark/translation/MultiDoFnFunction.java    |   9 +-
 .../sdk/transforms/reflect/DoFnInvokers.java    |  17 +--
 8 files changed, 55 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 1a3387c..de4c15d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -305,7 +305,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
       sideOutputPortMapping.put(sideOutputTags.get(i), port);
     }
 
-    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault(
+    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
         pipelineOptions.get(),
         doFn,
         sideInputReader,

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 820bfcd..2f3e93c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -18,9 +18,7 @@
 package org.apache.beam.runners.core;
 
 import java.util.List;
-import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -53,7 +51,7 @@ public class DoFnRunners {
    * compressed {@link WindowedValue}. It is the responsibility of the runner to perform any key
    * partitioning needed, etc.
    */
-  static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
       PipelineOptions options,
       DoFn<InputT, OutputT> fn,
       SideInputReader sideInputReader,
@@ -119,137 +117,4 @@ public class DoFnRunners {
         stepContext.timerInternals(),
         droppedDueToLatenessAggregator);
   }
-
-  /**
-   * Creates a {@link DoFnRunner} for the provided {@link DoFn}.
-   */
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
-      PipelineOptions options,
-      DoFn<InputT, OutputT> doFn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
-      WindowingStrategy<?, ?> windowingStrategy) {
-
-    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
-    // and window-exploded processing is achieved within the simple runner
-    return simpleRunner(
-        options,
-        doFn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        aggregatorFactory,
-        windowingStrategy);
-  }
-
-  /**
-   * Creates a {@link DoFnRunner} for the provided {@link OldDoFn}.
-   *
-   * <p>In particular, if the {@link OldDoFn} is a {@link ReduceFnExecutor}, a specialized
-   * implementation detail of streaming {@link GroupAlsoByWindow}, then it will create a special
-   * runner that operates on {@link KeyedWorkItem KeyedWorkItems}, drops late data and counts
-   * dropped elements.
-   *
-   * @deprecated please port uses of {@link OldDoFn} to use {@link DoFn}
-   */
-  @Deprecated
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
-      PipelineOptions options,
-      OldDoFn<InputT, OutputT> doFn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
-      WindowingStrategy<?, ?> windowingStrategy) {
-
-    DoFnRunner<InputT, OutputT> doFnRunner = simpleRunner(
-        options,
-        doFn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        aggregatorFactory,
-        windowingStrategy);
-
-    if (!(doFn instanceof ReduceFnExecutor)) {
-      return doFnRunner;
-    } else {
-      // When a DoFn is a ReduceFnExecutor, we know it has to have an aggregator for dropped
-      // elements and we also learn that for some K and V,
-      //   InputT = KeyedWorkItem<K, V>
-      //   OutputT = KV<K, V>
-
-      Aggregator<Long, Long> droppedDueToLatenessAggregator =
-          ((ReduceFnExecutor<?, ?, ?, ?>) doFn).getDroppedDueToLatenessAggregator();
-
-      @SuppressWarnings({"unchecked", "cast", "rawtypes"})
-      DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
-          (DoFnRunner) doFnRunner,
-          stepContext,
-          (WindowingStrategy) windowingStrategy,
-          droppedDueToLatenessAggregator);
-
-      return runner;
-    }
-  }
-
-  /**
-   * Creates the right kind of {@link DoFnRunner} for an object that can be either a {@link DoFn} or
-   * {@link OldDoFn}. This can be used so that the client need not explicitly reference either such
-   * class, but merely deserialize a payload and pass it to this method.
-   *
-   * @deprecated for migration purposes only for services where users may still submit either {@link
-   *     OldDoFn} or {@link DoFn}. If you know that you have a {@link DoFn} then you should use the
-   *     variant for that instead.
-   */
-  @Deprecated
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
-      PipelineOptions options,
-      Object deserializedFn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    if (deserializedFn instanceof DoFn) {
-      return createDefault(
-          options,
-          (DoFn) deserializedFn,
-          sideInputReader,
-          outputManager,
-          mainOutputTag,
-          sideOutputTags,
-          stepContext,
-          aggregatorFactory,
-          windowingStrategy);
-    } else if (deserializedFn instanceof OldDoFn) {
-      return createDefault(
-          options,
-          (OldDoFn) deserializedFn,
-          sideInputReader,
-          outputManager,
-          mainOutputTag,
-          sideOutputTags,
-          stepContext,
-          aggregatorFactory,
-          windowingStrategy);
-    } else {
-      throw new IllegalArgumentException(String.format("Cannot create %s for %s of class %s",
-          DoFnRunner.class.getSimpleName(),
-          deserializedFn,
-          deserializedFn.getClass()));
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index e146470..97d5360 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.direct;
 
 import com.google.common.collect.ImmutableList;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -30,6 +29,7 @@ import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
@@ -47,7 +47,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
       DirectStepContext stepContext,
       AppliedPTransform<?, ?, ?> application,
       WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy,
-      Serializable fn, // may be OldDoFn or DoFn
+      DoFn<InputT, OutputT> fn,
       StructuralKey<?> key,
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
@@ -72,8 +72,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
 
     ReadyCheckingSideInputReader sideInputReader =
         evaluationContext.createSideInputReader(sideInputs);
+
+    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
+    // and window-exploded processing is achieved within the simple runner
     DoFnRunner<InputT, OutputT> underlying =
-        DoFnRunners.createDefault(
+        DoFnRunners.simpleRunner(
             evaluationContext.getPipelineOptions(),
             fn,
             sideInputReader,

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 95f2bfd..90cdf4c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -244,7 +244,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       sideInputReader = sideInputHandler;
     }
 
-    DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.createDefault(
+    DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.simpleRunner(
         serializedOptions.getPipelineOptions(),
         oldDoFn,
         sideInputReader,

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index b84def8..0c5be90 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.dataflow.util;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import java.io.Serializable;
 import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
@@ -29,14 +27,13 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
- * Wrapper class holding the necessary information to serialize a {@link OldDoFn}
- * or {@link DoFn}.
+ * Wrapper class holding the necessary information to serialize a {@link DoFn}.
  *
- * @param <InputT> the type of the (main) input elements of the {@link OldDoFn}
- * @param <OutputT> the type of the (main) output elements of the {@link OldDoFn}
+ * @param <InputT> the type of the (main) input elements of the {@link DoFn}
+ * @param <OutputT> the type of the (main) output elements of the {@link DoFn}
  */
 public class DoFnInfo<InputT, OutputT> implements Serializable {
-  private final Serializable doFn;
+  private final DoFn<InputT, OutputT> doFn;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final Iterable<PCollectionView<?>> sideInputViews;
   private final Coder<InputT> inputCoder;
@@ -48,17 +45,37 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
    * {@link DoFn} or {@link OldDoFn} or other context-appropriate UDF blob.
    */
   public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn(
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Iterable<PCollectionView<?>> sideInputViews,
+      Coder<InputT> inputCoder,
+      long mainOutput,
+      Map<Long, TupleTag<?>> outputMap) {
+    return new DoFnInfo<>(
+        doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap);
+  }
+
+  /** TODO: remove this when Dataflow worker uses the DoFn overload. */
+  @Deprecated
+  @SuppressWarnings("unchecked")
+  public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn(
       Serializable doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Iterable<PCollectionView<?>> sideInputViews,
       Coder<InputT> inputCoder,
       long mainOutput,
       Map<Long, TupleTag<?>> outputMap) {
-    return new DoFnInfo(doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap);
+    return forFn(
+        (DoFn<InputT, OutputT>) doFn,
+        windowingStrategy,
+        sideInputViews,
+        inputCoder,
+        mainOutput,
+        outputMap);
   }
 
   private DoFnInfo(
-      Serializable doFn,
+      DoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Iterable<PCollectionView<?>> sideInputViews,
       Coder<InputT> inputCoder,
@@ -72,34 +89,15 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
     this.outputMap = outputMap;
   }
 
-  /**
-   * @deprecated use {@link #forFn}.
-   */
+  /** TODO: remove this when Dataflow worker uses {@link #getDoFn}. */
   @Deprecated
-  public DoFnInfo(
-      OldDoFn doFn,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Iterable<PCollectionView<?>> sideInputViews,
-      Coder<InputT> inputCoder,
-      long mainOutput,
-      Map<Long, TupleTag<?>> outputMap) {
-    this((Serializable) doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap);
-  }
-
-  /** Returns the embedded serialized function. It may be a {@code DoFn} or {@code OldDoFn}. */
   public Serializable getFn() {
     return doFn;
   }
 
-  /** @deprecated use {@link #getFn()} */
-  @Deprecated
-  public OldDoFn getDoFn() {
-    checkState(
-        doFn instanceof OldDoFn,
-        "Deprecated %s.getDoFn() called when the payload was actually a new %s",
-        DoFnInfo.class.getSimpleName(),
-        DoFn.class.getSimpleName());
-    return (OldDoFn) doFn;
+  /** Returns the embedded function. */
+  public DoFn<InputT, OutputT> getDoFn() {
+    return doFn;
   }
 
   public WindowingStrategy<?, ?> getWindowingStrategy() {

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index af8e089..bd6cfbe 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -80,18 +80,21 @@ public class DoFnFunction<InputT, OutputT>
       Iterator<WindowedValue<InputT>> iter) throws Exception {
 
     DoFnOutputManager outputManager = new DoFnOutputManager();
+
+    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
+    // and window-exploded processing is achieved within the simple runner
     DoFnRunner<InputT, OutputT> doFnRunner =
-        DoFnRunners.createDefault(
+        DoFnRunners.simpleRunner(
             runtimeContext.getPipelineOptions(),
             doFn,
             new SparkSideInputReader(sideInputs),
             outputManager,
-            new TupleTag<OutputT>() {},
+            new TupleTag<OutputT>() {
+            },
             Collections.<TupleTag<?>>emptyList(),
             new SparkProcessContext.NoOpStepContext(),
             new SparkAggregators.Factory(runtimeContext, accumulator),
-            windowingStrategy
-        );
+            windowingStrategy);
 
     return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 0f9417a..cceffc8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-
 import scala.Tuple2;
 
 
@@ -88,8 +87,11 @@ public class MultiDoFnFunction<InputT, OutputT>
       Iterator<WindowedValue<InputT>> iter) throws Exception {
 
     DoFnOutputManager outputManager = new DoFnOutputManager();
+
+    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
+    // and window-exploded processing is achieved within the simple runner
     DoFnRunner<InputT, OutputT> doFnRunner =
-        DoFnRunners.createDefault(
+        DoFnRunners.simpleRunner(
             runtimeContext.getPipelineOptions(),
             doFn,
             new SparkSideInputReader(sideInputs),
@@ -98,8 +100,7 @@ public class MultiDoFnFunction<InputT, OutputT>
             Collections.<TupleTag<?>>emptyList(),
             new SparkProcessContext.NoOpStepContext(),
             new SparkAggregators.Factory(runtimeContext, accumulator),
-            windowingStrategy
-        );
+            windowingStrategy);
 
     return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index b141d51..33c5a6a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -36,21 +36,12 @@ public class DoFnInvokers {
     return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn);
   }
 
-  /**
-   * Temporarily retained for compatibility with Dataflow worker.
-   * TODO: delete this when Dataflow worker is fixed to call {@link #invokerFor(DoFn)}.
-   *
-   * @deprecated Use {@link #invokerFor(DoFn)}.
-   */
-  @SuppressWarnings("unchecked")
+  /** TODO: remove this when Dataflow worker uses the DoFn overload. */
   @Deprecated
+  @SuppressWarnings({"unchecked"})
   public static <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(
-      Serializable deserializedFn) {
-    if (deserializedFn instanceof DoFn) {
-      return invokerFor((DoFn<InputT, OutputT>) deserializedFn);
-    }
-    throw new UnsupportedOperationException(
-        "Only DoFn supported, was: " + deserializedFn.getClass());
+      Serializable fn) {
+    return invokerFor((DoFn) fn);
   }
 
   private DoFnInvokers() {}


Mime
View raw message