beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] beam git commit: [BEAM-2862] Add PTransform overrides specific to execution of StatefulDoFns over the Fn API using the DataflowRunner.
Date Mon, 11 Sep 2017 22:00:36 GMT
Repository: beam
Updated Branches:
  refs/heads/master 5a995e6c3 -> 774b26819


[BEAM-2862] Add PTransform overrides specific to execution of StatefulDoFns over the Fn API
using the DataflowRunner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97806e3d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97806e3d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97806e3d

Branch: refs/heads/master
Commit: 97806e3d6442518e6b9bd9e094a3dd1081fef760
Parents: 5a995e6
Author: Luke Cwik <lcwik@google.com>
Authored: Fri Sep 8 18:26:46 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Mon Sep 11 15:00:03 2017 -0700

----------------------------------------------------------------------
 .../dataflow/BatchStatefulParDoOverrides.java   | 71 +++++++++++++++++---
 .../beam/runners/dataflow/DataflowRunner.java   |  4 +-
 .../BatchStatefulParDoOverridesTest.java        | 45 ++++++++++++-
 3 files changed, 106 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/97806e3d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
index 7309f61..d7e9d06 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
@@ -19,10 +19,12 @@ package org.apache.beam.runners.dataflow;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -52,6 +54,9 @@ import org.joda.time.Instant;
  * stateful {@link ParDo} using window-unaware {@link GroupByKeyAndSortValuesOnly} to linearize
  * processing per key.
  *
+ * <p>For the Fn API, the {@link PTransformOverrideFactory} is only required to perform
+ * per key grouping and expansion.
+ *
 * <p>This implementation relies on implementation details of the Dataflow runner, specifically
  * standard fusion behavior of {@link ParDo} tranforms following a {@link GroupByKey}.
  */
@@ -65,8 +70,8 @@ public class BatchStatefulParDoOverrides {
       PTransformOverrideFactory<
               PCollection<KV<K, InputT>>, PCollection<OutputT>,
               ParDo.SingleOutput<KV<K, InputT>, OutputT>>
-          singleOutputOverrideFactory() {
-    return new SingleOutputOverrideFactory<>();
+          singleOutputOverrideFactory(DataflowPipelineOptions options) {
+    return new SingleOutputOverrideFactory<>(isFnApi(options));
   }
 
   /**
@@ -77,8 +82,13 @@ public class BatchStatefulParDoOverrides {
       PTransformOverrideFactory<
               PCollection<KV<K, InputT>>, PCollectionTuple,
               ParDo.MultiOutput<KV<K, InputT>, OutputT>>
-          multiOutputOverrideFactory() {
-    return new MultiOutputOverrideFactory<>();
+          multiOutputOverrideFactory(DataflowPipelineOptions options) {
+    return new MultiOutputOverrideFactory<>(isFnApi(options));
+  }
+
+  private static boolean isFnApi(DataflowPipelineOptions options) {
+    List<String> experiments = options.getExperiments();
+    return experiments != null && experiments.contains("beam_fn_api");
   }
 
   private static class SingleOutputOverrideFactory<K, InputT, OutputT>
@@ -86,6 +96,11 @@ public class BatchStatefulParDoOverrides {
           PCollection<KV<K, InputT>>, PCollection<OutputT>,
           ParDo.SingleOutput<KV<K, InputT>, OutputT>> {
 
+    private final boolean isFnApi;
+    private SingleOutputOverrideFactory(boolean isFnApi) {
+      this.isFnApi = isFnApi;
+    }
+
     @Override
     public PTransformReplacement<PCollection<KV<K, InputT>>, PCollection<OutputT>>
         getReplacementTransform(
@@ -95,7 +110,7 @@ public class BatchStatefulParDoOverrides {
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
-          new StatefulSingleOutputParDo<>(transform.getTransform()));
+          new StatefulSingleOutputParDo<>(transform.getTransform(), isFnApi));
     }
 
     @Override
@@ -104,11 +119,15 @@ public class BatchStatefulParDoOverrides {
       return ReplacementOutputs.singleton(outputs, newOutput);
     }
   }
-
   private static class MultiOutputOverrideFactory<K, InputT, OutputT>
       implements PTransformOverrideFactory<
           PCollection<KV<K, InputT>>, PCollectionTuple, ParDo.MultiOutput<KV<K, InputT>, OutputT>> {
 
+    private final boolean isFnApi;
+    private MultiOutputOverrideFactory(boolean isFnApi) {
+      this.isFnApi = isFnApi;
+    }
+
     @Override
     public PTransformReplacement<PCollection<KV<K, InputT>>, PCollectionTuple>
         getReplacementTransform(
@@ -118,7 +137,7 @@ public class BatchStatefulParDoOverrides {
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
-          new StatefulMultiOutputParDo<>(transform.getTransform()));
+          new StatefulMultiOutputParDo<>(transform.getTransform(), isFnApi));
     }
 
     @Override
@@ -132,9 +151,12 @@ public class BatchStatefulParDoOverrides {
       extends PTransform<PCollection<KV<K, InputT>>, PCollection<OutputT>> {
 
     private final ParDo.SingleOutput<KV<K, InputT>, OutputT> originalParDo;
+    private final boolean isFnApi;
 
-    StatefulSingleOutputParDo(ParDo.SingleOutput<KV<K, InputT>, OutputT> originalParDo) {
+    StatefulSingleOutputParDo(ParDo.SingleOutput<KV<K, InputT>, OutputT> originalParDo,
+        boolean isFnApi) {
       this.originalParDo = originalParDo;
+      this.isFnApi = isFnApi;
     }
 
     ParDo.SingleOutput<KV<K, InputT>, OutputT> getOriginalParDo() {
@@ -148,6 +170,12 @@ public class BatchStatefulParDoOverrides {
       DataflowRunner.verifyStateSupported(fn);
       DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy());
 
+      if (isFnApi) {
+        return input.apply(GroupByKey.<K, InputT>create())
+            .apply(ParDo.of(new ExpandGbkFn<K, InputT>()))
+            .apply(originalParDo);
+      }
+
       PTransform<
               PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>,
               PCollection<OutputT>>
@@ -162,9 +190,12 @@ public class BatchStatefulParDoOverrides {
       extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
 
     private final ParDo.MultiOutput<KV<K, InputT>, OutputT> originalParDo;
+    private final boolean isFnApi;
 
-    StatefulMultiOutputParDo(ParDo.MultiOutput<KV<K, InputT>, OutputT> originalParDo) {
+    StatefulMultiOutputParDo(ParDo.MultiOutput<KV<K, InputT>, OutputT> originalParDo,
+        boolean isFnApi) {
       this.originalParDo = originalParDo;
+      this.isFnApi = isFnApi;
     }
 
     @Override
@@ -174,6 +205,12 @@ public class BatchStatefulParDoOverrides {
       DataflowRunner.verifyStateSupported(fn);
       DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy());
 
+      if (isFnApi) {
+        return input.apply(GroupByKey.<K, InputT>create())
+            .apply(ParDo.of(new ExpandGbkFn<K, InputT>()))
+            .apply(originalParDo);
+      }
+
       PTransform<
               PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>,
               PCollectionTuple>
@@ -247,6 +284,21 @@ public class BatchStatefulParDoOverrides {
   }
 
   /**
+   * A key preserving {@link DoFn} that expands the output of a GBK {@code KV<K, Iterable<V>>} into
+   * individual KVs.
+   */
+  static class ExpandGbkFn<K, V>
+      extends DoFn<KV<K, Iterable<V>>, KV<K, V>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      K k = c.element().getKey();
+      for (V v : c.element().getValue()) {
+        c.output(KV.of(k, v));
+      }
+    }
+  }
+
+  /**
    * A key-preserving {@link DoFn} that explodes an iterable that has been grouped by key and
    * window.
    */
