beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: Key FlinkRunner streaming translation off URN
Date Thu, 14 Sep 2017 18:36:25 GMT
Repository: beam
Updated Branches:
  refs/heads/master fa4ecea26 -> e7601aac3


Key FlinkRunner streaming translation off URN


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/be9fb299
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/be9fb299
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/be9fb299

Branch: refs/heads/master
Commit: be9fb29901cf4a1ae7b4a9d8e9f25f4ea78359fd
Parents: fa4ecea
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Sep 5 07:47:14 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Sep 14 11:22:58 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformTranslation.java     |  27 ++-
 runners/flink/pom.xml                           |   5 +
 .../runners/flink/CreateStreamingFlinkView.java |   3 +
 .../FlinkStreamingTransformTranslators.java     | 205 +++++++++++++++++--
 4 files changed, 215 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/be9fb299/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 4bfe17a..41fac11 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -20,13 +20,17 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -56,6 +60,8 @@ public class PTransformTranslation {
   // Not strictly a primitive transform
   public static final String COMBINE_TRANSFORM_URN = "urn:beam:transform:combine:v1";
 
+  public static final String RESHUFFLE_URN = "urn:beam:transform:reshuffle:v1";
+
   // Less well-known. And where shall these live?
   public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1";
 
@@ -71,13 +77,26 @@ public class PTransformTranslation {
 
   private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
       loadTransformPayloadTranslators() {
-    ImmutableMap.Builder<Class<? extends PTransform>, TransformPayloadTranslator> builder =
-        ImmutableMap.builder();
+    HashMap<Class<? extends PTransform>, TransformPayloadTranslator> translators = new HashMap<>();
+
     for (TransformPayloadTranslatorRegistrar registrar :
         ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
-      builder.putAll(registrar.getTransformPayloadTranslators());
+
+      Map<Class<? extends PTransform>, TransformPayloadTranslator> newTranslators =
+          (Map) registrar.getTransformPayloadTranslators();
+
+      Set<Class<? extends PTransform>> alreadyRegistered = Sets.intersection(
+          translators.keySet(), newTranslators.keySet());
+
+      if (!alreadyRegistered.isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Classes already registered: %s", Joiner.on(", ").join(alreadyRegistered)));
+      }
+
+      translators.putAll(newTranslators);
     }
-    return builder.build();
+    return ImmutableMap.copyOf(translators);
   }
 
   private PTransformTranslation() {}

http://git-wip-us.apache.org/repos/asf/beam/blob/be9fb299/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 06746fd..0ef1931 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -219,6 +219,11 @@
     <!-- Beam -->
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-common-runner-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
       <exclusions>
         <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/be9fb299/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
index 3114a6f..ceecc1f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
@@ -40,6 +40,9 @@ class CreateStreamingFlinkView<ElemT, ViewT>
     extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
   private final PCollectionView<ViewT> view;
 
+  public static final String CREATE_STREAMING_FLINK_VIEW_URN =
+      "beam:transform:flink:create-streaming-flink-view:v1";
+
   public CreateStreamingFlinkView(PCollectionView<ViewT> view) {
     this.view = view;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/be9fb299/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 058e195..7cedb56 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -18,6 +18,10 @@
 
 package org.apache.beam.runners.flink;
 
+import static org.apache.beam.runners.core.construction.SplittableParDo.SPLITTABLE_PROCESS_URN;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -26,9 +30,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -45,7 +54,9 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -65,7 +76,9 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -101,39 +114,38 @@ class FlinkStreamingTransformTranslators {
   //  Transform Translator Registry
   // --------------------------------------------------------------------------------------------
 
+  /**
+   * A map from a Transform URN to the translator.
+   */
   @SuppressWarnings("rawtypes")
-  private static final Map<
-      Class<? extends PTransform>,
-      FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>();
+  private static final Map<String, FlinkStreamingPipelineTranslator.StreamTransformTranslator>
+      TRANSLATORS = new HashMap<>();
 
   // here you can find all the available translators.
   static {
-    TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
-    TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
+    TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslator());
 
-    TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator());
+    TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoStreamingTranslator());
     TRANSLATORS.put(
-        SplittableParDoViaKeyedWorkItems.ProcessElements.class,
-        new SplittableProcessElementsStreamingTranslator());
-    TRANSLATORS.put(
-        SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class,
-        new GBKIntoKeyedWorkItemsTranslator());
-
+        SPLITTABLE_PROCESS_URN, new SplittableProcessElementsStreamingTranslator());
+    TRANSLATORS.put(SplittableParDo.SPLITTABLE_GBKIKWI_URN, new GBKIntoKeyedWorkItemsTranslator());
 
-    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator());
-    TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
+    TRANSLATORS.put(PTransformTranslation.WINDOW_TRANSFORM_URN, new WindowAssignTranslator());
     TRANSLATORS.put(
-        CreateStreamingFlinkView.CreateFlinkPCollectionView.class,
+        PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslator());
+    TRANSLATORS.put(
+        CreateStreamingFlinkView.CREATE_STREAMING_FLINK_VIEW_URN,
         new CreateViewStreamingTranslator());
 
-    TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming());
-    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
-    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
+    TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorStreaming());
+    TRANSLATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslator());
+    TRANSLATORS.put(PTransformTranslation.COMBINE_TRANSFORM_URN, new CombinePerKeyTranslator());
   }
 
   public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
       PTransform<?, ?> transform) {
-    return TRANSLATORS.get(transform.getClass());
+    @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
+    return urn == null ? null : TRANSLATORS.get(urn);
   }
 
   // --------------------------------------------------------------------------------------------
