beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/2] beam git commit: Use Batch Replacement in the Flink Runner
Date Mon, 03 Apr 2017 18:46:02 GMT
Use Batch Replacement in the Flink Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d174a241
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d174a241
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d174a241

Branch: refs/heads/master
Commit: d174a241f0da4e6a9271f75aeb0198d192dc3d46
Parents: 018513e
Author: Thomas Groh <tgroh@google.com>
Authored: Thu Mar 30 15:53:16 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Mon Apr 3 11:45:51 2017 -0700

----------------------------------------------------------------------
 .../flink/FlinkStreamingPipelineTranslator.java | 81 +++++++++++---------
 1 file changed, 43 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d174a241/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 0cedf66..8b5637e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.flink;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import java.util.Map;
@@ -27,13 +27,13 @@ import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.values.PCollection;
@@ -71,43 +71,48 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
   @Override
   public void translate(Pipeline pipeline) {
-    Map<PTransformMatcher, PTransformOverrideFactory> transformOverrides =
-        ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
-            .put(
-                PTransformMatchers.splittableParDoMulti(), new SplittableParDoOverrideFactory())
-            .put(
-                PTransformMatchers.classEqualTo(View.AsIterable.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner))
-            .put(
-                PTransformMatchers.classEqualTo(View.AsList.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner))
-            .put(
-                PTransformMatchers.classEqualTo(View.AsMap.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner))
-            .put(
-                PTransformMatchers.classEqualTo(View.AsMultimap.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner))
-            .put(
-                PTransformMatchers.classEqualTo(View.AsSingleton.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner))
+    List<PTransformOverride> transformOverrides =
+        ImmutableList.<PTransformOverride>builder()
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.splittableParDoMulti(),
+                    new SplittableParDoOverrideFactory()))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsIterable.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsList.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsMap.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner)))
             // this has to be last since the ViewAsSingleton override
             // can expand to a Combine.GloballyAsSingletonView
-            .put(
-                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
-                    flinkRunner))
-        .build();
-
-    for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
-        transformOverrides.entrySet()) {
-      pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
-    }
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
+                        flinkRunner)))
+            .build();
+
+    pipeline.replaceAll(transformOverrides);
     super.translate(pipeline);
   }
 
@@ -245,7 +250,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
    */
   static class SplittableParDoOverrideFactory<InputT, OutputT>
       implements PTransformOverrideFactory<
-      PCollection<? extends InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> {
+            PCollection<? extends InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> {
     @Override
     @SuppressWarnings("unchecked")
     public PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementTransform(


Mime
View raw message