beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/7] incubator-beam git commit: Add access to original DoFn on ParDo
Date Fri, 21 Oct 2016 02:53:09 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 465c7d1f1 -> 153351126


Add access to original DoFn on ParDo


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0db32631
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0db32631
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0db32631

Branch: refs/heads/master
Commit: 0db32631238b5ec5cacf91b8ecb81e8fda99963a
Parents: 44878e5
Author: Kenneth Knowles <klk@google.com>
Authored: Wed Oct 19 20:47:26 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Oct 20 18:32:06 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     |   8 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   | 248 ++++++++++++-------
 2 files changed, 161 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0db32631/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index d5ecbc9..4eda376 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -79,7 +79,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
 
   private final TupleTag<OutputT> mainOutputTag;
 
-  private final boolean ignoresWindow;
+  private final boolean observesWindow;
 
   public SimpleDoFnRunner(
       PipelineOptions options,
@@ -92,8 +92,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       AggregatorFactory aggregatorFactory,
       WindowingStrategy<?, ?> windowingStrategy) {
     this.fn = fn;
-    this.ignoresWindow =
-        !DoFnSignatures.INSTANCE.getSignature(fn.getClass()).processElement().usesSingleWindow();
+    this.observesWindow =
+        DoFnSignatures.INSTANCE.getSignature(fn.getClass()).processElement().observesWindow();
     this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
     this.outputManager = outputManager;
     this.mainOutputTag = mainOutputTag;
@@ -123,7 +123,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
 
   @Override
   public void processElement(WindowedValue<InputT> compressedElem) {
-    if (ignoresWindow) {
+    if (observesWindow) {
       invokeProcessElement(compressedElem);
     } else {
       for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0db32631/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 93eb1ac..a3a306a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -27,8 +27,10 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -521,7 +523,7 @@ public class ParDo {
    */
   public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
     validate(fn);
-    return of(adapt(fn), displayDataForFn(fn));
+    return new Unbound().of(fn, displayDataForFn(fn));
   }
 
   /**
@@ -538,17 +540,11 @@ public class ParDo {
    */
   @Deprecated
   public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
-    return of(fn, displayDataForFn(fn));
-  }
-
-  private static <InputT, OutputT> Bound<InputT, OutputT> of(
-          OldDoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
-    return new Unbound().of(fn, fnDisplayData);
+    return new Unbound().of(fn, displayDataForFn(fn));
   }
 
   private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) {
-    return DisplayData.item("fn", fn.getClass())
-        .withLabel("Transform Function");
+    return DisplayData.item("fn", fn.getClass()).withLabel("Transform Function");
   }
 
   /**
@@ -647,21 +643,6 @@ public class ParDo {
     }
 
     /**
-     * Returns a new multi-output {@link ParDo} transform that's like
-     * this transform but with the specified main and side output
-     * tags. Does not modify this transform. The resulting transform
-     * is still incomplete.
-     *
-     * <p>See the discussion of Side Outputs above and on
-     * {@link ParDo#withOutputTags} for more explanation.
-     */
-    public <OutputT> UnboundMulti<OutputT> withOutputTags(TupleTag<OutputT> mainOutputTag,
-                                              TupleTagList sideOutputTags) {
-      return new UnboundMulti<>(
-          name, sideInputs, mainOutputTag, sideOutputTags);
-    }
-
-    /**
      * Returns a new {@link ParDo} {@link PTransform} that's like this
      * transform but which will invoke the given {@link DoFn}
      * function, and which has its input and output types bound. Does
@@ -671,7 +652,20 @@ public class ParDo {
      */
     public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
       validate(fn);
-      return of(adapt(fn), displayDataForFn(fn));
+      return of(fn, displayDataForFn(fn));
+    }
+
+    /**
+     * Returns a new multi-output {@link ParDo} transform that's like this transform but with the
+     * specified main and side output tags. Does not modify this transform. The resulting transform
+     * is still incomplete.
+     *
+     * <p>See the discussion of Side Outputs above and on {@link ParDo#withOutputTags} for more
+     * explanation.
+     */
+    public <OutputT> UnboundMulti<OutputT> withOutputTags(
+        TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) {
+      return new UnboundMulti<>(name, sideInputs, mainOutputTag, sideOutputTags);
     }
 
     /**
@@ -685,13 +679,13 @@ public class ParDo {
      * @deprecated please port your {@link OldDoFn} to a {@link DoFn}
      */
     @Deprecated
-    public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
-      return of(fn, displayDataForFn(fn));
+    public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> oldFn) {
+      return of(oldFn, displayDataForFn(oldFn));
     }
 
     private <InputT, OutputT> Bound<InputT, OutputT> of(
-        OldDoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
-      return new Bound<>(name, sideInputs, fn, fnDisplayData);
+        Serializable originalFn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+      return new Bound<>(name, originalFn, sideInputs, fnDisplayData);
     }
   }
 
