Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1461D200C8C for ; Tue, 6 Jun 2017 15:28:48 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 12E64160BC3; Tue, 6 Jun 2017 13:28:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8B5E2160BC6 for ; Tue, 6 Jun 2017 15:28:46 +0200 (CEST) Received: (qmail 37818 invoked by uid 500); 6 Jun 2017 13:28:45 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 37679 invoked by uid 99); 6 Jun 2017 13:28:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Jun 2017 13:28:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 93027E0202; Tue, 6 Jun 2017 13:28:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aljoscha@apache.org To: commits@beam.apache.org Date: Tue, 06 Jun 2017 13:28:46 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] beam git commit: [BEAM-1498] Use Flink-native side outputs archived-at: Tue, 06 Jun 2017 13:28:48 -0000 [BEAM-1498] Use Flink-native side outputs Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0601fd4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0601fd4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0601fd4 Branch: refs/heads/master Commit: b0601fd43e0929e8b925dbe566e564460f91d9fc Parents: 88f78fa Author: JingsongLi Authored: Sun Jun 4 21:56:10 2017 +0800 Committer: Aljoscha Krettek Committed: Tue Jun 6 14:33:36 2017 +0200 ---------------------------------------------------------------------- .../FlinkStreamingTransformTranslators.java | 145 ++++++------------- .../wrappers/streaming/DoFnOperator.java | 40 +++-- .../wrappers/streaming/WindowDoFnOperator.java | 4 +- .../beam/runners/flink/PipelineOptionsTest.java | 5 +- .../flink/streaming/DoFnOperatorTest.java | 65 +++++---- 5 files changed, 112 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 00e9934..d8c3049 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -18,9 +18,6 @@ package org.apache.beam.runners.flink; -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -29,7 +26,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; import org.apache.beam.runners.core.SystemReduceFn; @@ -84,16 +80,15 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; /** * This class contains all the mappings between Beam and Flink @@ -337,7 +332,7 @@ class FlinkStreamingTransformTranslators { static class ParDoTranslationHelper { interface DoFnOperatorFactory { - DoFnOperator createDoFnOperator( + DoFnOperator createDoFnOperator( DoFn doFn, String stepName, List> sideInputs, @@ -345,7 +340,7 @@ class FlinkStreamingTransformTranslators { List> additionalOutputTags, FlinkStreamingTranslationContext context, WindowingStrategy windowingStrategy, - Map, Integer> tagsToLabels, + Map, OutputTag>> tagsToLabels, Coder> inputCoder, Coder keyCoder, Map> transformedSideInputs); @@ -354,7 +349,6 @@ class FlinkStreamingTransformTranslators { static void translateParDo( String transformName, DoFn doFn, - String stepName, PCollection input, List> sideInputs, Map, PValue> outputs, @@ -366,10 +360,15 @@ class FlinkStreamingTransformTranslators { // we assume that the transformation does not change the windowing strategy. WindowingStrategy windowingStrategy = input.getWindowingStrategy(); - Map, Integer> tagsToLabels = - transformTupleTagsToLabels(mainOutputTag, outputs); + Map, OutputTag>> tagsToOutputTags = Maps.newHashMap(); + for (Map.Entry, PValue> entry : outputs.entrySet()) { + if (!tagsToOutputTags.containsKey(entry.getKey())) { + tagsToOutputTags.put(entry.getKey(), new OutputTag<>(entry.getKey().getId(), + (TypeInformation) context.getTypeInfo((PCollection) entry.getValue()))); + } + } - SingleOutputStreamOperator unionOutputStream; + SingleOutputStreamOperator> outputStream; Coder> inputCoder = context.getCoder(input); @@ -391,8 +390,12 @@ class FlinkStreamingTransformTranslators { stateful = true; } + CoderTypeInformation> outputTypeInformation = + new CoderTypeInformation<>( + context.getCoder((PCollection) outputs.get(mainOutputTag))); + if (sideInputs.isEmpty()) { - DoFnOperator doFnOperator = + DoFnOperator doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, context.getCurrentTransform().getFullName(), @@ -401,24 +404,19 @@ class FlinkStreamingTransformTranslators { additionalOutputTags, context, windowingStrategy, - tagsToLabels, + tagsToOutputTags, inputCoder, keyCoder, new HashMap>() /* side-input mapping */); - UnionCoder outputUnionCoder = createUnionCoder(outputs); - - CoderTypeInformation outputUnionTypeInformation = - new CoderTypeInformation<>(outputUnionCoder); - - unionOutputStream = inputDataStream - .transform(transformName, outputUnionTypeInformation, doFnOperator); + outputStream = inputDataStream + .transform(transformName, outputTypeInformation, doFnOperator); } else { Tuple2>, DataStream> transformedSideInputs = transformSideInputs(sideInputs, context); - DoFnOperator doFnOperator = + DoFnOperator doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, context.getCurrentTransform().getFullName(), @@ -427,16 +425,11 @@ class FlinkStreamingTransformTranslators { additionalOutputTags, context, windowingStrategy, - tagsToLabels, + tagsToOutputTags, inputCoder, keyCoder, transformedSideInputs.f0); - UnionCoder outputUnionCoder = createUnionCoder(outputs); - - CoderTypeInformation outputUnionTypeInformation = - new CoderTypeInformation<>(outputUnionCoder); - if (stateful) { // we have to manually contruct the two-input transform because we're not // allowed to have only one input keyed, normally. @@ -448,83 +441,35 @@ class FlinkStreamingTransformTranslators { keyedStream.getTransformation(), transformedSideInputs.f1.broadcast().getTransformation(), transformName, - (TwoInputStreamOperator) doFnOperator, - outputUnionTypeInformation, + doFnOperator, + outputTypeInformation, keyedStream.getParallelism()); rawFlinkTransform.setStateKeyType(keyedStream.getKeyType()); rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null); - unionOutputStream = new SingleOutputStreamOperator( - keyedStream.getExecutionEnvironment(), - rawFlinkTransform) {}; // we have to cheat around the ctor being protected + outputStream = new SingleOutputStreamOperator( + keyedStream.getExecutionEnvironment(), + rawFlinkTransform) { + }; // we have to cheat around the ctor being protected keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform); } else { - unionOutputStream = inputDataStream + outputStream = inputDataStream .connect(transformedSideInputs.f1.broadcast()) - .transform(transformName, outputUnionTypeInformation, doFnOperator); + .transform(transformName, outputTypeInformation, doFnOperator); } } - SplitStream splitStream = unionOutputStream - .split(new OutputSelector() { - @Override - public Iterable select(RawUnionValue value) { - return Collections.singletonList(Integer.toString(value.getUnionTag())); - } - }); - - for (Entry, PValue> output : outputs.entrySet()) { - final int outputTag = tagsToLabels.get(output.getKey()); - - TypeInformation outputTypeInfo = context.getTypeInfo((PCollection) output.getValue()); - - @SuppressWarnings("unchecked") - DataStream unwrapped = splitStream.select(String.valueOf(outputTag)) - .flatMap(new FlatMapFunction() { - @Override - public void flatMap(RawUnionValue value, Collector out) throws Exception { - out.collect(value.getValue()); - } - }).returns(outputTypeInfo); - - context.setOutputDataStream(output.getValue(), unwrapped); - } - } - - private static Map, Integer> transformTupleTagsToLabels( - TupleTag mainTag, - Map, PValue> allTaggedValues) { + context.setOutputDataStream(outputs.get(mainOutputTag), outputStream); - Map, Integer> tagToLabelMap = Maps.newHashMap(); - int count = 0; - tagToLabelMap.put(mainTag, count++); - for (TupleTag key : allTaggedValues.keySet()) { - if (!tagToLabelMap.containsKey(key)) { - tagToLabelMap.put(key, count++); + for (Map.Entry, PValue> entry : outputs.entrySet()) { + if (!entry.getKey().equals(mainOutputTag)) { + context.setOutputDataStream(entry.getValue(), + outputStream.getSideOutput(tagsToOutputTags.get(entry.getKey()))); } } - return tagToLabelMap; - } - - private static UnionCoder createUnionCoder(Map, PValue> taggedCollections) { - List> outputCoders = Lists.newArrayList(); - for (PValue taggedColl : taggedCollections.values()) { - checkArgument( - taggedColl instanceof PCollection, - "A Union Coder can only be created for a Collection of Tagged %s. Got %s", - PCollection.class.getSimpleName(), - taggedColl.getClass().getSimpleName()); - PCollection coll = (PCollection) taggedColl; - WindowedValue.FullWindowedValueCoder windowedValueCoder = - WindowedValue.getFullCoder( - coll.getCoder(), - coll.getWindowingStrategy().getWindowFn().windowCoder()); - outputCoders.add(windowedValueCoder); - } - return UnionCoder.of(outputCoders); } } @@ -540,7 +485,6 @@ class FlinkStreamingTransformTranslators { ParDoTranslationHelper.translateParDo( transform.getName(), transform.getFn(), - context.getCurrentTransform().getFullName(), (PCollection) context.getInput(transform), transform.getSideInputs(), context.getOutputs(transform), @@ -549,7 +493,7 @@ class FlinkStreamingTransformTranslators { context, new ParDoTranslationHelper.DoFnOperatorFactory() { @Override - public DoFnOperator createDoFnOperator( + public DoFnOperator createDoFnOperator( DoFn doFn, String stepName, List> sideInputs, @@ -557,7 +501,7 @@ class FlinkStreamingTransformTranslators { List> additionalOutputTags, FlinkStreamingTranslationContext context, WindowingStrategy windowingStrategy, - Map, Integer> tagsToLabels, + Map, OutputTag>> tagsToOutputTags, Coder> inputCoder, Coder keyCoder, Map> transformedSideInputs) { @@ -567,7 +511,7 @@ class FlinkStreamingTransformTranslators { inputCoder, mainOutputTag, additionalOutputTags, - new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), + new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags), windowingStrategy, transformedSideInputs, sideInputs, @@ -592,7 +536,6 @@ class FlinkStreamingTransformTranslators { ParDoTranslationHelper.translateParDo( transform.getName(), transform.newProcessFn(transform.getFn()), - context.getCurrentTransform().getFullName(), context.getInput(transform), transform.getSideInputs(), context.getOutputs(transform), @@ -604,8 +547,7 @@ class FlinkStreamingTransformTranslators { @Override public DoFnOperator< KeyedWorkItem>, - OutputT, - RawUnionValue> createDoFnOperator( + OutputT, OutputT> createDoFnOperator( DoFn< KeyedWorkItem>, OutputT> doFn, @@ -615,7 +557,7 @@ class FlinkStreamingTransformTranslators { List> additionalOutputTags, FlinkStreamingTranslationContext context, WindowingStrategy windowingStrategy, - Map, Integer> tagsToLabels, + Map, OutputTag>> tagsToOutputTags, Coder< WindowedValue< KeyedWorkItem< @@ -629,7 +571,7 @@ class FlinkStreamingTransformTranslators { inputCoder, mainOutputTag, additionalOutputTags, - new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), + new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags), windowingStrategy, transformedSideInputs, sideInputs, @@ -756,8 +698,7 @@ class FlinkStreamingTransformTranslators { TypeInformation>>> outputTypeInfo = context.getTypeInfo(context.getOutput(transform)); - DoFnOperator.DefaultOutputManagerFactory< - WindowedValue>>> outputManagerFactory = + DoFnOperator.DefaultOutputManagerFactory>> outputManagerFactory = new DoFnOperator.DefaultOutputManagerFactory<>(); WindowDoFnOperator> doFnOperator = @@ -868,7 +809,7 @@ class FlinkStreamingTransformTranslators { (Coder) windowedWorkItemCoder, new TupleTag>("main output"), Collections.>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory>>(), + new DoFnOperator.DefaultOutputManagerFactory>(), windowingStrategy, new HashMap>(), /* side-input mapping */ Collections.>emptyList(), /* side inputs */ @@ -894,7 +835,7 @@ class FlinkStreamingTransformTranslators { (Coder) windowedWorkItemCoder, new TupleTag>("main output"), Collections.>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory>>(), + new DoFnOperator.DefaultOutputManagerFactory>(), windowingStrategy, transformSideInputs.f0, sideInputs, http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 594fe0e..8c27ed9 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -87,6 +87,7 @@ import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; import org.joda.time.Instant; /** @@ -98,9 +99,9 @@ import org.joda.time.Instant; * type when we have additional tagged outputs */ public class DoFnOperator - extends AbstractStreamOperator - implements OneInputStreamOperator, OutputT>, - TwoInputStreamOperator, RawUnionValue, OutputT>, + extends AbstractStreamOperator> + implements OneInputStreamOperator, WindowedValue>, + TwoInputStreamOperator, RawUnionValue, WindowedValue>, KeyGroupCheckpointedOperator, Triggerable { protected DoFn doFn; @@ -662,7 +663,7 @@ public class DoFnOperator * a Flink {@link Output}. */ interface OutputManagerFactory extends Serializable { - DoFnRunners.OutputManager create(Output> output); + DoFnRunners.OutputManager create(Output>> output); } /** @@ -673,14 +674,15 @@ public class DoFnOperator public static class DefaultOutputManagerFactory implements OutputManagerFactory { @Override - public DoFnRunners.OutputManager create(final Output> output) { + public DoFnRunners.OutputManager create( + final Output>> output) { return new DoFnRunners.OutputManager() { @Override public void output(TupleTag tag, WindowedValue value) { // with tagged outputs we can't get around this because we don't // know our own output type... @SuppressWarnings("unchecked") - OutputT castValue = (OutputT) value; + WindowedValue castValue = (WindowedValue) value; output.collect(new StreamRecord<>(castValue)); } }; @@ -692,22 +694,34 @@ public class DoFnOperator * {@link DoFnRunners.OutputManager} that can write to multiple logical * outputs by unioning them in a {@link RawUnionValue}. */ - public static class MultiOutputOutputManagerFactory - implements OutputManagerFactory { + public static class MultiOutputOutputManagerFactory + implements OutputManagerFactory { - Map, Integer> mapping; + private TupleTag mainTag; + Map, OutputTag>> mapping; - public MultiOutputOutputManagerFactory(Map, Integer> mapping) { + public MultiOutputOutputManagerFactory( + TupleTag mainTag, + Map, OutputTag>> mapping) { + this.mainTag = mainTag; this.mapping = mapping; } @Override - public DoFnRunners.OutputManager create(final Output> output) { + public DoFnRunners.OutputManager create( + final Output>> output) { return new DoFnRunners.OutputManager() { @Override public void output(TupleTag tag, WindowedValue value) { - int intTag = mapping.get(tag); - output.collect(new StreamRecord<>(new RawUnionValue(intTag, value))); + if (tag.equals(mainTag)) { + @SuppressWarnings("unchecked") + WindowedValue outputValue = (WindowedValue) value; + output.collect(new StreamRecord<>(outputValue)); + } else { + @SuppressWarnings("unchecked") + OutputTag> outputTag = (OutputTag) mapping.get(tag); + output.>collect(outputTag, new StreamRecord<>(value)); + } } }; } http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index bf64ede..ea578b9 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -46,7 +46,7 @@ import org.apache.flink.streaming.api.operators.InternalTimer; * Flink operator for executing window {@link DoFn DoFns}. */ public class WindowDoFnOperator - extends DoFnOperator, KV, WindowedValue>> { + extends DoFnOperator, KV, KV> { private final SystemReduceFn systemReduceFn; @@ -56,7 +56,7 @@ public class WindowDoFnOperator Coder>> inputCoder, TupleTag> mainOutputTag, List> additionalOutputTags, - OutputManagerFactory>> outputManagerFactory, + OutputManagerFactory> outputManagerFactory, WindowingStrategy windowingStrategy, Map> sideInputTagMapping, Collection> sideInputs, http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 8382a2d..bc0b1c2 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -173,13 +173,12 @@ public class PipelineOptionsTest { final byte[] serialized = SerializationUtils.serialize(doFnOperator); @SuppressWarnings("unchecked") - DoFnOperator deserialized = - (DoFnOperator) SerializationUtils.deserialize(serialized); + DoFnOperator deserialized = SerializationUtils.deserialize(serialized); TypeInformation> typeInformation = TypeInformation.of( new TypeHint>() {}); - OneInputStreamOperatorTestHarness, Object> testHarness = + OneInputStreamOperatorTestHarness, WindowedValue> testHarness = new OneInputStreamOperatorTestHarness<>(deserialized, typeInformation.createSerializer(new ExecutionConfig())); http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 79bc0e0..132242e 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -65,6 +65,7 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; +import org.apache.flink.util.OutputTag; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; @@ -123,7 +124,7 @@ public class DoFnOperatorTest { PipelineOptionsFactory.as(FlinkPipelineOptions.class), null); - OneInputStreamOperatorTestHarness, String> testHarness = + OneInputStreamOperatorTestHarness, WindowedValue> testHarness = new OneInputStreamOperatorTestHarness<>(doFnOperator); testHarness.open(); @@ -147,26 +148,27 @@ public class DoFnOperatorTest { TupleTag mainOutput = new TupleTag<>("main-output"); TupleTag additionalOutput1 = new TupleTag<>("output-1"); TupleTag additionalOutput2 = new TupleTag<>("output-2"); - ImmutableMap, Integer> outputMapping = ImmutableMap., Integer>builder() - .put(mainOutput, 1) - .put(additionalOutput1, 2) - .put(additionalOutput2, 3) - .build(); + ImmutableMap, OutputTag> outputMapping = + ImmutableMap., OutputTag>builder() + .put(mainOutput, new OutputTag(mainOutput.getId()){}) + .put(additionalOutput1, new OutputTag(additionalOutput1.getId()){}) + .put(additionalOutput2, new OutputTag(additionalOutput2.getId()){}) + .build(); - DoFnOperator doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( new MultiOutputDoFn(additionalOutput1, additionalOutput2), "stepName", windowedValueCoder, mainOutput, ImmutableList.>of(additionalOutput1, additionalOutput2), - new DoFnOperator.MultiOutputOutputManagerFactory(outputMapping), + new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, outputMapping), WindowingStrategy.globalDefault(), new HashMap>(), /* side-input mapping */ Collections.>emptyList(), /* side inputs */ PipelineOptionsFactory.as(FlinkPipelineOptions.class), null); - OneInputStreamOperatorTestHarness, RawUnionValue> testHarness = + OneInputStreamOperatorTestHarness, WindowedValue> testHarness = new OneInputStreamOperatorTestHarness<>(doFnOperator); testHarness.open(); @@ -176,17 +178,26 @@ public class DoFnOperatorTest { testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("hello"))); assertThat( - this.stripStreamRecordFromRawUnion(testHarness.getOutput()), + this.stripStreamRecord(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("got: hello"))); + + assertThat( + this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput1))), contains( - new RawUnionValue(2, WindowedValue.valueInGlobalWindow("extra: one")), - new RawUnionValue(3, WindowedValue.valueInGlobalWindow("extra: two")), - new RawUnionValue(1, WindowedValue.valueInGlobalWindow("got: hello")), - new RawUnionValue(2, WindowedValue.valueInGlobalWindow("got: hello")), - new RawUnionValue(3, WindowedValue.valueInGlobalWindow("got: hello")))); + WindowedValue.valueInGlobalWindow("extra: one"), + WindowedValue.valueInGlobalWindow("got: hello"))); + + assertThat( + this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput2))), + contains( + WindowedValue.valueInGlobalWindow("extra: two"), + WindowedValue.valueInGlobalWindow("got: hello"))); testHarness.close(); } + @Test public void testLateDroppingForStatefulFn() throws Exception { @@ -212,13 +223,13 @@ public class DoFnOperatorTest { TupleTag outputTag = new TupleTag<>("main-output"); - DoFnOperator> doFnOperator = new DoFnOperator<>( + DoFnOperator doFnOperator = new DoFnOperator<>( fn, "stepName", windowedValueCoder, outputTag, Collections.>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory>(), + new DoFnOperator.DefaultOutputManagerFactory(), windowingStrategy, new HashMap>(), /* side-input mapping */ Collections.>emptyList(), /* side inputs */ @@ -325,14 +336,14 @@ public class DoFnOperatorTest { TupleTag> outputTag = new TupleTag<>("main-output"); DoFnOperator< - KV, KV, WindowedValue>> doFnOperator = + KV, KV, KV> doFnOperator = new DoFnOperator<>( fn, "stepName", windowedValueCoder, outputTag, Collections.>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory>>(), + new DoFnOperator.DefaultOutputManagerFactory>(), windowingStrategy, new HashMap>(), /* side-input mapping */ Collections.>emptyList(), /* side inputs */ @@ -435,8 +446,8 @@ public class DoFnOperatorTest { PipelineOptionsFactory.as(FlinkPipelineOptions.class), keyCoder); - TwoInputStreamOperatorTestHarness, RawUnionValue, String> testHarness = - new TwoInputStreamOperatorTestHarness<>(doFnOperator); + TwoInputStreamOperatorTestHarness, RawUnionValue, WindowedValue> + testHarness = new TwoInputStreamOperatorTestHarness<>(doFnOperator); if (keyed) { // we use a dummy key for the second input since it is considered to be broadcast @@ -527,19 +538,19 @@ public class DoFnOperatorTest { }); } - private Iterable stripStreamRecordFromRawUnion(Iterable input) { + private Iterable> stripStreamRecord(Iterable input) { return FluentIterable.from(input).filter(new Predicate() { @Override public boolean apply(@Nullable Object o) { - return o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue; + return o instanceof StreamRecord; } - }).transform(new Function() { + }).transform(new Function>() { @Nullable @Override @SuppressWarnings({"unchecked", "rawtypes"}) - public RawUnionValue apply(@Nullable Object o) { - if (o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue) { - return (RawUnionValue) ((StreamRecord) o).getValue(); + public WindowedValue apply(@Nullable Object o) { + if (o instanceof StreamRecord) { + return (WindowedValue) ((StreamRecord) o).getValue(); } throw new RuntimeException("unreachable"); }