beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] beam git commit: [BEAM-1855] Support Splittable DoFn on Flink Runner
Date Sat, 01 Apr 2017 08:12:18 GMT
Repository: beam
Updated Branches:
  refs/heads/master e31ca8b0d -> ea33e3373


[BEAM-1855] Support Splittable DoFn on Flink Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5824bb4b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5824bb4b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5824bb4b

Branch: refs/heads/master
Commit: 5824bb4b5700b5230f569c570d5e8ed2d11cedf2
Parents: e31ca8b
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Mar 13 21:23:14 2017 +0100
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Sat Apr 1 01:10:55 2017 -0700

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    |   4 +-
 .../flink/FlinkStreamingPipelineTranslator.java |  37 ++
 .../FlinkStreamingTransformTranslators.java     | 341 ++++++++++++++-----
 .../streaming/SplittableDoFnOperator.java       | 150 ++++++++
 .../beam/sdk/transforms/SplittableDoFnTest.java |   3 +-
 5 files changed, 448 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 3b35c8e..f26aeb0 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -88,9 +88,9 @@
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
                     org.apache.beam.sdk.testing.UsesSetState,
                     org.apache.beam.sdk.testing.UsesMapState,
-                    org.apache.beam.sdk.testing.UsesSplittableParDo,
                     org.apache.beam.sdk.testing.UsesAttemptedMetrics,
-                    org.apache.beam.sdk.testing.UsesCommittedMetrics
+                    org.apache.beam.sdk.testing.UsesCommittedMetrics,
+                    org.apache.beam.sdk.testing.UsesTestStream
                   </excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>

http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index d50d6bf..0cedf66 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -18,8 +18,12 @@
 package org.apache.beam.runners.flink;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -29,9 +33,13 @@ import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +74,8 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
     Map<PTransformMatcher, PTransformOverrideFactory> transformOverrides =
         ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
             .put(
+                PTransformMatchers.splittableParDoMulti(), new SplittableParDoOverrideFactory())
+            .put(
                 PTransformMatchers.classEqualTo(View.AsIterable.class),
                 new ReflectiveOneToOneOverrideFactory(
                     FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner))
@@ -228,4 +238,31 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
     }
   }
 
+  /**
+   * A {@link PTransformOverrideFactory} that overrides a
+   * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with
+   * {@link SplittableParDo}.
+   */
+  static class SplittableParDoOverrideFactory<InputT, OutputT>
+      implements PTransformOverrideFactory<
+      PCollection<? extends InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> {
+    @Override
+    @SuppressWarnings("unchecked")
+    public PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementTransform(
+        ParDo.MultiOutput<InputT, OutputT> transform) {
+      return new SplittableParDo(transform);
+    }
+
+    @Override
+    public PCollection<? extends InputT> getInput(
+        List<TaggedPValue> inputs, Pipeline p) {
+      return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs).getValue();
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        List<TaggedPValue> outputs, PCollectionTuple newOutput) {
+      return ReplacementOutputs.tagged(outputs, newOutput);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 00b0412..5c29db2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -29,6 +29,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.ElementAndRestriction;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
@@ -37,6 +40,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
 import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
@@ -45,6 +49,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Sink;
@@ -60,6 +65,7 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -122,6 +128,11 @@ class FlinkStreamingTransformTranslators {
     TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
 
     TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator());
+    TRANSLATORS.put(
+        SplittableParDo.ProcessElements.class, new SplittableProcessElementsStreamingTranslator());
+    TRANSLATORS.put(
+        SplittableParDo.GBKIntoKeyedWorkItems.class, new GBKIntoKeyedWorkItemsTranslator());
+
 
     TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator());
     TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
@@ -309,16 +320,6 @@ class FlinkStreamingTransformTranslators {
     }
   }
 
-  private static void rejectSplittable(DoFn<?, ?> doFn) {
-    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-    if (signature.processElement().isSplittable()) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s does not currently support splittable DoFn: %s",
-              FlinkRunner.class.getSimpleName(), doFn));
-    }
-  }
-
   /**
    * Wraps each element in a {@link RawUnionValue} with the given tag id.
    */
@@ -395,64 +396,77 @@ class FlinkStreamingTransformTranslators {
     return new Tuple2<>(intToViewMapping, sideInputUnion);
   }
 