@@ -215,6 +227,26 @@ class FlinkStreamingTransformTranslators {
 
   }
 
+  private static class ReadSourceTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+          PTransform<PBegin, PCollection<T>>> {
+
+    private final BoundedReadSourceTranslator<T> boundedTranslator =
+        new BoundedReadSourceTranslator<>();
+    private final UnboundedReadSourceTranslator<T> unboundedTranslator =
+        new UnboundedReadSourceTranslator<>();
+
+    @Override
+    void translateNode(
+        PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) {
+      if (context.getOutput(transform).isBounded().equals(PCollection.IsBounded.BOUNDED)) {
+        boundedTranslator.translateNode((Read.Bounded<T>) transform, context);
+      } else {
+        unboundedTranslator.translateNode((Read.Unbounded<T>) transform, context);
+      }
+    }
+  }
+
   private static class BoundedReadSourceTranslator<T>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
 
@@ -497,13 +529,15 @@ class FlinkStreamingTransformTranslators {
 
   private static class ParDoStreamingTranslator<InputT, OutputT>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      ParDo.MultiOutput<InputT, OutputT>> {
+          PTransform<PCollection<InputT>, PCollectionTuple>> {
 
     @Override
     public void translateNode(
-        ParDo.MultiOutput<InputT, OutputT> transform,
+        PTransform<PCollection<InputT>, PCollectionTuple> rawTransform,
         FlinkStreamingTranslationContext context) {
 
+      ParDo.MultiOutput<InputT, OutputT> transform = (ParDo.MultiOutput) rawTransform;
+
       ParDoTranslationHelper.translateParDo(
           transform.getName(),
           transform.getFn(),
@@ -1046,4 +1080,133 @@ class FlinkStreamingTransformTranslators {
     }
   }
 
+  /**
+   * A translator just to vend the URN. This will need to be moved to runners-core-construction-java
+   * once SDF is reorganized appropriately.
+   */
+  private static class SplittableParDoProcessElementsTranslator
+      implements PTransformTranslation.TransformPayloadTranslator<
+      SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> {
+
+    private SplittableParDoProcessElementsTranslator() {}
+
+    @Override
+    public String getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?> transform) {
+      return SPLITTABLE_PROCESS_URN;
+    }
+
+    @Override
+    public RunnerApi.FunctionSpec translate(
+        AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>>
+            transform,
+        SdkComponents components) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s should never be translated",
+              SplittableParDoViaKeyedWorkItems.ProcessElements.class.getCanonicalName()));
+    }
+  }
+
+  /** Registers classes specialized to the Flink runner. */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class FlinkTransformsRegistrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    public Map<
+        ? extends Class<? extends PTransform>,
+        ? extends PTransformTranslation.TransformPayloadTranslator>
+    getTransformPayloadTranslators() {
+      return ImmutableMap
+          .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
+          .put(
+              CreateStreamingFlinkView.CreateFlinkPCollectionView.class,
+              new CreateStreamingFlinkViewPayloadTranslator())
+          .put(
+              SplittableParDoViaKeyedWorkItems.ProcessElements.class,
+              new SplittableParDoProcessElementsTranslator())
+          .put(
+              SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class,
+              new SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator())
+          .build();
+    }
+  }
+
+  /**
+   * A translator just to vend the URN. This will need to be moved to runners-core-construction-java
+   * once SDF is reorganized appropriately.
+   */
+  private static class SplittableParDoProcessElementsPayloadTranslator
+      implements PTransformTranslation.TransformPayloadTranslator<
+      SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> {
+
+    private SplittableParDoProcessElementsPayloadTranslator() {}
+
+    @Override
+    public String getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?> transform) {
+      return SplittableParDo.SPLITTABLE_PROCESS_URN;
+    }
+
+    @Override
+    public RunnerApi.FunctionSpec translate(
+        AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>>
+            transform,
+        SdkComponents components) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s should never be translated",
+              SplittableParDoViaKeyedWorkItems.ProcessElements.class.getCanonicalName()));
+    }
+  }
+
+  /**
+   * A translator just to vend the URN. This will need to be moved to runners-core-construction-java
+   * once SDF is reorganized appropriately.
+   */
+  private static class SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator
+      implements PTransformTranslation.TransformPayloadTranslator<
+      SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?>> {
+
+    private SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator() {}
+
+    @Override
+    public String getUrn(SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?> transform) {
+      return SplittableParDo.SPLITTABLE_GBKIKWI_URN;
+    }
+
+    @Override
+    public RunnerApi.FunctionSpec translate(
+        AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?>>
+            transform,
+        SdkComponents components) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s should never be translated",
+              SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class.getCanonicalName()));
+    }
+  }
+
+  /**
+   * A translator just to vend the URN.
+   */
+  private static class CreateStreamingFlinkViewPayloadTranslator
+      implements PTransformTranslation.TransformPayloadTranslator<
+          CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?>> {
+
+    private CreateStreamingFlinkViewPayloadTranslator() {}
+
+    @Override
+    public String getUrn(CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?> transform) {
+      return CreateStreamingFlinkView.CREATE_STREAMING_FLINK_VIEW_URN;
+    }
+
+    @Override
+    public RunnerApi.FunctionSpec translate(
+        AppliedPTransform<?, ?, CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?>>
+            transform,
+        SdkComponents components) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s should never be translated",
+              CreateStreamingFlinkView.CreateFlinkPCollectionView.class.getCanonicalName()));
+    }
+  }
 }


Mime
View raw message