beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [39/50] incubator-beam git commit: Make DoFnInfo carry OldDoFn or DoFn
Date Fri, 28 Oct 2016 14:48:13 GMT
Make DoFnInfo carry OldDoFn or DoFn

This will allow consumers to prepare to accept
DoFn while still accepting existing jobs that
use OldDoFn. It is a move towards treating
the Fn itself as just a serialized blob.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/73db5608
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/73db5608
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/73db5608

Branch: refs/heads/apex-runner
Commit: 73db5608a58ff64a0b452140736a150f973986b8
Parents: 95bf7a8
Author: Kenneth Knowles <klk@google.com>
Authored: Mon Oct 24 15:11:12 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Oct 27 10:48:34 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/util/DoFnInfo.java    | 43 ++++++++++++++++++--
 1 file changed, 39 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/73db5608/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index b211c04..bfa12e2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -17,29 +17,38 @@
  */
 package org.apache.beam.runners.dataflow.util;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import java.io.Serializable;
 import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
- * Wrapper class holding the necessary information to serialize a {@link OldDoFn}.
+ * Wrapper class holding the necessary information to serialize a {@link OldDoFn}
+ * or {@link DoFn}.
  *
  * @param <InputT> the type of the (main) input elements of the {@link OldDoFn}
  * @param <OutputT> the type of the (main) output elements of the {@link OldDoFn}
  */
 public class DoFnInfo<InputT, OutputT> implements Serializable {
-  private final OldDoFn<InputT, OutputT> doFn;
+  private final Serializable doFn;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final Iterable<PCollectionView<?>> sideInputViews;
   private final Coder<InputT> inputCoder;
   private final long mainOutput;
   private final Map<Long, TupleTag<?>> outputMap;
 
-  public DoFnInfo(OldDoFn<InputT, OutputT> doFn,
+  /**
+   * Creates a {@link DoFnInfo} for the given {@link DoFn} or {@link OldDoFn} and auxiliary
bits and
+   * pieces.
+   */
+  public DoFnInfo(
+      Serializable doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Iterable<PCollectionView<?>> sideInputViews,
       Coder<InputT> inputCoder,
@@ -53,10 +62,36 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
     this.outputMap = outputMap;
   }
 
-  public OldDoFn<InputT, OutputT> getDoFn() {
+  /**
+   * @deprecated call the constructor with a {@link Serializable}
+   */
+  @Deprecated
+  public DoFnInfo(
+      OldDoFn doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Iterable<PCollectionView<?>> sideInputViews,
+      Coder<InputT> inputCoder,
+      long mainOutput,
+      Map<Long, TupleTag<?>> outputMap) {
+    this((Serializable) doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput,
outputMap);
+  }
+
+  /** Returns the embedded serialized function. It may be a {@code DoFn} or {@code OldDoFn}.
*/
+  public Serializable getFn() {
     return doFn;
   }
 
+  /** @deprecated use {@link #getFn()} */
+  @Deprecated
+  public OldDoFn getDoFn() {
+    checkState(
+        doFn instanceof OldDoFn,
+        "Deprecated %s.getDoFn() called when the payload was actually a new %s",
+        DoFnInfo.class.getSimpleName(),
+        DoFn.class.getSimpleName());
+    return (OldDoFn) doFn;
+  }
+
   public WindowingStrategy<?, ?> getWindowingStrategy() {
     return windowingStrategy;
   }


Mime
View raw message