+  /**
+   * Helper for translating {@link ParDo.MultiOutput} and {@link SplittableParDo.ProcessElements}.
+   */
+  static class ParDoTranslationHelper {
+
+    interface DoFnOperatorFactory<InputT, OutputT> {
+      DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
+          DoFn<InputT, OutputT> doFn,
+          List<PCollectionView<?>> sideInputs,
+          TupleTag<OutputT> mainOutputTag,
+          List<TupleTag<?>> sideOutputTags,
+          FlinkStreamingTranslationContext context,
+          WindowingStrategy<?, ?> windowingStrategy,
+          Map<TupleTag<?>, Integer> tagsToLabels,
+          Coder<WindowedValue<InputT>> inputCoder,
+          Coder keyCoder,
+          Map<Integer, PCollectionView<?>> transformedSideInputs);
+    }
 
-  private static class ParDoStreamingTranslator<InputT, OutputT>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      ParDo.MultiOutput<InputT, OutputT>> {
-
-    @Override
-    public void translateNode(
-        ParDo.MultiOutput<InputT, OutputT> transform,
-        FlinkStreamingTranslationContext context) {
-
-      DoFn<InputT, OutputT> doFn = transform.getFn();
-      rejectSplittable(doFn);
+    static <InputT, OutputT> void translateParDo(
+        String transformName,
+        DoFn<InputT, OutputT> doFn,
+        PCollection<InputT> input,
+        List<PCollectionView<?>> sideInputs,
+        List<TaggedPValue> outputs,
+        TupleTag<OutputT> mainOutputTag,
+        List<TupleTag<?>> sideOutputTags,
+        FlinkStreamingTranslationContext context,
+        DoFnOperatorFactory<InputT, OutputT> doFnOperatorFactory) {
 
       // we assume that the transformation does not change the windowing strategy.
-      WindowingStrategy<?, ?> windowingStrategy =
-          context.getInput(transform).getWindowingStrategy();
-
-      List<TaggedPValue> outputs = context.getOutputs(transform);
+      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
 
       Map<TupleTag<?>, Integer> tagsToLabels =
-          transformTupleTagsToLabels(transform.getMainOutputTag(), outputs);
-
-      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+          transformTupleTagsToLabels(mainOutputTag, outputs);
 
       SingleOutputStreamOperator<RawUnionValue> unionOutputStream;
 
-      @SuppressWarnings("unchecked")
-      PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform);
+      Coder<WindowedValue<InputT>> inputCoder = context.getCoder(input);
 
-      Coder<WindowedValue<InputT>> inputCoder = context.getCoder(inputPCollection);
+      DataStream<WindowedValue<InputT>> inputDataStream = context.getInputDataStream(input);
 
-      DataStream<WindowedValue<InputT>> inputDataStream =
-          context.getInputDataStream(context.getInput(transform));
       Coder keyCoder = null;
       boolean stateful = false;
-      DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+      DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
       if (signature.stateDeclarations().size() > 0
           || signature.timerDeclarations().size() > 0) {
         // Based on the fact that the signature is stateful, DoFnSignatures ensures
         // that it is also keyed
-        keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder();
+        keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
         inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder));
         stateful = true;
+      } else if (doFn instanceof SplittableParDo.ProcessFn) {
+        // we know that it is keyed on String
+        keyCoder = StringUtf8Coder.of();
+        stateful = true;
       }
 
       if (sideInputs.isEmpty()) {
         DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
-            new DoFnOperator<>(
-                transform.getFn(),
-                inputCoder,
-                transform.getMainOutputTag(),
-                transform.getSideOutputTags().getAll(),
-                new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+            doFnOperatorFactory.createDoFnOperator(
+                doFn,
+                sideInputs,
+                mainOutputTag,
+                sideOutputTags,
+                context,
                 windowingStrategy,
-                new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
-                Collections.<PCollectionView<?>>emptyList(), /* side inputs */
-                context.getPipelineOptions(),
-                keyCoder);
+                tagsToLabels,
+                inputCoder,
+                keyCoder,
+                new HashMap<Integer, PCollectionView<?>>() /* side-input mapping */);
 
         UnionCoder outputUnionCoder = createUnionCoder(outputs);
 
@@ -460,24 +474,24 @@ class FlinkStreamingTransformTranslators {
             new CoderTypeInformation<>(outputUnionCoder);
 
         unionOutputStream = inputDataStream
-            .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
+            .transform(transformName, outputUnionTypeInformation, doFnOperator);
 
       } else {
        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
             transformSideInputs(sideInputs, context);
 
         DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
-            new DoFnOperator<>(
-                transform.getFn(),
-                inputCoder,
-                transform.getMainOutputTag(),
-                transform.getSideOutputTags().getAll(),
-                new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
-                windowingStrategy,
-                transformedSideInputs.f0,
+            doFnOperatorFactory.createDoFnOperator(
+                doFn,
                 sideInputs,
-                context.getPipelineOptions(),
-                keyCoder);
+                mainOutputTag,
+                sideOutputTags,
+                context,
+                windowingStrategy,
+                tagsToLabels,
+                inputCoder,
+                keyCoder,
+                transformedSideInputs.f0);
 
         UnionCoder outputUnionCoder = createUnionCoder(outputs);
 
@@ -494,7 +508,7 @@ class FlinkStreamingTransformTranslators {
               WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation(
               keyedStream.getTransformation(),
               transformedSideInputs.f1.broadcast().getTransformation(),
-              transform.getName(),
+              transformName,
               (TwoInputStreamOperator) doFnOperator,
               outputUnionTypeInformation,
               keyedStream.getParallelism());
@@ -511,7 +525,7 @@ class FlinkStreamingTransformTranslators {
         } else {
           unionOutputStream = inputDataStream
               .connect(transformedSideInputs.f1.broadcast())
-              .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
+              .transform(transformName, outputUnionTypeInformation, doFnOperator);
         }
       }
 
@@ -541,7 +555,7 @@ class FlinkStreamingTransformTranslators {
       }
     }
 
-    private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
+    private static Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
         TupleTag<?> mainTag,
         List<TaggedPValue> allTaggedValues) {
 
@@ -556,7 +570,7 @@ class FlinkStreamingTransformTranslators {
       return tagToLabelMap;
     }
 
-    private UnionCoder createUnionCoder(Collection<TaggedPValue> taggedCollections) {
+    private static UnionCoder createUnionCoder(Collection<TaggedPValue> taggedCollections) {
       List<Coder<?>> outputCoders = Lists.newArrayList();
       for (TaggedPValue taggedColl : taggedCollections) {
         checkArgument(
@@ -575,6 +589,112 @@ class FlinkStreamingTransformTranslators {
     }
   }
 
+  private static class ParDoStreamingTranslator<InputT, OutputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      ParDo.MultiOutput<InputT, OutputT>> {
+
+    @Override
+    public void translateNode(
+        ParDo.MultiOutput<InputT, OutputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      ParDoTranslationHelper.translateParDo(
+          transform.getName(),
+          transform.getFn(),
+          (PCollection<InputT>) context.getInput(transform),
+          transform.getSideInputs(),
+          context.getOutputs(transform),
+          transform.getMainOutputTag(),
+          transform.getSideOutputTags().getAll(),
+          context,
+          new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
+            @Override
+            public DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
+                DoFn<InputT, OutputT> doFn,
+                List<PCollectionView<?>> sideInputs,
+                TupleTag<OutputT> mainOutputTag,
+                List<TupleTag<?>> sideOutputTags,
+                FlinkStreamingTranslationContext context,
+                WindowingStrategy<?, ?> windowingStrategy,
+                Map<TupleTag<?>, Integer> tagsToLabels,
+                Coder<WindowedValue<InputT>> inputCoder,
+                Coder keyCoder,
+                Map<Integer, PCollectionView<?>> transformedSideInputs) {
+              return new DoFnOperator<>(
+                  doFn,
+                  inputCoder,
+                  mainOutputTag,
+                  sideOutputTags,
+                  new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+                  windowingStrategy,
+                  transformedSideInputs,
+                  sideInputs,
+                  context.getPipelineOptions(),
+                  keyCoder);
+            }
+          });
+    }
+  }
+
+  private static class SplittableProcessElementsStreamingTranslator<
+      InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> {
+
+    @Override
+    public void translateNode(
+        SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      ParDoTranslationHelper.translateParDo(
+          transform.getName(),
+          transform.newProcessFn(transform.getFn()),
+          (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>)
+              context.getInput(transform),
+          transform.getSideInputs(),
+          context.getOutputs(transform),
+          transform.getMainOutputTag(),
+          transform.getSideOutputTags().getAll(),
+          context,
+          new ParDoTranslationHelper.DoFnOperatorFactory<
+              KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
+            @Override
+            public DoFnOperator<
+                KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
+                OutputT,
+                RawUnionValue> createDoFnOperator(
+                    DoFn<
+                        KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
+                        OutputT> doFn,
+                    List<PCollectionView<?>> sideInputs,
+                    TupleTag<OutputT> mainOutputTag,
+                    List<TupleTag<?>> sideOutputTags,
+                    FlinkStreamingTranslationContext context,
+                    WindowingStrategy<?, ?> windowingStrategy,
+                    Map<TupleTag<?>, Integer> tagsToLabels,
+                    Coder<
+                        WindowedValue<
+                            KeyedWorkItem<
+                                String,
+                                ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
+                    Coder keyCoder,
+                    Map<Integer, PCollectionView<?>> transformedSideInputs) {
+              return new SplittableDoFnOperator<>(
+                  doFn,
+                  inputCoder,
+                  mainOutputTag,
+                  sideOutputTags,
+                  new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+                  windowingStrategy,
+                  transformedSideInputs,
+                  sideInputs,
+                  context.getPipelineOptions(),
+                  keyCoder);
+            }
+          });
+    }
+  }
+
   private static class CreateViewStreamingTranslator<ElemT, ViewT>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
       FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> {
@@ -677,7 +797,7 @@ class FlinkStreamingTransformTranslators {
 
       DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
           inputDataStream
-              .flatMap(new CombinePerKeyTranslator.ToKeyedWorkItem<K, InputT>())
+              .flatMap(new ToKeyedWorkItem<K, InputT>())
               .returns(workItemTypeInfo).name("ToKeyedWorkItem");
 
       KeyedStream<
@@ -861,30 +981,56 @@ class FlinkStreamingTransformTranslators {
         context.setOutputDataStream(context.getOutput(transform), outDataStream);
       }
     }
+  }
 
-    private static class ToKeyedWorkItem<K, InputT>
-        extends RichFlatMapFunction<
-          WindowedValue<KV<K, InputT>>,
-          WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
-
-      @Override
-      public void flatMap(
-          WindowedValue<KV<K, InputT>> inWithMultipleWindows,
-          Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception {
-
-        // we need to wrap each one work item per window for now
-        // since otherwise the PushbackSideInputRunner will not correctly
-        // determine whether side inputs are ready
-        for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
-          SingletonKeyedWorkItem<K, InputT> workItem =
-              new SingletonKeyedWorkItem<>(
-                  in.getValue().getKey(),
-                  in.withValue(in.getValue().getValue()));
-
-          in.withValue(workItem);
-          out.collect(in.withValue(workItem));
-        }
-      }
+  private static class GBKIntoKeyedWorkItemsTranslator<K, InputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      SplittableParDo.GBKIntoKeyedWorkItems<K, InputT>> {
+
+    @Override
+    boolean canTranslate(
+        SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform,
+        FlinkStreamingTranslationContext context) {
+      return true;
+    }
+
+    @Override
+    public void translateNode(
+        SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+      KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
+
+      SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
+          inputKvCoder.getKeyCoder(),
+          inputKvCoder.getValueCoder(),
+          input.getWindowingStrategy().getWindowFn().windowCoder());
+
+
+      WindowedValue.
+          FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+          WindowedValue.getFullCoder(
+              workItemCoder,
+              input.getWindowingStrategy().getWindowFn().windowCoder());
+
+      CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
+          new CoderTypeInformation<>(windowedWorkItemCoder);
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
+
+      DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
+          inputDataStream
+              .flatMap(new ToKeyedWorkItem<K, InputT>())
+              .returns(workItemTypeInfo).name("ToKeyedWorkItem");
+
+      KeyedStream<
+          WindowedValue<
+              SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
+          .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
+
+      context.setOutputDataStream(context.getOutput(transform), keyedWorkItemStream);
     }
   }
 
