Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 55327200C4F for ; Sat, 1 Apr 2017 10:12:26 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 53828160B9D; Sat, 1 Apr 2017 08:12:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A5BB5160B8D for ; Sat, 1 Apr 2017 10:12:24 +0200 (CEST) Received: (qmail 17946 invoked by uid 500); 1 Apr 2017 08:12:18 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 17926 invoked by uid 99); 1 Apr 2017 08:12:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 01 Apr 2017 08:12:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AA933DFEF3; Sat, 1 Apr 2017 08:12:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jkff@apache.org To: commits@beam.apache.org Date: Sat, 01 Apr 2017 08:12:18 -0000 Message-Id: <22a5dd89c6f34603b10c128a7a69888e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: [BEAM-1855] Support Splittable DoFn on Flink Runner archived-at: Sat, 01 Apr 2017 08:12:26 -0000 Repository: beam Updated Branches: refs/heads/master e31ca8b0d -> ea33e3373 [BEAM-1855] Support Splittable DoFn on Flink Runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5824bb4b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5824bb4b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5824bb4b Branch: refs/heads/master Commit: 5824bb4b5700b5230f569c570d5e8ed2d11cedf2 Parents: e31ca8b Author: Aljoscha Krettek Authored: Mon Mar 13 21:23:14 2017 +0100 Committer: Eugene Kirpichov Committed: Sat Apr 1 01:10:55 2017 -0700 ---------------------------------------------------------------------- runners/flink/runner/pom.xml | 4 +- .../flink/FlinkStreamingPipelineTranslator.java | 37 ++ .../FlinkStreamingTransformTranslators.java | 341 ++++++++++++++----- .../streaming/SplittableDoFnOperator.java | 150 ++++++++ .../beam/sdk/transforms/SplittableDoFnTest.java | 3 +- 5 files changed, 448 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/runners/flink/runner/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml index 3b35c8e..f26aeb0 100644 --- a/runners/flink/runner/pom.xml +++ b/runners/flink/runner/pom.xml @@ -88,9 +88,9 @@ org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, org.apache.beam.sdk.testing.UsesSetState, org.apache.beam.sdk.testing.UsesMapState, - org.apache.beam.sdk.testing.UsesSplittableParDo, org.apache.beam.sdk.testing.UsesAttemptedMetrics, - org.apache.beam.sdk.testing.UsesCommittedMetrics + org.apache.beam.sdk.testing.UsesCommittedMetrics, + org.apache.beam.sdk.testing.UsesTestStream none true http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index d50d6bf..0cedf66 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -18,8 +18,12 @@ package org.apache.beam.runners.flink; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.core.construction.PTransformMatchers; +import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; @@ -29,9 +33,13 @@ import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +74,8 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { Map transformOverrides = ImmutableMap.builder() .put( + PTransformMatchers.splittableParDoMulti(), new SplittableParDoOverrideFactory()) + .put( PTransformMatchers.classEqualTo(View.AsIterable.class), new ReflectiveOneToOneOverrideFactory( FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner)) @@ -228,4 +238,31 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { } } + /** + * A {@link PTransformOverrideFactory} that overrides a + * Splittable DoFn with + * {@link SplittableParDo}. + */ + static class SplittableParDoOverrideFactory + implements PTransformOverrideFactory< + PCollection, PCollectionTuple, ParDo.MultiOutput> { + @Override + @SuppressWarnings("unchecked") + public PTransform, PCollectionTuple> getReplacementTransform( + ParDo.MultiOutput transform) { + return new SplittableParDo(transform); + } + + @Override + public PCollection getInput( + List inputs, Pipeline p) { + return (PCollection) Iterables.getOnlyElement(inputs).getValue(); + } + + @Override + public Map mapOutputs( + List outputs, PCollectionTuple newOutput) { + return ReplacementOutputs.tagged(outputs, newOutput); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 00b0412..5c29db2 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -29,6 +29,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.ElementAndRestriction; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; @@ -37,6 +40,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem; import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder; +import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper; @@ -45,6 +49,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Sink; @@ -60,6 +65,7 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -122,6 +128,11 @@ class FlinkStreamingTransformTranslators { TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator()); + TRANSLATORS.put( + SplittableParDo.ProcessElements.class, new SplittableProcessElementsStreamingTranslator()); + TRANSLATORS.put( + SplittableParDo.GBKIntoKeyedWorkItems.class, new GBKIntoKeyedWorkItemsTranslator()); + TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator()); TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator()); @@ -309,16 +320,6 @@ class FlinkStreamingTransformTranslators { } } - private static void rejectSplittable(DoFn doFn) { - DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - if (signature.processElement().isSplittable()) { - throw new UnsupportedOperationException( - String.format( - "%s does not currently support splittable DoFn: %s", - FlinkRunner.class.getSimpleName(), doFn)); - } - } - /** * Wraps each element in a {@link RawUnionValue} with the given tag id. */ @@ -395,64 +396,77 @@ class FlinkStreamingTransformTranslators { return new Tuple2<>(intToViewMapping, sideInputUnion); } + /** + * Helper for translating {@link ParDo.MultiOutput} and {@link SplittableParDo.ProcessElements}. + */ + static class ParDoTranslationHelper { + + interface DoFnOperatorFactory { + DoFnOperator createDoFnOperator( + DoFn doFn, + List> sideInputs, + TupleTag mainOutputTag, + List> sideOutputTags, + FlinkStreamingTranslationContext context, + WindowingStrategy windowingStrategy, + Map, Integer> tagsToLabels, + Coder> inputCoder, + Coder keyCoder, + Map> transformedSideInputs); + } - private static class ParDoStreamingTranslator - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - ParDo.MultiOutput> { - - @Override - public void translateNode( - ParDo.MultiOutput transform, - FlinkStreamingTranslationContext context) { - - DoFn doFn = transform.getFn(); - rejectSplittable(doFn); + static void translateParDo( + String transformName, + DoFn doFn, + PCollection input, + List> sideInputs, + List outputs, + TupleTag mainOutputTag, + List> sideOutputTags, + FlinkStreamingTranslationContext context, + DoFnOperatorFactory doFnOperatorFactory) { // we assume that the transformation does not change the windowing strategy. - WindowingStrategy windowingStrategy = - context.getInput(transform).getWindowingStrategy(); - - List outputs = context.getOutputs(transform); + WindowingStrategy windowingStrategy = input.getWindowingStrategy(); Map, Integer> tagsToLabels = - transformTupleTagsToLabels(transform.getMainOutputTag(), outputs); - - List> sideInputs = transform.getSideInputs(); + transformTupleTagsToLabels(mainOutputTag, outputs); SingleOutputStreamOperator unionOutputStream; - @SuppressWarnings("unchecked") - PCollection inputPCollection = (PCollection) context.getInput(transform); + Coder> inputCoder = context.getCoder(input); - Coder> inputCoder = context.getCoder(inputPCollection); + DataStream> inputDataStream = context.getInputDataStream(input); - DataStream> inputDataStream = - context.getInputDataStream(context.getInput(transform)); Coder keyCoder = null; boolean stateful = false; - DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) { // Based on the fact that the signature is stateful, DoFnSignatures ensures // that it is also keyed - keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder(); + keyCoder = ((KvCoder) input.getCoder()).getKeyCoder(); inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder)); stateful = true; + } else if (doFn instanceof SplittableParDo.ProcessFn) { + // we know that it is keyed on String + keyCoder = StringUtf8Coder.of(); + stateful = true; } if (sideInputs.isEmpty()) { DoFnOperator doFnOperator = - new DoFnOperator<>( - transform.getFn(), - inputCoder, - transform.getMainOutputTag(), - transform.getSideOutputTags().getAll(), - new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), + doFnOperatorFactory.createDoFnOperator( + doFn, + sideInputs, + mainOutputTag, + sideOutputTags, + context, windowingStrategy, - new HashMap>(), /* side-input mapping */ - Collections.>emptyList(), /* side inputs */ - context.getPipelineOptions(), - keyCoder); + tagsToLabels, + inputCoder, + keyCoder, + new HashMap>() /* side-input mapping */); UnionCoder outputUnionCoder = createUnionCoder(outputs); @@ -460,24 +474,24 @@ class FlinkStreamingTransformTranslators { new CoderTypeInformation<>(outputUnionCoder); unionOutputStream = inputDataStream - .transform(transform.getName(), outputUnionTypeInformation, doFnOperator); + .transform(transformName, outputUnionTypeInformation, doFnOperator); } else { Tuple2>, DataStream> transformedSideInputs = transformSideInputs(sideInputs, context); DoFnOperator doFnOperator = - new DoFnOperator<>( - transform.getFn(), - inputCoder, - transform.getMainOutputTag(), - transform.getSideOutputTags().getAll(), - new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), - windowingStrategy, - transformedSideInputs.f0, + doFnOperatorFactory.createDoFnOperator( + doFn, sideInputs, - context.getPipelineOptions(), - keyCoder); + mainOutputTag, + sideOutputTags, + context, + windowingStrategy, + tagsToLabels, + inputCoder, + keyCoder, + transformedSideInputs.f0); UnionCoder outputUnionCoder = createUnionCoder(outputs); @@ -494,7 +508,7 @@ class FlinkStreamingTransformTranslators { WindowedValue> rawFlinkTransform = new TwoInputTransformation( keyedStream.getTransformation(), transformedSideInputs.f1.broadcast().getTransformation(), - transform.getName(), + transformName, (TwoInputStreamOperator) doFnOperator, outputUnionTypeInformation, keyedStream.getParallelism()); @@ -511,7 +525,7 @@ class FlinkStreamingTransformTranslators { } else { unionOutputStream = inputDataStream .connect(transformedSideInputs.f1.broadcast()) - .transform(transform.getName(), outputUnionTypeInformation, doFnOperator); + .transform(transformName, outputUnionTypeInformation, doFnOperator); } } @@ -541,7 +555,7 @@ class FlinkStreamingTransformTranslators { } } - private Map, Integer> transformTupleTagsToLabels( + private static Map, Integer> transformTupleTagsToLabels( TupleTag mainTag, List allTaggedValues) { @@ -556,7 +570,7 @@ class FlinkStreamingTransformTranslators { return tagToLabelMap; } - private UnionCoder createUnionCoder(Collection taggedCollections) { + private static UnionCoder createUnionCoder(Collection taggedCollections) { List> outputCoders = Lists.newArrayList(); for (TaggedPValue taggedColl : taggedCollections) { checkArgument( @@ -575,6 +589,112 @@ class FlinkStreamingTransformTranslators { } } + private static class ParDoStreamingTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + ParDo.MultiOutput> { + + @Override + public void translateNode( + ParDo.MultiOutput transform, + FlinkStreamingTranslationContext context) { + + ParDoTranslationHelper.translateParDo( + transform.getName(), + transform.getFn(), + (PCollection) context.getInput(transform), + transform.getSideInputs(), + context.getOutputs(transform), + transform.getMainOutputTag(), + transform.getSideOutputTags().getAll(), + context, + new ParDoTranslationHelper.DoFnOperatorFactory() { + @Override + public DoFnOperator createDoFnOperator( + DoFn doFn, + List> sideInputs, + TupleTag mainOutputTag, + List> sideOutputTags, + FlinkStreamingTranslationContext context, + WindowingStrategy windowingStrategy, + Map, Integer> tagsToLabels, + Coder> inputCoder, + Coder keyCoder, + Map> transformedSideInputs) { + return new DoFnOperator<>( + doFn, + inputCoder, + mainOutputTag, + sideOutputTags, + new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), + windowingStrategy, + transformedSideInputs, + sideInputs, + context.getPipelineOptions(), + keyCoder); + } + }); + } + } + + private static class SplittableProcessElementsStreamingTranslator< + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + SplittableParDo.ProcessElements> { + + @Override + public void translateNode( + SplittableParDo.ProcessElements transform, + FlinkStreamingTranslationContext context) { + + ParDoTranslationHelper.translateParDo( + transform.getName(), + transform.newProcessFn(transform.getFn()), + (PCollection>>) + context.getInput(transform), + transform.getSideInputs(), + context.getOutputs(transform), + transform.getMainOutputTag(), + transform.getSideOutputTags().getAll(), + context, + new ParDoTranslationHelper.DoFnOperatorFactory< + KeyedWorkItem>, OutputT>() { + @Override + public DoFnOperator< + KeyedWorkItem>, + OutputT, + RawUnionValue> createDoFnOperator( + DoFn< + KeyedWorkItem>, + OutputT> doFn, + List> sideInputs, + TupleTag mainOutputTag, + List> sideOutputTags, + FlinkStreamingTranslationContext context, + WindowingStrategy windowingStrategy, + Map, Integer> tagsToLabels, + Coder< + WindowedValue< + KeyedWorkItem< + String, + ElementAndRestriction>>> inputCoder, + Coder keyCoder, + Map> transformedSideInputs) { + return new SplittableDoFnOperator<>( + doFn, + inputCoder, + mainOutputTag, + sideOutputTags, + new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), + windowingStrategy, + transformedSideInputs, + sideInputs, + context.getPipelineOptions(), + keyCoder); + } + }); + } + } + private static class CreateViewStreamingTranslator extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< FlinkStreamingViewOverrides.CreateFlinkPCollectionView> { @@ -677,7 +797,7 @@ class FlinkStreamingTransformTranslators { DataStream>> workItemStream = inputDataStream - .flatMap(new CombinePerKeyTranslator.ToKeyedWorkItem()) + .flatMap(new ToKeyedWorkItem()) .returns(workItemTypeInfo).name("ToKeyedWorkItem"); KeyedStream< @@ -861,30 +981,56 @@ class FlinkStreamingTransformTranslators { context.setOutputDataStream(context.getOutput(transform), outDataStream); } } + } - private static class ToKeyedWorkItem - extends RichFlatMapFunction< - WindowedValue>, - WindowedValue>> { - - @Override - public void flatMap( - WindowedValue> inWithMultipleWindows, - Collector>> out) throws Exception { - - // we need to wrap each one work item per window for now - // since otherwise the PushbackSideInputRunner will not correctly - // determine whether side inputs are ready - for (WindowedValue> in : inWithMultipleWindows.explodeWindows()) { - SingletonKeyedWorkItem workItem = - new SingletonKeyedWorkItem<>( - in.getValue().getKey(), - in.withValue(in.getValue().getValue())); - - in.withValue(workItem); - out.collect(in.withValue(workItem)); - } - } + private static class GBKIntoKeyedWorkItemsTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + SplittableParDo.GBKIntoKeyedWorkItems> { + + @Override + boolean canTranslate( + SplittableParDo.GBKIntoKeyedWorkItems transform, + FlinkStreamingTranslationContext context) { + return true; + } + + @Override + public void translateNode( + SplittableParDo.GBKIntoKeyedWorkItems transform, + FlinkStreamingTranslationContext context) { + + PCollection> input = context.getInput(transform); + + KvCoder inputKvCoder = (KvCoder) input.getCoder(); + + SingletonKeyedWorkItemCoder workItemCoder = SingletonKeyedWorkItemCoder.of( + inputKvCoder.getKeyCoder(), + inputKvCoder.getValueCoder(), + input.getWindowingStrategy().getWindowFn().windowCoder()); + + + WindowedValue. + FullWindowedValueCoder> windowedWorkItemCoder = + WindowedValue.getFullCoder( + workItemCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + CoderTypeInformation>> workItemTypeInfo = + new CoderTypeInformation<>(windowedWorkItemCoder); + + DataStream>> inputDataStream = context.getInputDataStream(input); + + DataStream>> workItemStream = + inputDataStream + .flatMap(new ToKeyedWorkItem()) + .returns(workItemTypeInfo).name("ToKeyedWorkItem"); + + KeyedStream< + WindowedValue< + SingletonKeyedWorkItem>, ByteBuffer> keyedWorkItemStream = workItemStream + .keyBy(new WorkItemKeySelector(inputKvCoder.getKeyCoder())); + + context.setOutputDataStream(context.getOutput(transform), keyedWorkItemStream); } } @@ -931,4 +1077,31 @@ class FlinkStreamingTransformTranslators { } } } + + private static class ToKeyedWorkItem + extends RichFlatMapFunction< + WindowedValue>, + WindowedValue>> { + + @Override + public void flatMap( + WindowedValue> inWithMultipleWindows, + Collector>> out) throws Exception { + + // we need to wrap each one work item per window for now + // since otherwise the PushbackSideInputRunner will not correctly + // determine whether side inputs are ready + // + // this is tracked as https://issues.apache.org/jira/browse/BEAM-1850 + for (WindowedValue> in : inWithMultipleWindows.explodeWindows()) { + SingletonKeyedWorkItem workItem = + new SingletonKeyedWorkItem<>( + in.getValue().getKey(), + in.withValue(in.getValue().getValue())); + + out.collect(in.withValue(workItem)); + } + } + } + } http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java new file mode 100644 index 0000000..0724ac2 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import org.apache.beam.runners.core.ElementAndRestriction; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; +import org.apache.beam.runners.core.OutputWindowedValue; +import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsFactory; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.TimerInternalsFactory; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Flink operator for executing splittable {@link DoFn DoFns}. Specifically, for executing + * the {@code @ProcessElement} method of a splittable {@link DoFn}. + */ +public class SplittableDoFnOperator< + InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + extends DoFnOperator< + KeyedWorkItem>, FnOutputT, OutputT> { + + public SplittableDoFnOperator( + DoFn>, FnOutputT> doFn, + Coder< + WindowedValue< + KeyedWorkItem>>> inputCoder, + TupleTag mainOutputTag, + List> sideOutputTags, + OutputManagerFactory outputManagerFactory, + WindowingStrategy windowingStrategy, + Map> sideInputTagMapping, + Collection> sideInputs, + PipelineOptions options, + Coder keyCoder) { + super( + doFn, + inputCoder, + mainOutputTag, + sideOutputTags, + outputManagerFactory, + windowingStrategy, + sideInputTagMapping, + sideInputs, + options, + keyCoder); + + } + + @Override + public void open() throws Exception { + super.open(); + + checkState(doFn instanceof SplittableParDo.ProcessFn); + + StateInternalsFactory stateInternalsFactory = new StateInternalsFactory() { + @Override + public StateInternals stateInternalsForKey(String key) { + //this will implicitly be keyed by the key of the incoming + // element or by the key of a firing timer + return (StateInternals) stateInternals; + } + }; + TimerInternalsFactory timerInternalsFactory = new TimerInternalsFactory() { + @Override + public TimerInternals timerInternalsForKey(String key) { + //this will implicitly be keyed like the StateInternalsFactory + return timerInternals; + } + }; + + ((SplittableParDo.ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory); + ((SplittableParDo.ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory); + ((SplittableParDo.ProcessFn) doFn).setProcessElementInvoker( + new OutputAndTimeBoundedSplittableProcessElementInvoker<>( + doFn, + serializedOptions.getPipelineOptions(), + new OutputWindowedValue() { + @Override + public void outputWindowedValue( + FnOutputT output, + Instant timestamp, + Collection windows, + PaneInfo pane) { + outputManager.output( + mainOutputTag, + WindowedValue.of(output, timestamp, windows, pane)); + } + + @Override + public void sideOutputWindowedValue( + TupleTag tag, + SideOutputT output, + Instant timestamp, + Collection windows, + PaneInfo pane) { + outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane)); + } + }, + sideInputReader, + Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()), + 10000, + Duration.standardSeconds(10))); + } + + @Override + public void fireTimer(InternalTimer timer) { + pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow( + KeyedWorkItems.>timersWorkItem( + (String) stateInternals.getKey(), + Collections.singletonList(timer.getNamespace())))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index acd5584..d926f6c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testing.UsesSplittableParDo; +import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange; @@ -292,7 +293,7 @@ public class SplittableDoFnTest { } @Test - @Category({ValidatesRunner.class, UsesSplittableParDo.class}) + @Category({ValidatesRunner.class, UsesSplittableParDo.class, UsesTestStream.class}) public void testLateData() throws Exception { Instant base = Instant.now();