@@ -287,3 +339,4 @@ public class BatchStatefulParDoOverrides {
         ParDo.class.getSimpleName());
   }
 }
+

http://git-wip-us.apache.org/repos/asf/beam/blob/97806e3d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index db861d4..422fd11 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -379,11 +379,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           .add(
               PTransformOverride.of(
                   PTransformMatchers.stateOrTimerParDoMulti(),
-                  BatchStatefulParDoOverrides.multiOutputOverrideFactory()))
+                  BatchStatefulParDoOverrides.multiOutputOverrideFactory(options)))
           .add(
               PTransformOverride.of(
                   PTransformMatchers.stateOrTimerParDoSingle(),
-                  BatchStatefulParDoOverrides.singleOutputOverrideFactory()))
+                  BatchStatefulParDoOverrides.singleOutputOverrideFactory(options)))
           .add(
               PTransformOverride.of(
                   PTransformMatchers.createViewWithViewFn(PCollectionViews.MapViewFn.class),

http://git-wip-us.apache.org/repos/asf/beam/blob/97806e3d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
index e62a8b8..db9b224 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -52,6 +52,7 @@ import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -77,17 +78,54 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
   }
 
   @Test
+  public void testFnApiSingleOutputOverrideNonCrashing() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions("--experiments=beam_fn_api");
+    options.setRunner(DataflowRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    DummyStatefulDoFn fn = new DummyStatefulDoFn();
+    pipeline.apply(Create.of(KV.of(1, 2))).apply(ParDo.of(fn));
+
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    runner.replaceTransforms(pipeline);
+    assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) fn));
+  }
+
+  @Test
   public void testMultiOutputOverrideNonCrashing() throws Exception {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setRunner(DataflowRunner.class);
     Pipeline pipeline = Pipeline.create(options);
 
     TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {};
+    TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
+
+    DummyStatefulDoFn fn = new DummyStatefulDoFn();
+    pipeline
+        .apply(Create.of(KV.of(1, 2)))
+        .apply(ParDo.of(fn).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    runner.replaceTransforms(pipeline);
+    assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) fn));
+  }
+
+  @Test
+  @Ignore("TODO: BEAM-2902 Add support for user state in a ParDo.Multi once PTransformMatcher "
+      + "exposes a way to know when the replacement is not required by checking that the "
+      + "preceding ParDos to a GBK are key preserving.")
+  public void testFnApiMultiOutputOverrideNonCrashing() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions("--experiments=beam_fn_api");
+    options.setRunner(DataflowRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {};
+    TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
 
     DummyStatefulDoFn fn = new DummyStatefulDoFn();
     pipeline
         .apply(Create.of(KV.of(1, 2)))
-        .apply(ParDo.of(fn).withOutputTags(mainOutputTag, TupleTagList.empty()));
+        .apply(ParDo.of(fn).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
 
     DataflowRunner runner = DataflowRunner.fromOptions(options);
     runner.replaceTransforms(pipeline);
@@ -146,7 +184,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
     }
   }
 
-  private static DataflowPipelineOptions buildPipelineOptions() throws IOException {
+  private static DataflowPipelineOptions buildPipelineOptions(String ... args) throws IOException {
     GcsUtil mockGcsUtil = mock(GcsUtil.class);
     when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
       @Override
@@ -156,7 +194,8 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
     });
     when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
 
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    DataflowPipelineOptions options =
+        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);
     options.setGcpCredential(new TestCredential());
     options.setJobName("some-job-name");


Mime
View raw message