@@ -931,4 +1077,31 @@ class FlinkStreamingTransformTranslators {
       }
     }
   }
+
+  private static class ToKeyedWorkItem<K, InputT>
+      extends RichFlatMapFunction<
+      WindowedValue<KV<K, InputT>>,
+      WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
+
+    @Override
+    public void flatMap(
+        WindowedValue<KV<K, InputT>> inWithMultipleWindows,
+        Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception {
+
+      // we need to wrap each one work item per window for now
+      // since otherwise the PushbackSideInputRunner will not correctly
+      // determine whether side inputs are ready
+      //
+      // this is tracked as https://issues.apache.org/jira/browse/BEAM-1850
+      for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
+        SingletonKeyedWorkItem<K, InputT> workItem =
+            new SingletonKeyedWorkItem<>(
+                in.getValue().getKey(),
+                in.withValue(in.getValue().getValue()));
+
+        out.collect(in.withValue(workItem));
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
new file mode 100644
index 0000000..0724ac2
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import org.apache.beam.runners.core.ElementAndRestriction;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Flink operator for executing splittable {@link DoFn DoFns}. Specifically, for executing
+ * the {@code @ProcessElement} method of a splittable {@link DoFn}.
+ */
+public class SplittableDoFnOperator<
+    InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+    extends DoFnOperator<
+    KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT,
OutputT> {
+
+  public SplittableDoFnOperator(
+      DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
FnOutputT> doFn,
+      Coder<
+          WindowedValue<
+              KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>>
inputCoder,
+      TupleTag<FnOutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      OutputManagerFactory<OutputT> outputManagerFactory,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<Integer, PCollectionView<?>> sideInputTagMapping,
+      Collection<PCollectionView<?>> sideInputs,
+      PipelineOptions options,
+      Coder<?> keyCoder) {
+    super(
+        doFn,
+        inputCoder,
+        mainOutputTag,
+        sideOutputTags,
+        outputManagerFactory,
+        windowingStrategy,
+        sideInputTagMapping,
+        sideInputs,
+        options,
+        keyCoder);
+
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+
+    checkState(doFn instanceof SplittableParDo.ProcessFn);
+
+    StateInternalsFactory<String> stateInternalsFactory = new StateInternalsFactory<String>()
{
+      @Override
+      public StateInternals<String> stateInternalsForKey(String key) {
+        //this will implicitly be keyed by the key of the incoming
+        // element or by the key of a firing timer
+        return (StateInternals<String>) stateInternals;
+      }
+    };
+    TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>()
{
+      @Override
+      public TimerInternals timerInternalsForKey(String key) {
+        //this will implicitly be keyed like the StateInternalsFactory
+        return timerInternals;
+      }
+    };
+
+    ((SplittableParDo.ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory);
+    ((SplittableParDo.ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory);
+    ((SplittableParDo.ProcessFn) doFn).setProcessElementInvoker(
+        new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
+            doFn,
+            serializedOptions.getPipelineOptions(),
+            new OutputWindowedValue<FnOutputT>() {
+              @Override
+              public void outputWindowedValue(
+                  FnOutputT output,
+                  Instant timestamp,
+                  Collection<? extends BoundedWindow> windows,
+                  PaneInfo pane) {
+                outputManager.output(
+                    mainOutputTag,
+                    WindowedValue.of(output, timestamp, windows, pane));
+              }
+
+              @Override
+              public <SideOutputT> void sideOutputWindowedValue(
+                  TupleTag<SideOutputT> tag,
+                  SideOutputT output,
+                  Instant timestamp,
+                  Collection<? extends BoundedWindow> windows,
+                  PaneInfo pane) {
+                outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
+              }
+            },
+            sideInputReader,
+            Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
+            10000,
+            Duration.standardSeconds(10)));
+  }
+
+  @Override
+  public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
+    pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+        KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
+            (String) stateInternals.getKey(),
+            Collections.singletonList(timer.getNamespace()))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index acd5584..d926f6c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesSplittableParDo;
+import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
@@ -292,7 +293,7 @@ public class SplittableDoFnTest {
   }
 
   @Test
-  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class, UsesTestStream.class})
   public void testLateData() throws Exception {
 
     Instant base = Instant.now();


Mime
View raw message