@@ -711,17 +705,18 @@ public class ParDo {
       extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
     // Inherits name.
     private final List<PCollectionView<?>> sideInputs;
-    private final OldDoFn<InputT, OutputT> fn;
+    private final Serializable fn;
     private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
 
-    Bound(String name,
-          List<PCollectionView<?>> sideInputs,
-          OldDoFn<InputT, OutputT> fn,
-          DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+    Bound(
+        String name,
+        Serializable fn,
+        List<PCollectionView<?>> sideInputs,
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       super(name);
-      this.sideInputs = sideInputs;
       this.fn = SerializableUtils.clone(fn);
       this.fnDisplayData = fnDisplayData;
+      this.sideInputs = sideInputs;
     }
 
     /**
@@ -746,35 +741,38 @@ public class ParDo {
      */
     public Bound<InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
-      ImmutableList.Builder<PCollectionView<?>> builder = ImmutableList.builder();
-      builder.addAll(this.sideInputs);
-      builder.addAll(sideInputs);
-      return new Bound<>(name, builder.build(), fn, fnDisplayData);
+      return new Bound<>(
+          name,
+          fn,
+          ImmutableList.<PCollectionView<?>>builder()
+              .addAll(this.sideInputs)
+              .addAll(sideInputs)
+              .build(),
+          fnDisplayData);
     }
 
     /**
-     * Returns a new multi-output {@link ParDo} {@link PTransform}
-     * that's like this {@link PTransform} but with the specified main
-     * and side output tags. Does not modify this {@link PTransform}.
+     * Returns a new multi-output {@link ParDo} {@link PTransform} that's like this {@link
+     * PTransform} but with the specified main and side output tags. Does not modify this {@link
+     * PTransform}.
      *
-     * <p>See the discussion of Side Outputs above and on
-     * {@link ParDo#withOutputTags} for more explanation.
+     * <p>See the discussion of Side Outputs above and on {@link ParDo#withOutputTags} for more
+     * explanation.
      */
-    public BoundMulti<InputT, OutputT> withOutputTags(TupleTag<OutputT> mainOutputTag,
-                                           TupleTagList sideOutputTags) {
-      return new BoundMulti<>(
-          name, sideInputs, mainOutputTag, sideOutputTags, fn, fnDisplayData);
+    public BoundMulti<InputT, OutputT> withOutputTags(
+        TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) {
+      return new BoundMulti<>(name, fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData);
     }
 
     @Override
     public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
       checkArgument(
-          !isSplittable(fn), "Splittable DoFn not supported by the current runner");
+          !isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner");
       return PCollection.<OutputT>createPrimitiveOutputInternal(
               input.getPipeline(),
               input.getWindowingStrategy(),
               input.isBounded())
-          .setTypeDescriptorInternal(fn.getOutputTypeDescriptor());
+          .setTypeDescriptorInternal(getOldFn().getOutputTypeDescriptor());
     }
 
     @Override
@@ -782,14 +780,14 @@ public class ParDo {
     protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends InputT> input)
         throws CannotProvideCoderException {
       return input.getPipeline().getCoderRegistry().getDefaultCoder(
-          fn.getOutputTypeDescriptor(),
-          fn.getInputTypeDescriptor(),
+          getOldFn().getOutputTypeDescriptor(),
+          getOldFn().getInputTypeDescriptor(),
           ((PCollection<InputT>) input).getCoder());
     }
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = DoFnAdapters.getDoFnClass(fn);
+      Class<?> clazz = DoFnAdapters.getDoFnClass(getOldFn());
       if (clazz.isAnonymousClass()) {
         return "AnonymousParDo";
       } else {
@@ -807,10 +805,47 @@ public class ParDo {
     @Override
     public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-      ParDo.populateDisplayData(builder, fn, fnDisplayData);
+      ParDo.populateDisplayData(builder, (HasDisplayData) fn, fnDisplayData);
     }
 
+    /**
+     * @deprecated this method to be converted to return {@link DoFn}. If you want to receive
+     * an {@link OldDoFn} you should (temporarily) use {@link #getOldFn}.
+     */
+    @Deprecated
     public OldDoFn<InputT, OutputT> getFn() {
+      return getOldFn();
+    }
+
+    /**
+     * @deprecated please migrate to {@link #getNewFn} until {@link #getFn} is migrated to return
+     * a {@link DoFn}.
+     */
+    @Deprecated
+    public OldDoFn<InputT, OutputT> getOldFn() {
+      if (fn instanceof OldDoFn) {
+        return (OldDoFn<InputT, OutputT>) fn;
+      } else {
+        return adapt((DoFn<InputT, OutputT>) fn);
+      }
+    }
+
+    public DoFn<InputT, OutputT> getNewFn() {
+      if (fn instanceof DoFn) {
+        return (DoFn<InputT, OutputT>) fn;
+      } else {
+        return ((OldDoFn<InputT, OutputT>) fn).toDoFn();
+      }
+    }
+
+    /**
+     * Returns the {@link OldDoFn} or {@link DoFn} used to create this transform.
+     *
+     * @deprecated for migration purposes only. There are some cases of {@link OldDoFn} that are not
+     *     fully supported by wrapping it into a {@link DoFn}, such as {@link RequiresWindowAccess}.
+     */
+    @Deprecated
+    public Object getOriginalFn() {
       return fn;
     }
 
@@ -870,12 +905,14 @@ public class ParDo {
      */
     public UnboundMulti<OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
-      ImmutableList.Builder<PCollectionView<?>> builder = ImmutableList.builder();
-      builder.addAll(this.sideInputs);
-      builder.addAll(sideInputs);
       return new UnboundMulti<>(
-          name, builder.build(),
-          mainOutputTag, sideOutputTags);
+          name,
+          ImmutableList.<PCollectionView<?>>builder()
+              .addAll(this.sideInputs)
+              .addAll(sideInputs)
+              .build(),
+          mainOutputTag,
+          sideOutputTags);
     }
 
     /**
@@ -888,7 +925,7 @@ public class ParDo {
      */
     public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
       validate(fn);
-      return of(adapt(fn), displayDataForFn(fn));
+      return of(fn, displayDataForFn(fn));
     }
 
     /**
@@ -906,10 +943,9 @@ public class ParDo {
       return of(fn, displayDataForFn(fn));
     }
 
-    private <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn,
-        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
-      return new BoundMulti<>(
-              name, sideInputs, mainOutputTag, sideOutputTags, fn, fnDisplayData);
+    private <InputT> BoundMulti<InputT, OutputT> of(
+        Serializable fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+      return new BoundMulti<>(name, fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData);
     }
   }
 
@@ -930,15 +966,16 @@ public class ParDo {
     private final List<PCollectionView<?>> sideInputs;
     private final TupleTag<OutputT> mainOutputTag;
     private final TupleTagList sideOutputTags;
-    private final OldDoFn<InputT, OutputT> fn;
     private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
+    private final Serializable fn;
 
-    BoundMulti(String name,
-               List<PCollectionView<?>> sideInputs,
-               TupleTag<OutputT> mainOutputTag,
-               TupleTagList sideOutputTags,
-               OldDoFn<InputT, OutputT> fn,
-               DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+    BoundMulti(
+        String name,
+        Serializable fn,
+        List<PCollectionView<?>> sideInputs,
+        TupleTag<OutputT> mainOutputTag,
+        TupleTagList sideOutputTags,
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       super(name);
       this.sideInputs = sideInputs;
       this.mainOutputTag = mainOutputTag;
@@ -961,28 +998,32 @@ public class ParDo {
     }
 
     /**
-     * Returns a new multi-output {@link ParDo} {@link PTransform}
-     * that's like this {@link PTransform} but with the specified additional side
-     * inputs. Does not modify this {@link PTransform}.
+     * Returns a new multi-output {@link ParDo} {@link PTransform} that's like this {@link
+     * PTransform} but with the specified additional side inputs. Does not modify this {@link
+     * PTransform}.
      *
-     * <p>See the discussion of Side Inputs above and on
-     * {@link ParDo#withSideInputs} for more explanation.
+     * <p>See the discussion of Side Inputs above and on {@link ParDo#withSideInputs} for more
+     * explanation.
      */
     public BoundMulti<InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
