From: kenn@apache.org
To: commits@beam.apache.org
Date: Tue, 25 Apr 2017 17:30:01 -0000
Message-Id: <41eea1fcacbf400aa6043a2d96d44eae@git.apache.org>
In-Reply-To: <2a7781abd4eb43c7936fde76997d1479@git.apache.org>
References: <2a7781abd4eb43c7936fde76997d1479@git.apache.org>
Subject: [06/50] [abbrv] beam git commit: Rename DoFn.Context#sideOutput to output
archived-at: Tue, 25 Apr 2017 17:30:00 -0000

Rename DoFn.Context#sideOutput to output

Having two methods, both named output, one which takes the "main output type" and one that takes a tag to specify the type more clearly 
communicates the actual behavior - sideOutput isn't a "special" way to output, it's the same as output(T), just to a specified PCollection. This will help pipeline authors understand the actual behavior of outputting to a tag, and detangle it from "sideInput", which is a special way to receive input. Giving them the same name means that it's not even strange to call output and provide the main output type, which is what we want - it's a more specific way to output, but does not have different restrictions or capabilities. Rename internal references to SideOutput, SideOutputT, etc to (largely) AdditionalOutput(T). Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/113471d6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/113471d6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/113471d6 Branch: refs/heads/jstorm-runner Commit: 113471d6457b4afa2523afc74b40be09935292d0 Parents: 89ff0b1 Author: Thomas Groh Authored: Mon Apr 10 17:14:15 2017 -0700 Committer: Thomas Groh Committed: Fri Apr 14 14:40:57 2017 -0700 ---------------------------------------------------------------------- .../apex/translation/ParDoTranslator.java | 6 +- .../operators/ApexGroupByKeyOperator.java | 19 +- .../operators/ApexParDoOperator.java | 48 +-- .../apex/translation/utils/NoOpStepContext.java | 2 +- .../apex/translation/ParDoTranslatorTest.java | 18 +- .../beam/runners/core/BaseExecutionContext.java | 13 +- .../apache/beam/runners/core/DoFnAdapters.java | 16 +- .../apache/beam/runners/core/DoFnRunners.java | 8 +- .../beam/runners/core/ExecutionContext.java | 13 +- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 6 +- .../org/apache/beam/runners/core/OldDoFn.java | 38 +-- ...eBoundedSplittableProcessElementInvoker.java | 8 +- .../beam/runners/core/OutputWindowedValue.java | 10 +- .../beam/runners/core/SimpleDoFnRunner.java | 54 ++-- .../beam/runners/core/SimpleOldDoFnRunner.java | 63 ++-- 
.../beam/runners/core/SplittableParDo.java | 21 +- .../beam/runners/core/WindowingInternals.java | 8 +- .../core/WindowingInternalsAdapters.java | 8 +- .../core/GroupAlsoByWindowsProperties.java | 10 +- .../apache/beam/runners/core/NoOpOldDoFn.java | 4 +- .../apache/beam/runners/core/OldDoFnTest.java | 4 +- ...ndedSplittableProcessElementInvokerTest.java | 6 +- .../beam/runners/core/ReduceFnTester.java | 8 +- .../runners/core/SimpleOldDoFnRunnerTest.java | 4 +- .../beam/runners/core/SplittableParDoTest.java | 8 +- .../GroupAlsoByWindowEvaluatorFactory.java | 8 +- .../beam/runners/direct/ParDoEvaluator.java | 4 +- .../runners/direct/ParDoEvaluatorFactory.java | 10 +- .../direct/ParDoMultiOverrideFactory.java | 2 +- ...littableProcessElementsEvaluatorFactory.java | 8 +- .../direct/StatefulParDoEvaluatorFactory.java | 2 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 6 +- .../FlinkStreamingTransformTranslators.java | 20 +- .../functions/FlinkDoFnFunction.java | 4 +- .../functions/FlinkNoOpStepContext.java | 2 +- .../functions/FlinkStatefulDoFnFunction.java | 4 +- .../wrappers/streaming/DoFnOperator.java | 14 +- .../streaming/SplittableDoFnOperator.java | 10 +- .../wrappers/streaming/WindowDoFnOperator.java | 4 +- .../flink/streaming/DoFnOperatorTest.java | 34 +-- .../dataflow/BatchStatefulParDoOverrides.java | 2 +- .../runners/dataflow/BatchViewOverrides.java | 6 +- .../dataflow/BatchViewOverridesTest.java | 4 +- .../SparkGroupAlsoByWindowViaWindowSet.java | 10 +- .../spark/translation/MultiDoFnFunction.java | 4 +- ...SparkGroupAlsoByWindowViaOutputBufferFn.java | 8 +- .../spark/translation/SparkProcessContext.java | 2 +- .../streaming/StreamingTransformTranslator.java | 2 +- .../org/apache/beam/sdk/transforms/Combine.java | 2 +- .../org/apache/beam/sdk/transforms/DoFn.java | 14 +- .../apache/beam/sdk/transforms/DoFnTester.java | 41 ++- .../org/apache/beam/sdk/transforms/ParDo.java | 77 +++-- .../apache/beam/sdk/transforms/Partition.java | 2 +- 
.../beam/sdk/values/PCollectionTuple.java | 3 +- .../org/apache/beam/sdk/values/TupleTag.java | 26 +- .../apache/beam/sdk/values/TupleTagList.java | 2 +- .../org/apache/beam/sdk/values/TypedPValue.java | 4 +- .../apache/beam/sdk/metrics/MetricsTest.java | 2 +- .../apache/beam/sdk/transforms/ParDoTest.java | 293 ++++++++++--------- .../beam/sdk/transforms/SplittableDoFnTest.java | 21 +- .../beam/sdk/values/PCollectionTupleTest.java | 8 +- .../apache/beam/sdk/values/TypedPValueTest.java | 46 +-- .../beam/fn/harness/fake/FakeStepContext.java | 2 +- .../control/ProcessBundleHandlerTest.java | 30 +- .../sdk/io/gcp/bigquery/WritePartition.java | 6 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 4 +- 66 files changed, 578 insertions(+), 578 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java index 9213c1f..2e3d902 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java @@ -94,7 +94,7 @@ class ParDoTranslator context.getPipelineOptions(), doFn, transform.getMainOutputTag(), - transform.getSideOutputTags().getAll(), + transform.getAdditionalOutputTags().getAll(), input.getWindowingStrategy(), sideInputs, wvInputCoder, @@ -114,9 +114,9 @@ class ParDoTranslator ports.put(pc, operator.output); } else { int portIndex = 0; - for (TupleTag tag : transform.getSideOutputTags().getAll()) { + for (TupleTag tag : transform.getAdditionalOutputTags().getAll()) { if (tag.equals(output.getKey())) { - ports.put(pc, 
operator.sideOutputPorts[portIndex]); + ports.put(pc, operator.additionalOutputPorts[portIndex]); break; } portIndex++; http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 230082e..1697921 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -353,13 +353,14 @@ public class ApexGroupByKeyOperator implements Operator { } @Override - public void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, + public void outputWindowedValue( + TupleTag tag, + AdditionalOutputT output, Instant timestamp, Collection windows, PaneInfo pane) { - throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs"); + throw new UnsupportedOperationException( + "GroupAlsoByWindow should not use tagged outputs"); } @Override @@ -390,15 +391,13 @@ public class ApexGroupByKeyOperator implements Operator { } @Override - public void sideOutput(TupleTag tag, T output) { - // ignore the side output, this can happen when a user does not register - // side outputs but then outputs using a freshly created TupleTag. 
- throw new RuntimeException("sideOutput() is not available when grouping by window."); + public void output(TupleTag tag, T output) { + throw new RuntimeException("output() is not available when grouping by window."); } @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - sideOutput(tag, output); + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + output(tag, output); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 1fc91c8..bad5be2 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -88,7 +88,7 @@ public class ApexParDoOperator extends BaseOperator implements @Bind(JavaSerializer.class) private final TupleTag mainOutputTag; @Bind(JavaSerializer.class) - private final List> sideOutputTags; + private final List> additionalOutputTags; @Bind(JavaSerializer.class) private final WindowingStrategy windowingStrategy; @Bind(JavaSerializer.class) @@ -108,15 +108,15 @@ public class ApexParDoOperator extends BaseOperator implements private transient PushbackSideInputDoFnRunner pushbackDoFnRunner; private transient SideInputHandler sideInputHandler; - private transient Map, DefaultOutputPort>> sideOutputPortMapping = - Maps.newHashMapWithExpectedSize(5); + private transient Map, DefaultOutputPort>> + additionalOutputPortMapping = Maps.newHashMapWithExpectedSize(5); private transient DoFnInvoker doFnInvoker; public 
ApexParDoOperator( ApexPipelineOptions pipelineOptions, DoFn doFn, TupleTag mainOutputTag, - List> sideOutputTags, + List> additionalOutputTags, WindowingStrategy windowingStrategy, List> sideInputs, Coder> inputCoder, @@ -125,15 +125,15 @@ public class ApexParDoOperator extends BaseOperator implements this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions); this.doFn = doFn; this.mainOutputTag = mainOutputTag; - this.sideOutputTags = sideOutputTags; + this.additionalOutputTags = additionalOutputTags; this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; this.sideInputStateInternals = new StateInternalsProxy<>( stateBackend.newStateInternalsFactory(VoidCoder.of())); - if (sideOutputTags.size() > sideOutputPorts.length) { - String msg = String.format("Too many side outputs (currently only supporting %s).", - sideOutputPorts.length); + if (additionalOutputTags.size() > additionalOutputPorts.length) { + String msg = String.format("Too many additional outputs (currently only supporting %s).", + additionalOutputPorts.length); throw new UnsupportedOperationException(msg); } @@ -148,7 +148,7 @@ public class ApexParDoOperator extends BaseOperator implements this.pipelineOptions = null; this.doFn = null; this.mainOutputTag = null; - this.sideOutputTags = null; + this.additionalOutputTags = null; this.windowingStrategy = null; this.sideInputs = null; this.pushedBack = null; @@ -218,29 +218,31 @@ public class ApexParDoOperator extends BaseOperator implements public final transient DefaultOutputPort> output = new DefaultOutputPort<>(); @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort> sideOutput1 = + public final transient DefaultOutputPort> additionalOutput1 = new DefaultOutputPort<>(); @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort> sideOutput2 = + public final transient DefaultOutputPort> additionalOutput2 = new DefaultOutputPort<>(); 
@OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort> sideOutput3 = + public final transient DefaultOutputPort> additionalOutput3 = new DefaultOutputPort<>(); @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort> sideOutput4 = + public final transient DefaultOutputPort> additionalOutput4 = new DefaultOutputPort<>(); @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort> sideOutput5 = + public final transient DefaultOutputPort> additionalOutput5 = new DefaultOutputPort<>(); - public final transient DefaultOutputPort[] sideOutputPorts = {sideOutput1, sideOutput2, - sideOutput3, sideOutput4, sideOutput5}; + public final transient DefaultOutputPort[] additionalOutputPorts = { + additionalOutput1, additionalOutput2, additionalOutput3, additionalOutput4, additionalOutput5 + }; @Override public void output(TupleTag tag, WindowedValue tuple) { - DefaultOutputPort> sideOutputPort = sideOutputPortMapping.get(tag); - if (sideOutputPort != null) { - sideOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple)); + DefaultOutputPort> additionalOutputPort = + additionalOutputPortMapping.get(tag); + if (additionalOutputPort != null) { + additionalOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple)); } else { output.emit(ApexStreamTuple.DataTuple.of(tuple)); } @@ -306,11 +308,11 @@ public class ApexParDoOperator extends BaseOperator implements sideInputReader = sideInputHandler; } - for (int i = 0; i < sideOutputTags.size(); i++) { + for (int i = 0; i < additionalOutputTags.size(); i++) { @SuppressWarnings("unchecked") DefaultOutputPort> port = (DefaultOutputPort>) - sideOutputPorts[i]; - sideOutputPortMapping.put(sideOutputTags.get(i), port); + additionalOutputPorts[i]; + additionalOutputPortMapping.put(additionalOutputTags.get(i), port); } NoOpStepContext stepContext = new NoOpStepContext() { @@ -332,7 +334,7 @@ public class ApexParDoOperator extends BaseOperator implements 
sideInputReader, this, mainOutputTag, - sideOutputTags, + additionalOutputTags, stepContext, new NoOpAggregatorFactory(), windowingStrategy http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java index ad4de97..cc64c7c 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java @@ -48,7 +48,7 @@ public class NoOpStepContext implements ExecutionContext.StepContext, Serializab } @Override - public void noteSideOutput(TupleTag tag, WindowedValue output) { + public void noteOutput(TupleTag tag, WindowedValue output) { } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java index 2760d06..1a5c8be 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java @@ -267,7 +267,7 @@ public class ParDoTranslatorTest { List inputs = Arrays.asList(3, -42, 666); final TupleTag mainOutputTag = new TupleTag<>("main"); - final TupleTag sideOutputTag = new TupleTag<>("sideOutput"); + final TupleTag additionalOutputTag = new TupleTag<>("output"); 
PCollectionView sideInput1 = pipeline .apply("CreateSideInput1", Create.of(11)) @@ -288,10 +288,10 @@ public class ParDoTranslatorTest { .withSideInputs(sideInput1) .withSideInputs(sideInputUnread) .withSideInputs(sideInput2) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); + .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))); outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector())); - outputs.get(sideOutputTag).setCoder(VoidCoder.of()); + outputs.get(additionalOutputTag).setCoder(VoidCoder.of()); ApexRunnerResult result = (ApexRunnerResult) pipeline.run(); HashSet expected = Sets.newHashSet("processing: 3: [11, 222]", @@ -312,12 +312,12 @@ public class ParDoTranslatorTest { private static final long serialVersionUID = 1L; final List> sideInputViews = new ArrayList<>(); - final List> sideOutputTupleTags = new ArrayList<>(); + final List> additionalOutputTupleTags = new ArrayList<>(); public TestMultiOutputWithSideInputsFn(List> sideInputViews, - List> sideOutputTupleTags) { + List> additionalOutputTupleTags) { this.sideInputViews.addAll(sideInputViews); - this.sideOutputTupleTags.addAll(sideOutputTupleTags); + this.additionalOutputTupleTags.addAll(additionalOutputTupleTags); } @ProcessElement @@ -334,9 +334,9 @@ public class ParDoTranslatorTest { value += ": " + sideInputValues; } c.output(value); - for (TupleTag sideOutputTupleTag : sideOutputTupleTags) { - c.sideOutput(sideOutputTupleTag, - sideOutputTupleTag.getId() + ": " + value); + for (TupleTag additionalOutputTupleTag : additionalOutputTupleTags) { + c.output(additionalOutputTupleTag, + additionalOutputTupleTag.getId() + ": " + value); } } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java index 0f23fea..cc7b574 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn.Context; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; @@ -106,19 +107,17 @@ public abstract class BaseExecutionContext output) {} /** * Hook for subclasses to implement that will be called whenever - * {@code DoFn.Context#sideOutput} - * is called. + * {@link Context#output(TupleTag, Object)} is called. */ @Override - public void noteSideOutput(TupleTag tag, WindowedValue output) {} + public void noteOutput(TupleTag tag, WindowedValue output) {} /** * Base class for implementations of {@link ExecutionContext.StepContext}. 
@@ -153,8 +152,8 @@ public abstract class BaseExecutionContext tag, WindowedValue output) { - executionContext.noteSideOutput(tag, output); + public void noteOutput(TupleTag tag, WindowedValue output) { + executionContext.noteOutput(tag, output); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java index deb3b7e..66ad736 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java @@ -162,13 +162,13 @@ public class DoFnAdapters { } @Override - public void sideOutput(TupleTag tag, T output) { - context.sideOutput(tag, output); + public void output(TupleTag tag, T output) { + context.output(tag, output); } @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - context.sideOutputWithTimestamp(tag, output, timestamp); + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + context.outputWithTimestamp(tag, output, timestamp); } @Override @@ -255,13 +255,13 @@ public class DoFnAdapters { } @Override - public void sideOutput(TupleTag tag, T output) { - context.sideOutput(tag, output); + public void output(TupleTag tag, T output) { + context.output(tag, output); } @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - context.sideOutputWithTimestamp(tag, output, timestamp); + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + context.outputWithTimestamp(tag, output, timestamp); } @Override 
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index a1b7c8b..b09ee08 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -59,7 +59,7 @@ public class DoFnRunners { SideInputReader sideInputReader, OutputManager outputManager, TupleTag mainOutputTag, - List> sideOutputTags, + List> additionalOutputTags, StepContext stepContext, AggregatorFactory aggregatorFactory, WindowingStrategy windowingStrategy) { @@ -69,7 +69,7 @@ public class DoFnRunners { sideInputReader, outputManager, mainOutputTag, - sideOutputTags, + additionalOutputTags, stepContext, aggregatorFactory, windowingStrategy); @@ -86,7 +86,7 @@ public class DoFnRunners { SideInputReader sideInputReader, OutputManager outputManager, TupleTag mainOutputTag, - List> sideOutputTags, + List> additionalOutputTags, StepContext stepContext, AggregatorFactory aggregatorFactory, WindowingStrategy windowingStrategy) { @@ -96,7 +96,7 @@ public class DoFnRunners { sideInputReader, outputManager, mainOutputTag, - sideOutputTags, + additionalOutputTags, stepContext, aggregatorFactory, windowingStrategy); http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java index 40c0798..ecd30c0 100644 --- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.core; import java.io.IOException; import java.util.Collection; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn.Context; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; @@ -41,17 +42,15 @@ public interface ExecutionContext { /** * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.DoFn.Context#output} - * is called. + * {@link Context#output(TupleTag, Object)} is called. */ void noteOutput(WindowedValue output); /** * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput} - * is called. + * {@link Context#output(TupleTag, Object)} is called. */ - void noteSideOutput(TupleTag tag, WindowedValue output); + void noteOutput(TupleTag tag, WindowedValue output); /** * Per-step, per-key context used for retrieving state. @@ -77,10 +76,10 @@ public interface ExecutionContext { /** * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput} + * {@link org.apache.beam.sdk.transforms.DoFn.Context#output} * is called. */ - void noteSideOutput(TupleTag tag, WindowedValue output); + void noteOutput(TupleTag tag, WindowedValue output); /** * Writes the given {@code PCollectionView} data to a globally accessible location. 
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index 8fff0e4..0cf6e2d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -104,9 +104,9 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn< } @Override - public void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, + public void outputWindowedValue( + TupleTag tag, + AdditionalOutputT output, Instant timestamp, Collection windows, PaneInfo pane) { http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java index e9d4740..507ee50 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java @@ -135,16 +135,15 @@ public abstract class OldDoFn implements Serializable, HasDispl public abstract void outputWithTimestamp(OutputT output, Instant timestamp); /** - * Adds the given element to the side output {@code PCollection} with the + * Adds the given element to the output {@code PCollection} with the * given tag. * - *

<p>Once passed to {@code sideOutput} the element should not be modified + *

<p>Once passed to {@code output} the element should not be modified * in any way. * *

<p>The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags withOutputTags} - * to specify the tags of side outputs that it consumes. Non-consumed side - * outputs, e.g., outputs for monitoring purposes only, don't necessarily - * need to be specified. + * to specify the tags of outputs that it consumes. Outputs that are not consumed, e.g., outputs + * for monitoring purposes only, don't necessarily need to be specified. * *

<p>The output element will have the same timestamp and be in the same * windows as the input element passed to {@link OldDoFn#processElement processElement}. @@ -159,32 +158,27 @@ public abstract class OldDoFn implements Serializable, HasDispl * * @see ParDo.SingleOutput#withOutputTags */ - public abstract void sideOutput(TupleTag tag, T output); + public abstract void output(TupleTag tag, T output); /** - * Adds the given element to the specified side output {@code PCollection}, - * with the given timestamp. + * Adds the given element to the specified output {@code PCollection}, with the given timestamp. * - *

<p>Once passed to {@code sideOutputWithTimestamp} the element should not be - * modified in any way. + *

<p>Once passed to {@code outputWithTimestamp} the element should not be modified in any way. * - *

<p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp - * must not be older than the input element's timestamp minus - * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will - * be in the same windows as the input element. + *

<p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp must not be + * older than the input element's timestamp minus {@link OldDoFn#getAllowedTimestampSkew + * getAllowedTimestampSkew}. The output element will be in the same windows as the input + * element. * *

<p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, - * this will attempt to use the - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * of the input {@code PCollection} to determine what windows the element - * should be in, throwing an exception if the {@code WindowFn} attempts - * to access any information about the input element except for the - * timestamp. + * this will attempt to use the {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the + * input {@code PCollection} to determine what windows the element should be in, throwing an + * exception if the {@code WindowFn} attempts to access any information about the input element + * except for the timestamp. * * @see ParDo.SingleOutput#withOutputTags */ - public abstract void sideOutputWithTimestamp( - TupleTag tag, T output, Instant timestamp); + public abstract void outputWithTimestamp(TupleTag tag, T output, Instant timestamp); /** * Creates an {@link Aggregator} in the {@link OldDoFn} context with the http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 27fd0a3..d132af6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -254,13 +254,13 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< } @Override - public void sideOutput(TupleTag tag, T value) { - sideOutputWithTimestamp(tag, 
value, element.getTimestamp()); + public void output(TupleTag tag, T value) { + outputWithTimestamp(tag, value, element.getTimestamp()); } @Override - public void sideOutputWithTimestamp(TupleTag tag, T value, Instant timestamp) { - output.sideOutputWindowedValue( + public void outputWithTimestamp(TupleTag tag, T value, Instant timestamp) { + output.outputWindowedValue( tag, value, timestamp, element.getWindows(), element.getPane()); noteOutput(); } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java index 86eeb33..35d6737 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java @@ -25,7 +25,7 @@ import org.joda.time.Instant; /** * An object that can output a value with all of its windowing information to the main output or - * a side output. + * any tagged output. */ public interface OutputWindowedValue { /** Outputs a value with windowing information to the main output. */ @@ -35,10 +35,10 @@ public interface OutputWindowedValue { Collection windows, PaneInfo pane); - /** Outputs a value with windowing information to a side output. */ - void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, + /** Outputs a value with windowing information to a tagged output. 
*/ + void outputWindowedValue( + TupleTag tag, + AdditionalOutputT output, Instant timestamp, Collection windows, PaneInfo pane); http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 98d88b6..141bf20 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -106,7 +106,7 @@ public class SimpleDoFnRunner implements DoFnRunner mainOutputTag, - List> sideOutputTags, + List> additionalOutputTags, StepContext stepContext, AggregatorFactory aggregatorFactory, WindowingStrategy windowingStrategy) { @@ -133,7 +133,7 @@ public class SimpleDoFnRunner implements DoFnRunner implements DoFnRunner mainOutputTag, - List> sideOutputTags, + List> additionalOutputTags, StepContext stepContext, AggregatorFactory aggregatorFactory, WindowFn windowFn) { @@ -270,8 +270,8 @@ public class SimpleDoFnRunner implements DoFnRunner sideOutputTag : sideOutputTags) { - outputTags.add(sideOutputTag); + for (TupleTag additionalOutputTag : additionalOutputTags) { + outputTags.add(additionalOutputTag); } this.stepContext = stepContext; @@ -355,16 +355,16 @@ public class SimpleDoFnRunner implements DoFnRunner void sideOutputWindowedValue( + private void outputWindowedValue( TupleTag tag, T output, Instant timestamp, Collection windows, PaneInfo pane) { - sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane)); + outputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane)); } - private void sideOutputWindowedValue(TupleTag tag, WindowedValue windowedElem) { + private void 
outputWindowedValue(TupleTag tag, WindowedValue windowedElem) { if (!outputTags.contains(tag)) { // This tag wasn't declared nor was it seen before during this execution. // Thus, this must be a new, undeclared and unconsumed output. @@ -372,18 +372,18 @@ public class SimpleDoFnRunner implements DoFnRunner= MAX_SIDE_OUTPUTS) { throw new IllegalArgumentException( - "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS); + "the number of outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS); } outputTags.add(tag); } outputManager.output(tag, windowedElem); if (stepContext != null) { - stepContext.noteSideOutput(tag, windowedElem); + stepContext.noteOutput(tag, windowedElem); } } - // Following implementations of output, outputWithTimestamp, and sideOutput + // Following implementations of output, outputWithTimestamp, and output // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by // ProcessContext's versions in DoFn.processElement. @Override @@ -397,15 +397,15 @@ public class SimpleDoFnRunner implements DoFnRunner void sideOutput(TupleTag tag, T output) { - checkNotNull(tag, "TupleTag passed to sideOutput cannot be null"); - sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING); + public void output(TupleTag tag, T output) { + checkNotNull(tag, "TupleTag passed to output cannot be null"); + outputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING); } @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null"); - sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING); + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + checkNotNull(tag, "TupleTag passed to outputWithTimestamp cannot be null"); + outputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING); } @Override @@ -559,16 +559,16 @@ public class SimpleDoFnRunner 
implements DoFnRunner void sideOutput(TupleTag tag, T output) { - checkNotNull(tag, "Tag passed to sideOutput cannot be null"); - context.sideOutputWindowedValue(tag, windowedValue.withValue(output)); + public void output(TupleTag tag, T output) { + checkNotNull(tag, "Tag passed to output cannot be null"); + context.outputWindowedValue(tag, windowedValue.withValue(output)); } @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null"); + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null"); checkTimestamp(timestamp); - context.sideOutputWindowedValue( + context.outputWindowedValue( tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); } @@ -787,14 +787,14 @@ public class SimpleDoFnRunner implements DoFnRunner void sideOutput(TupleTag tag, T output) { - context.sideOutputWindowedValue( + public void output(TupleTag tag, T output) { + context.outputWindowedValue( tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); } @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - context.sideOutputWindowedValue( + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + context.outputWindowedValue( tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index c88f1c9..6320a3a 100644 --- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -60,11 +60,16 @@ class SimpleOldDoFnRunner implements DoFnRunner context; - public SimpleOldDoFnRunner(PipelineOptions options, OldDoFn fn, + public SimpleOldDoFnRunner( + PipelineOptions options, + OldDoFn fn, SideInputReader sideInputReader, OutputManager outputManager, - TupleTag mainOutputTag, List> sideOutputTags, StepContext stepContext, - AggregatorFactory aggregatorFactory, WindowingStrategy windowingStrategy) { + TupleTag mainOutputTag, + List> additionalOutputTags, + StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowingStrategy windowingStrategy) { this.fn = fn; this.context = new DoFnContext<>( options, @@ -72,7 +77,7 @@ class SimpleOldDoFnRunner implements DoFnRunner implements DoFnRunner mainOutputTag, - List> sideOutputTags, + List> additionalOutputTags, StepContext stepContext, AggregatorFactory aggregatorFactory, WindowFn windowFn) { @@ -190,8 +195,8 @@ class SimpleOldDoFnRunner implements DoFnRunner sideOutputTag : sideOutputTags) { - outputTags.add(sideOutputTag); + for (TupleTag additionalOutputTag : additionalOutputTags) { + outputTags.add(additionalOutputTag); } this.stepContext = stepContext; @@ -273,15 +278,15 @@ class SimpleOldDoFnRunner implements DoFnRunner void sideOutputWindowedValue(TupleTag tag, + private void outputWindowedValue(TupleTag tag, T output, Instant timestamp, Collection windows, PaneInfo pane) { - sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane)); + outputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane)); } - private void sideOutputWindowedValue(TupleTag tag, WindowedValue windowedElem) { + private void outputWindowedValue(TupleTag tag, WindowedValue windowedElem) { if (!outputTags.contains(tag)) { // This tag wasn't declared nor was it seen before during this 
execution. // Thus, this must be a new, undeclared and unconsumed output. @@ -289,18 +294,18 @@ class SimpleOldDoFnRunner implements DoFnRunner= MAX_SIDE_OUTPUTS) { throw new IllegalArgumentException( - "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS); + "the number of outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS); } outputTags.add(tag); } outputManager.output(tag, windowedElem); if (stepContext != null) { - stepContext.noteSideOutput(tag, windowedElem); + stepContext.noteOutput(tag, windowedElem); } } - // Following implementations of output, outputWithTimestamp, and sideOutput + // Following implementations of output, outputWithTimestamp, and output // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by // ProcessContext's versions in OldDoFn.processElement. @Override @@ -314,15 +319,15 @@ class SimpleOldDoFnRunner implements DoFnRunner void sideOutput(TupleTag tag, T output) { - checkNotNull(tag, "TupleTag passed to sideOutput cannot be null"); - sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING); + public void output(TupleTag tag, T output) { + checkNotNull(tag, "TupleTag passed to output cannot be null"); + outputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING); } @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null"); - sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING); + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + checkNotNull(tag, "TupleTag passed to outputWithTimestamp cannot be null"); + outputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING); } @Override @@ -428,16 +433,16 @@ class SimpleOldDoFnRunner implements DoFnRunner void sideOutput(TupleTag tag, T output) { - checkNotNull(tag, "Tag passed to sideOutput cannot be null"); - context.sideOutputWindowedValue(tag, 
windowedValue.withValue(output)); + public void output(TupleTag tag, T output) { + checkNotNull(tag, "Tag passed to output cannot be null"); + context.outputWindowedValue(tag, windowedValue.withValue(output)); } @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null"); + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null"); checkTimestamp(timestamp); - context.sideOutputWindowedValue( + context.outputWindowedValue( tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); } @@ -471,13 +476,13 @@ class SimpleOldDoFnRunner implements DoFnRunner void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, + public void outputWindowedValue( + TupleTag tag, + AdditionalOutputT output, Instant timestamp, Collection windows, PaneInfo pane) { - context.sideOutputWindowedValue(tag, output, timestamp, windows, pane); + context.outputWindowedValue(tag, output, timestamp, windows, pane); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index c16bf44..9cc965a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -118,7 +118,7 @@ public class SplittableParDo input.getWindowingStrategy(), parDo.getSideInputs(), parDo.getMainOutputTag(), - parDo.getSideOutputTags())); + parDo.getAdditionalOutputTags())); } private static @@ -188,14 +188,15 @@ public class SplittableParDo 
private final WindowingStrategy windowingStrategy; private final List> sideInputs; private final TupleTag mainOutputTag; - private final TupleTagList sideOutputTags; + private final TupleTagList additionalOutputTags; /** * @param fn the splittable {@link DoFn}. * @param windowingStrategy the {@link WindowingStrategy} of the input collection. * @param sideInputs list of side inputs that should be available to the {@link DoFn}. * @param mainOutputTag {@link TupleTag Tag} of the {@link DoFn DoFn's} main output. - * @param sideOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} side outputs. + * @param additionalOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} additional + * outputs. */ public ProcessElements( DoFn fn, @@ -204,14 +205,14 @@ public class SplittableParDo WindowingStrategy windowingStrategy, List> sideInputs, TupleTag mainOutputTag, - TupleTagList sideOutputTags) { + TupleTagList additionalOutputTags) { this.fn = fn; this.elementCoder = elementCoder; this.restrictionCoder = restrictionCoder; this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; this.mainOutputTag = mainOutputTag; - this.sideOutputTags = sideOutputTags; + this.additionalOutputTags = additionalOutputTags; } public DoFn getFn() { @@ -226,8 +227,8 @@ public class SplittableParDo return mainOutputTag; } - public TupleTagList getSideOutputTags() { - return sideOutputTags; + public TupleTagList getAdditionalOutputTags() { + return additionalOutputTags; } public ProcessFn newProcessFn( @@ -244,7 +245,7 @@ public class SplittableParDo PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal( input.getPipeline(), - TupleTagList.of(mainOutputTag).and(sideOutputTags.getAll()), + TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()), windowingStrategy, input.isBounded().and(signature.isBoundedPerElement())); @@ -522,12 +523,12 @@ public class SplittableParDo } @Override - public void sideOutput(TupleTag tag, T output) { + public 
void output(TupleTag tag, T output) {
       throwUnsupportedOutput();
     }

     @Override
-    public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) {
+    public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) {
       throwUnsupportedOutput();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
index 8dc0bfc..5005065 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
@@ -49,11 +49,11 @@ public interface WindowingInternals {
       Collection windows, PaneInfo pane);

   /**
-   * Output the value to a side output at the specified timestamp in the listed windows.
+   * Output the value to a tagged output at the specified timestamp in the listed windows.
    */
+  void outputWindowedValue(
+      TupleTag tag,
+      AdditionalOutputT output,
       Instant timestamp,
       Collection windows,
       PaneInfo pane);

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
index 48a39d6..1b36bf9 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
@@ -62,13 +62,13 @@ public class WindowingInternalsAdapters {
       }

       @Override
-      public void sideOutputWindowedValue(
-          TupleTag tag,
-          SideOutputT output,
+      public void outputWindowedValue(
+          TupleTag tag,
+          AdditionalOutputT output,
           Instant timestamp,
           Collection windows,
           PaneInfo pane) {
-        windowingInternals.sideOutputWindowedValue(tag, output, timestamp, windows, pane);
+        windowingInternals.outputWindowedValue(tag, output, timestamp, windows, pane);
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index 6c7c4e0..d0a8923 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -677,9 +677,9 @@ public
class GroupAlsoByWindowsProperties { } @Override - public void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, + public void outputWindowedValue( + TupleTag tag, + AdditionalOutputT output, Instant timestamp, Collection windows, PaneInfo pane) { @@ -729,12 +729,12 @@ public class GroupAlsoByWindowsProperties { } @Override - public void sideOutput(TupleTag tag, T output) { + public void output(TupleTag tag, T output) { throw new UnsupportedOperationException(); } @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java index 5cbea8c..2e5cd6d 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java @@ -57,10 +57,10 @@ class NoOpOldDoFn extends OldDoFn { public void outputWithTimestamp(OutputT output, Instant timestamp) { } @Override - public void sideOutput(TupleTag tag, T output) { + public void output(TupleTag tag, T output) { } @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java index 651bc72..425de07 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java @@ -160,12 +160,12 @@ public class OldDoFnTest implements Serializable { } @Override - public void sideOutput(TupleTag tag, T output) { + public void output(TupleTag tag, T output) { throw new UnsupportedOperationException(); } @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java index 965380b..541e238 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java @@ -80,9 +80,9 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { PaneInfo pane) {} @Override - public void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, + public void outputWindowedValue( + TupleTag tag, + AdditionalOutputT output, Instant timestamp, Collection windows, PaneInfo pane) {} http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java 
---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 512420f..914550e 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -574,13 +574,13 @@ public class ReduceFnTester { } @Override - public void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, + public void outputWindowedValue( + TupleTag tag, + AdditionalOutputT output, Instant timestamp, Collection windows, PaneInfo pane) { - throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs"); + throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs"); } } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java index 28698ca..8ded2dc 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java @@ -64,10 +64,10 @@ public class SimpleOldDoFnRunnerTest { private DoFnRunner createRunner(OldDoFn fn) { // Pass in only necessary parameters for the test - List> sideOutputTags = Arrays.asList(); + List> additionalOutputTags = Arrays.asList(); StepContext context = mock(StepContext.class); return new SimpleOldDoFnRunner<>( - null, fn, null, null, null, sideOutputTags, context, null, null); + null, fn, null, null, null, additionalOutputTags, 
context, null, null); } static class ThrowingDoFn extends OldDoFn { http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index d301113..2c89543 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -356,13 +356,13 @@ public class SplittableParDoTest { Instant timestamp, Collection windows, PaneInfo pane) { - sideOutputWindowedValue(tester.getMainOutputTag(), output, timestamp, windows, pane); + outputWindowedValue(tester.getMainOutputTag(), output, timestamp, windows, pane); } @Override - public void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, + public void outputWindowedValue( + TupleTag tag, + AdditionalOutputT output, Instant timestamp, Collection windows, PaneInfo pane) { http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index b4ca998..ce7b12a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -264,13 +264,13 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { 
} @Override - public void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, + public void outputWindowedValue( + TupleTag tag, + AdditionalOutputT output, Instant timestamp, Collection windows, PaneInfo pane) { - throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs"); + throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs"); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 328d139..49d0723 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -51,7 +51,7 @@ class ParDoEvaluator implements TransformEvaluator { StructuralKey key, List> sideInputs, TupleTag mainOutputTag, - List> sideOutputTags, + List> additionalOutputTags, Map, PCollection> outputs) { AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator(); @@ -80,7 +80,7 @@ class ParDoEvaluator implements TransformEvaluator { sideInputReader, outputManager, mainOutputTag, - sideOutputTags, + additionalOutputTags, stepContext, aggregatorChanges, windowingStrategy); http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index b8a13e2..0372295 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -80,7 +80,7 @@ final class ParDoEvaluatorFactory implements TransformEvaluator doFn, transform.getSideInputs(), transform.getMainOutputTag(), - transform.getSideOutputTags().getAll()); + transform.getAdditionalOutputTags().getAll()); return evaluator; } @@ -103,7 +103,7 @@ final class ParDoEvaluatorFactory implements TransformEvaluator DoFn doFn, List> sideInputs, TupleTag mainOutputTag, - List> sideOutputTags) + List> additionalOutputTags) throws Exception { String stepName = evaluationContext.getStepName(application); DirectStepContext stepContext = @@ -119,7 +119,7 @@ final class ParDoEvaluatorFactory implements TransformEvaluator inputBundleKey, sideInputs, mainOutputTag, - sideOutputTags, + additionalOutputTags, stepContext, fnManager.get(), fnManager), @@ -131,7 +131,7 @@ final class ParDoEvaluatorFactory implements TransformEvaluator StructuralKey key, List> sideInputs, TupleTag mainOutputTag, - List> sideOutputTags, + List> additionalOutputTags, DirectStepContext stepContext, DoFn fn, DoFnLifecycleManager fnManager) @@ -147,7 +147,7 @@ final class ParDoEvaluatorFactory implements TransformEvaluator key, sideInputs, mainOutputTag, - sideOutputTags, + additionalOutputTags, pcollections(application.getOutputs())); } catch (Exception e) { try { http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 00c0d6a..366777b 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -192,7 +192,7 @@ class ParDoMultiOverrideFactory PCollectionTuple.ofPrimitiveOutputsInternal( input.getPipeline(), TupleTagList.of(underlyingParDo.getMainOutputTag()) - .and(underlyingParDo.getSideOutputTags().getAll()), + .and(underlyingParDo.getAdditionalOutputTags().getAll()), input.getWindowingStrategy(), input.isBounded()); http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 07affd8..64cef35 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -105,7 +105,7 @@ class SplittableProcessElementsEvaluatorFactory< inputBundle.getKey(), transform.getSideInputs(), transform.getMainOutputTag(), - transform.getSideOutputTags().getAll(), + transform.getAdditionalOutputTags().getAll(), stepContext, processFn, fnManager); @@ -146,9 +146,9 @@ class SplittableProcessElementsEvaluatorFactory< } @Override - public void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, + public void outputWindowedValue( + TupleTag tag, + AdditionalOutputT output, Instant timestamp, Collection windows, PaneInfo pane) { 

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index f8fe3d6..be77ea1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -120,7 +120,7 @@ final class StatefulParDoEvaluatorFactory implements Transfo
   doFn,
   application.getTransform().getUnderlyingParDo().getSideInputs(),
   application.getTransform().getUnderlyingParDo().getMainOutputTag(),
-  application.getTransform().getUnderlyingParDo().getSideOutputTags().getAll());
+  application.getTransform().getUnderlyingParDo().getAdditionalOutputTags().getAll());
   return new StatefulParDoEvaluator<>(delegateEvaluator);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 2a94d48..65a1248 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -70,7 +70,7 @@ public class ParDoEvaluatorTest {
   @Mock private EvaluationContext evaluationContext;
   private PCollection inputPc;
   private TupleTag mainOutputTag;
-  private List> sideOutputTags;
+  private List> additionalOutputTags;
   private BundleFactory bundleFactory;
   @Rule
@@ -81,7 +81,7 @@ public class ParDoEvaluatorTest {
   MockitoAnnotations.initMocks(this);
   inputPc = p.apply(Create.of(1, 2, 3));
   mainOutputTag = new TupleTag() {};
-  sideOutputTags = TupleTagList.empty().getAll();
+  additionalOutputTags = TupleTagList.empty().getAll();
   bundleFactory = ImmutableListBundleFactory.create();
   }
@@ -168,7 +168,7 @@ public class ParDoEvaluatorTest {
   null /* key */,
   ImmutableList.>of(singletonView),
   mainOutputTag,
-  sideOutputTags,
+  additionalOutputTags,
   ImmutableMap., PCollection>of(mainOutputTag, output));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index af157f0..fbd7620 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -406,7 +406,7 @@ class FlinkStreamingTransformTranslators {
   DoFn doFn,
   List> sideInputs,
   TupleTag mainOutputTag,
-  List> sideOutputTags,
+  List> additionalOutputTags,
   FlinkStreamingTranslationContext context,
   WindowingStrategy windowingStrategy,
   Map, Integer> tagsToLabels,
@@ -422,7 +422,7 @@ class FlinkStreamingTransformTranslators {
   List> sideInputs,
   Map, PValue> outputs,
   TupleTag mainOutputTag,
-  List> sideOutputTags,
+  List> additionalOutputTags,
   FlinkStreamingTranslationContext context,
   DoFnOperatorFactory doFnOperatorFactory) {
@@ -460,7 +460,7 @@ class FlinkStreamingTransformTranslators {
   doFn,
   sideInputs,
   mainOutputTag,
-  sideOutputTags,
+  additionalOutputTags,
   context,
   windowingStrategy,
   tagsToLabels,
@@ -485,7 +485,7 @@ class FlinkStreamingTransformTranslators {
   doFn,
   sideInputs,
   mainOutputTag,
-  sideOutputTags,
+  additionalOutputTags,
   context,
   windowingStrategy,
   tagsToLabels,
@@ -605,7 +605,7 @@ class FlinkStreamingTransformTranslators {
   transform.getSideInputs(),
   context.getOutputs(transform),
   transform.getMainOutputTag(),
-  transform.getSideOutputTags().getAll(),
+  transform.getAdditionalOutputTags().getAll(),
   context,
   new ParDoTranslationHelper.DoFnOperatorFactory() {
   @Override
@@ -613,7 +613,7 @@ class FlinkStreamingTransformTranslators {
   DoFn doFn,
   List> sideInputs,
   TupleTag mainOutputTag,
-  List> sideOutputTags,
+  List> additionalOutputTags,
   FlinkStreamingTranslationContext context,
   WindowingStrategy windowingStrategy,
   Map, Integer> tagsToLabels,
@@ -624,7 +624,7 @@ class FlinkStreamingTransformTranslators {
   doFn,
   inputCoder,
   mainOutputTag,
-  sideOutputTags,
+  additionalOutputTags,
   new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
   windowingStrategy,
   transformedSideInputs,
@@ -654,7 +654,7 @@ class FlinkStreamingTransformTranslators {
   transform.getSideInputs(),
   context.getOutputs(transform),
   transform.getMainOutputTag(),
-  transform.getSideOutputTags().getAll(),
+  transform.getAdditionalOutputTags().getAll(),
   context,
   new ParDoTranslationHelper.DoFnOperatorFactory<
   KeyedWorkItem>, OutputT>() {
@@ -668,7 +668,7 @@ class FlinkStreamingTransformTranslators {
   OutputT> doFn,
   List> sideInputs,
   TupleTag mainOutputTag,
-  List> sideOutputTags,
+  List> additionalOutputTags,
   FlinkStreamingTranslationContext context,
   WindowingStrategy windowingStrategy,
   Map, Integer> tagsToLabels,
@@ -683,7 +683,7 @@ class FlinkStreamingTransformTranslators {
   doFn,
   inputCoder,
   mainOutputTag,
-  sideOutputTags,
+  additionalOutputTags,
   new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
   windowingStrategy,
   transformedSideInputs,

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 9687478..51582af 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -87,7 +87,7 @@ public class FlinkDoFnFunction
   if (outputMap == null) {
   outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
   } else {
-  // it has some sideOutputs
+  // it has some additional outputs
   outputManager =
   new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
   }
@@ -97,7 +97,7 @@ public class FlinkDoFnFunction
   new FlinkSideInputReader(sideInputs, runtimeContext),
   outputManager,
   mainOutputTag,
-  // see SimpleDoFnRunner, just use it to limit number of side outputs
+  // see SimpleDoFnRunner, just use it to limit number of additional outputs
   Collections.>emptyList(),
   new FlinkNoOpStepContext(),
   new FlinkAggregatorFactory(runtimeContext),

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
index d901d8e..847a00a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -47,7 +47,7 @@ public class FlinkNoOpStepContext implements StepContext {
   }
   @Override
-  public void noteSideOutput(TupleTag tag, WindowedValue output) {
+  public void noteOutput(TupleTag tag, WindowedValue output) {
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 0d8399e..c8193d2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -88,7 +88,7 @@ public class FlinkStatefulDoFnFunction
   if (outputMap == null) {
   outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
   } else {
-  // it has some sideOutputs
+  // it has some additional Outputs
   outputManager =
   new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
   }
@@ -114,7 +114,7 @@ public class FlinkStatefulDoFnFunction
   new FlinkSideInputReader(sideInputs, runtimeContext),
   outputManager,
   mainOutputTag,
-  // see SimpleDoFnRunner, just use it to limit number of side outputs
+  // see SimpleDoFnRunner, just use it to limit number of additional outputs
   Collections.>emptyList(),
   new FlinkNoOpStepContext() {
   @Override