beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From p..@apache.org
Subject [1/3] beam git commit: Flink runner: refactor the translator into two phases: rewriting and translating.
Date Tue, 20 Jun 2017 21:13:54 GMT
Repository: beam
Updated Branches:
  refs/heads/master e4ef23e16 -> 608a9c459


Flink runner: refactor the translator into two phases: rewriting and translating.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52794096
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52794096
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52794096

Branch: refs/heads/master
Commit: 52794096aa8b4d614423fd787835f5b89b1ea1ac
Parents: f69e3b5
Author: Pei He <pei@apache.org>
Authored: Mon Jun 19 16:10:02 2017 -0700
Committer: Pei He <pei@apache.org>
Committed: Tue Jun 20 14:12:13 2017 -0700

----------------------------------------------------------------------
 .../FlinkPipelineExecutionEnvironment.java      |  2 +
 .../flink/FlinkStreamingPipelineTranslator.java | 23 ---------
 .../runners/flink/FlinkTransformOverrides.java  | 53 ++++++++++++++++++++
 3 files changed, 55 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/52794096/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index fe5dd87..d2a2016 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -84,6 +84,8 @@ class FlinkPipelineExecutionEnvironment {
     this.flinkBatchEnv = null;
     this.flinkStreamEnv = null;
 
+    pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming()));
+
     PipelineTranslationOptimizer optimizer =
         new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/52794096/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index d768b01..27bb4ec 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -17,11 +17,7 @@
  */
 package org.apache.beam.runners.flink;
 
-import com.google.common.collect.ImmutableList;
-import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
-import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SplittableParDo;
@@ -29,12 +25,10 @@ import org.apache.beam.runners.core.construction.UnconsumedReads;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PValue;
@@ -70,25 +64,8 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
   @Override
   public void translate(Pipeline pipeline) {
-    List<PTransformOverride> transformOverrides =
-        ImmutableList.<PTransformOverride>builder()
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.splittableParDoMulti(),
-                    new SplittableParDoOverrideFactory()))
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
-                    new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(CreatePCollectionView.class),
-                    new CreateStreamingFlinkView.Factory()))
-            .build();
-
     // Ensure all outputs of all reads are consumed.
     UnconsumedReads.ensureAllReadsConsumed(pipeline);
-    pipeline.replaceAll(transformOverrides);
     super.translate(pipeline);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/52794096/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
new file mode 100644
index 0000000..1dc8de9
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+
+/**
+ * {@link PTransform} overrides for Flink runner.
+ */
+public class FlinkTransformOverrides {
+  public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
+    if (streaming) {
+      return ImmutableList.<PTransformOverride>builder()
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.splittableParDoMulti(),
+                  new FlinkStreamingPipelineTranslator.SplittableParDoOverrideFactory()))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
+                  new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+                  new CreateStreamingFlinkView.Factory()))
+          .build();
+    } else {
+      return ImmutableList.of();
+    }
+  }
+}


Mime
View raw message