-      ImmutableList.Builder<PCollectionView<?>> builder = ImmutableList.builder();
-      builder.addAll(this.sideInputs);
-      builder.addAll(sideInputs);
       return new BoundMulti<>(
-          name, builder.build(),
-          mainOutputTag, sideOutputTags, fn, fnDisplayData);
+          name,
+          fn,
+          ImmutableList.<PCollectionView<?>>builder()
+              .addAll(this.sideInputs)
+              .addAll(sideInputs)
+              .build(),
+          mainOutputTag,
+          sideOutputTags,
+          fnDisplayData);
     }
 
 
     @Override
     public PCollectionTuple apply(PCollection<? extends InputT> input) {
       checkArgument(
-          !isSplittable(fn), "Splittable DoFn not supported by the current runner");
+          !isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner");
 
       PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
           input.getPipeline(),
@@ -993,7 +1034,7 @@ public class ParDo {
       // The fn will likely be an instance of an anonymous subclass
       // such as DoFn<Integer, String> { }, thus will have a high-fidelity
       // TypeDescriptor for the output type.
-      outputs.get(mainOutputTag).setTypeDescriptorInternal(fn.getOutputTypeDescriptor());
+      outputs.get(mainOutputTag).setTypeDescriptorInternal(getOldFn().getOutputTypeDescriptor());
 
       return outputs;
     }
@@ -1012,13 +1053,13 @@ public class ParDo {
       Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
       return input.getPipeline().getCoderRegistry().getDefaultCoder(
           output.getTypeDescriptor(),
-          fn.getInputTypeDescriptor(),
+          getOldFn().getInputTypeDescriptor(),
           inputCoder);
       }
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = DoFnAdapters.getDoFnClass(fn);
+      Class<?> clazz = DoFnAdapters.getDoFnClass(getOldFn());
       if (clazz.isAnonymousClass()) {
         return "AnonymousParMultiDo";
       } else {
@@ -1029,11 +1070,37 @@ public class ParDo {
     @Override
     public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-      ParDo.populateDisplayData(builder, fn, fnDisplayData);
+      ParDo.populateDisplayData(builder, (HasDisplayData) fn, fnDisplayData);
     }
 
+    /**
+     * @deprecated this method to be converted to return {@link DoFn}. If you want to receive
+     * an {@link OldDoFn} you should (temporarily) use {@link #getOldFn}.
+     */
+    @Deprecated
     public OldDoFn<InputT, OutputT> getFn() {
-      return fn;
+      return getOldFn();
+    }
+
+    /**
+     * @deprecated please migrate to {@link #getNewFn} until {@link #getFn} is migrated to
return
+     * a {@link DoFn}.
+     */
+    @Deprecated
+    public OldDoFn<InputT, OutputT> getOldFn() {
+      if (fn instanceof OldDoFn) {
+        return (OldDoFn<InputT, OutputT>) fn;
+      } else {
+        return adapt((DoFn<InputT, OutputT>) fn);
+      }
+    }
+
+    public DoFn<InputT, OutputT> getNewFn() {
+      if (fn instanceof DoFn) {
+        return (DoFn<InputT, OutputT>) fn;
+      } else {
+        return ((OldDoFn<InputT, OutputT>) fn).toDoFn();
+      }
     }
 
     public TupleTag<OutputT> getMainOutputTag() {
@@ -1050,11 +1117,10 @@ public class ParDo {
   }
 
   private static void populateDisplayData(
-      DisplayData.Builder builder, OldDoFn<?, ?> fn,
+      DisplayData.Builder builder,
+      HasDisplayData fn,
       DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
-    builder
-        .include("fn", fn)
-        .add(fnDisplayData);
+    builder.include("fn", fn).add(fnDisplayData);
   }
 
   private static boolean isSplittable(OldDoFn<?, ?> oldDoFn) {


Mime
View raw message