From: kenn@apache.org
To: commits@beam.apache.org
Date: Tue, 25 Apr 2017 17:30:00 -0000
Message-Id: <5c73d922d4cc4d1caeee818160d9e1c7@git.apache.org>
In-Reply-To: <2a7781abd4eb43c7936fde76997d1479@git.apache.org>
Subject: [05/50] [abbrv] beam git commit: Rename DoFn.Context#sideOutput to output

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 9a66a2f..5496f71 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -97,7 +97,7 @@ import org.joda.time.Instant;
 * @param the input type of the {@link DoFn}
 * @param the output type of the {@link DoFn}
 * @param the output type of the operator, this can be different from the fn output
- *     type when we have side outputs
+ *     type when we have additional tagged outputs
 */
public class DoFnOperator extends AbstractStreamOperator
@@ -110,7 +110,7 @@ public class DoFnOperator
  protected final SerializedPipelineOptions serializedOptions;
  protected final TupleTag mainOutputTag;
-  protected final List> sideOutputTags;
+  protected final List> additionalOutputTags;
  protected final Collection> sideInputs;
  protected final Map> sideInputTagMapping;
@@ -155,7 +155,7 @@ public class DoFnOperator
      DoFn doFn,
      Coder> inputCoder,
      TupleTag mainOutputTag,
-      List> sideOutputTags,
+      List> additionalOutputTags,
      OutputManagerFactory outputManagerFactory,
      WindowingStrategy windowingStrategy,
      Map> sideInputTagMapping,
@@ -165,7 +165,7 @@ public class DoFnOperator
this.doFn = doFn; this.inputCoder = inputCoder; this.mainOutputTag = mainOutputTag; - this.sideOutputTags = sideOutputTags; + this.additionalOutputTags = additionalOutputTags; this.sideInputTagMapping = sideInputTagMapping; this.sideInputs = sideInputs; this.serializedOptions = new SerializedPipelineOptions(options); @@ -275,7 +275,7 @@ public class DoFnOperator sideInputReader, outputManager, mainOutputTag, - sideOutputTags, + additionalOutputTags, stepContext, aggregatorFactory, windowingStrategy); @@ -619,7 +619,7 @@ public class DoFnOperator return new DoFnRunners.OutputManager() { @Override public void output(TupleTag tag, WindowedValue value) { - // with side outputs we can't get around this because we don't + // with tagged outputs we can't get around this because we don't // know our own output type... @SuppressWarnings("unchecked") OutputT castValue = (OutputT) value; @@ -675,7 +675,7 @@ public class DoFnOperator public void noteOutput(WindowedValue output) {} @Override - public void noteSideOutput(TupleTag tag, WindowedValue output) {} + public void noteOutput(TupleTag tag, WindowedValue output) {} @Override public void writePCollectionViewData( http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 0724ac2..1a636c9 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -63,7 +63,7 @@ public class SplittableDoFnOperator< WindowedValue< KeyedWorkItem>>> inputCoder, TupleTag mainOutputTag, - List> sideOutputTags, + List> additionalOutputTags, OutputManagerFactory outputManagerFactory, WindowingStrategy windowingStrategy, Map> sideInputTagMapping, @@ -74,7 +74,7 @@ public class SplittableDoFnOperator< doFn, inputCoder, mainOutputTag, - sideOutputTags, + additionalOutputTags, outputManagerFactory, windowingStrategy, sideInputTagMapping, @@ -125,9 +125,9 @@ public class SplittableDoFnOperator< } @Override - public void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, + public void outputWindowedValue( + TupleTag tag, + AdditionalOutputT output, Instant timestamp, Collection windows, PaneInfo pane) { http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index b015f66..8bbc6ef 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -57,7 +57,7 @@ public class 
WindowDoFnOperator SystemReduceFn systemReduceFn, Coder>> inputCoder, TupleTag> mainOutputTag, - List> sideOutputTags, + List> additionalOutputTags, OutputManagerFactory>> outputManagerFactory, WindowingStrategy windowingStrategy, Map> sideInputTagMapping, @@ -68,7 +68,7 @@ public class WindowDoFnOperator null, inputCoder, mainOutputTag, - sideOutputTags, + additionalOutputTags, outputManagerFactory, windowingStrategy, sideInputTagMapping, http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index c1fdea3..4c826d1 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -144,19 +144,19 @@ public class DoFnOperatorTest { WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()); TupleTag mainOutput = new TupleTag<>("main-output"); - TupleTag sideOutput1 = new TupleTag<>("side-output-1"); - TupleTag sideOutput2 = new TupleTag<>("side-output-2"); + TupleTag additionalOutput1 = new TupleTag<>("output-1"); + TupleTag additionalOutput2 = new TupleTag<>("output-2"); ImmutableMap, Integer> outputMapping = ImmutableMap., Integer>builder() .put(mainOutput, 1) - .put(sideOutput1, 2) - .put(sideOutput2, 3) + .put(additionalOutput1, 2) + .put(additionalOutput2, 3) .build(); DoFnOperator doFnOperator = new DoFnOperator<>( - new MultiOutputDoFn(sideOutput1, sideOutput2), + new MultiOutputDoFn(additionalOutput1, additionalOutput2), windowedValueCoder, mainOutput, - ImmutableList.>of(sideOutput1, sideOutput2), + ImmutableList.>of(additionalOutput1, additionalOutput2), new DoFnOperator.MultiOutputOutputManagerFactory(outputMapping), WindowingStrategy.globalDefault(), new HashMap>(), /* side-input mapping */ @@ -176,8 +176,8 @@ public class DoFnOperatorTest { assertThat( this.stripStreamRecordFromRawUnion(testHarness.getOutput()), contains( - new RawUnionValue(2, WindowedValue.valueInGlobalWindow("side: one")), - new RawUnionValue(3, WindowedValue.valueInGlobalWindow("side: two")), + new RawUnionValue(2, WindowedValue.valueInGlobalWindow("extra: one")), + new RawUnionValue(3, WindowedValue.valueInGlobalWindow("extra: two")), new RawUnionValue(1, WindowedValue.valueInGlobalWindow("got: hello")), new RawUnionValue(2, WindowedValue.valueInGlobalWindow("got: hello")), new RawUnionValue(3, WindowedValue.valueInGlobalWindow("got: hello")))); @@ -542,24 +542,24 @@ public class DoFnOperatorTest { } private static class MultiOutputDoFn extends DoFn { - private TupleTag sideOutput1; - private TupleTag sideOutput2; + private TupleTag additionalOutput1; + private TupleTag additionalOutput2; - public MultiOutputDoFn(TupleTag sideOutput1, TupleTag sideOutput2) { - this.sideOutput1 = sideOutput1; - this.sideOutput2 = sideOutput2; + public MultiOutputDoFn(TupleTag additionalOutput1, TupleTag additionalOutput2) { + this.additionalOutput1 = additionalOutput1; + this.additionalOutput2 = additionalOutput2; } @ProcessElement public void processElement(ProcessContext c) throws Exception { if (c.element().equals("one")) { - c.sideOutput(sideOutput1, "side: one"); + c.output(additionalOutput1, "extra: 
one"); } else if (c.element().equals("two")) { - c.sideOutput(sideOutput2, "side: two"); + c.output(additionalOutput2, "extra: two"); } else { c.output("got: " + c.element()); - c.sideOutput(sideOutput1, "got: " + c.element()); - c.sideOutput(sideOutput2, "got: " + c.element()); + c.output(additionalOutput1, "got: " + c.element()); + c.output(additionalOutput2, "got: " + c.element()); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java index 3ded079..73f3728 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java @@ -175,7 +175,7 @@ public class BatchStatefulParDoOverrides { ParDo.of(new BatchStatefulDoFn(fn)) .withSideInputs(originalParDo.getSideInputs()) .withOutputTags( - originalParDo.getMainOutputTag(), originalParDo.getSideOutputTags()); + originalParDo.getMainOutputTag(), originalParDo.getAdditionalOutputTags()); return input.apply(new GbkBeforeStatefulParDo()).apply(statefulParDo); } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 86bfeb6..ead2712 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -494,7 +494,7 @@ class BatchViewOverrides { */ private void outputMetadataRecordForSize( ProcessContext c, KV, WindowedValue> value, long uniqueKeyCount) { - c.sideOutput(outputForSize, + c.output(outputForSize, KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), value.getKey().getValue())), KV.of(value.getKey().getValue(), uniqueKeyCount))); @@ -503,7 +503,7 @@ class BatchViewOverrides { /** This outputs records which will be used to construct the entry set. */ private void outputMetadataRecordForEntrySet( ProcessContext c, KV, WindowedValue> value) { - c.sideOutput(outputForEntrySet, + c.output(outputForEntrySet, KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), value.getKey().getValue())), KV.of(value.getKey().getValue(), value.getKey().getKey()))); @@ -773,7 +773,7 @@ class BatchViewOverrides { coderForMapLike(windowCoder, inputCoder.getKeyCoder(), inputCoder.getValueCoder()); // Create the various output tags representing the main output containing the data stream - // and the side outputs containing the metadata about the size and entry set. + // and the additional outputs containing the metadata about the size and entry set. 
TupleTag>> mainOutputTag = new TupleTag<>(); TupleTag>> outputForSizeTag = new TupleTag<>(); TupleTag>> outputForEntrySetTag = new TupleTag<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java index cd12c92..87395e6 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java @@ -280,7 +280,7 @@ public class BatchViewOverridesTest { // Verify the number of unique keys per window. assertThat( - doFnTester.takeSideOutputElements(outputForSizeTag), + doFnTester.takeOutputElements(outputForSizeTag), contains( KV.of( ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)), @@ -294,7 +294,7 @@ public class BatchViewOverridesTest { // Verify the output for the unique keys. assertThat( - doFnTester.takeSideOutputElements(outputForEntrySetTag), + doFnTester.takeOutputElements(outputForEntrySetTag), contains( KV.of( ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)), http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 0e74fa2..029c28a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -412,12 +412,14 @@ public class SparkGroupAlsoByWindowViaWindowSet { } @Override - public void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, Instant timestamp, + public void outputWindowedValue( + TupleTag tag, + AdditionalOutputT output, + Instant timestamp, Collection windows, PaneInfo pane) { - throw new UnsupportedOperationException("Side outputs are not allowed in GroupAlsoByWindow."); + throw new UnsupportedOperationException( + "Tagged outputs are not allowed in GroupAlsoByWindow."); } } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index a761954..4cd1683 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -43,8 +43,8 @@ import scala.Tuple2; /** - * DoFunctions ignore side outputs. 
MultiDoFunctions deal with side outputs by enriching the - * underlying data with multiple TupleTags. + * DoFunctions ignore outputs that are not the main output. MultiDoFunctions deal with additional + * outputs by enriching the underlying data with multiple TupleTags. * * @param Input type for DoFunction. * @param Output type for DoFunction. http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index d19c4a9..ccc0fa3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -164,12 +164,12 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn void sideOutputWindowedValue( - TupleTag tag, - SideOutputT output, + public void outputWindowedValue( + TupleTag tag, + AdditionalOutputT output, Instant timestamp, Collection windows, PaneInfo pane) { - throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs."); + throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs."); } Iterable>>> getOutputs() { http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 4f8a1a5..3e8dde5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -113,7 +113,7 @@ class SparkProcessContext { public void noteOutput(WindowedValue output) { } @Override - public void noteSideOutput(TupleTag tag, WindowedValue output) { } + public void noteOutput(TupleTag tag, WindowedValue output) { } @Override public void writePCollectionViewData( http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 65892d2..000eada 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -385,7 +385,7 @@ public final class StreamingTransformTranslator { JavaDStream> dStream = unboundedDataset.getDStream(); final String stepName = context.getCurrentTransform().getFullName(); - if (transform.getSideOutputTags().size() == 0) { + 
if (transform.getAdditionalOutputTags().size() == 0) { JavaPairDStream, WindowedValue> all = dStream.transformToPair( new Function< http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 8fe4831..58d65d0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -2191,7 +2191,7 @@ public class Combine { c.output(kv); } else { int nonce = counter++ % spread; - c.sideOutput(hot, KV.of(KV.of(kv.getKey(), nonce), kv.getValue())); + c.output(hot, KV.of(KV.of(kv.getKey(), nonce), kv.getValue())); } } }) http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 74a1348..d3da251 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -153,14 +153,14 @@ public abstract class DoFn implements Serializable, HasDisplayD public abstract void outputWithTimestamp(OutputT output, Instant timestamp); /** - * Adds the given element to the side output {@code PCollection} with the + * Adds the given element to the output {@code PCollection} with the * given tag. * - *

<p>Once passed to {@code sideOutput} the element should not be modified
+ * <p>Once passed to {@code output} the element should not be modified
 * in any way.
 *
 * <p>The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags} to
- * specify the tags of side outputs that it consumes. Non-consumed side
+ * specify the tags of outputs that it consumes. Non-consumed
 * outputs, e.g., outputs for monitoring purposes only, don't necessarily
 * need to be specified.
 *
@@ -180,13 +180,13 @@ public abstract class DoFn implements Serializable, HasDisplayD
 *
 * @see ParDo.SingleOutput#withOutputTags
 */
-  public abstract void sideOutput(TupleTag tag, T output);
+  public abstract void output(TupleTag tag, T output);

  /**
-   * Adds the given element to the specified side output {@code PCollection},
+   * Adds the given element to the specified output {@code PCollection},
   * with the given timestamp.
   *
-   * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
+   * <p>Once passed to {@code outputWithTimestamp} the element should not be
   * modified in any way.
   *
   *

If invoked from {@link ProcessElement}), the timestamp @@ -207,7 +207,7 @@ public abstract class DoFn implements Serializable, HasDisplayD * * @see ParDo.SingleOutput#withOutputTags */ - public abstract void sideOutputWithTimestamp( + public abstract void outputWithTimestamp( TupleTag tag, T output, Instant timestamp); /** http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 88f4035..5446431 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -65,7 +65,7 @@ import org.joda.time.Instant; * * // Set arguments shared across all bundles: * fnTester.setSideInputs(...); // If fn takes side inputs. - * fnTester.setSideOutputTags(...); // If fn writes to side outputs. + * fnTester.setOutputTags(...); // If fn writes to more than one output. * * // Process a bundle containing a single input element: * Input testInput = ...; @@ -464,14 +464,14 @@ public class DoFnTester implements AutoCloseable { } /** - * Returns the elements output so far to the side output with the + * Returns the elements output so far to the output with the * given tag. Does not clear them, so subsequent calls will * continue to include these elements. * - * @see #takeSideOutputElements - * @see #clearSideOutputElements + * @see #takeOutputElements + * @see #clearOutputElements */ - public List peekSideOutputElements(TupleTag tag) { + public List peekOutputElements(TupleTag tag) { // TODO: Should we return an unmodifiable list? return Lists.transform(getImmutableOutput(tag), new Function, T>() { @@ -483,24 +483,23 @@ public class DoFnTester implements AutoCloseable { } /** - * Clears the record of the elements output so far to the side - * output with the given tag. + * Clears the record of the elements output so far to the output with the given tag. * - * @see #peekSideOutputElements + * @see #peekOutputElements */ - public void clearSideOutputElements(TupleTag tag) { + public void clearOutputElements(TupleTag tag) { getMutableOutput(tag).clear(); } /** - * Returns the elements output so far to the side output with the given tag. + * Returns the elements output so far to the output with the given tag. * Clears the list so these elements don't appear in future calls. 
* - * @see #peekSideOutputElements + * @see #peekOutputElements */ - public List takeSideOutputElements(TupleTag tag) { - List resultElems = new ArrayList<>(peekSideOutputElements(tag)); - clearSideOutputElements(tag); + public List takeOutputElements(TupleTag tag) { + List resultElems = new ArrayList<>(peekOutputElements(tag)); + clearOutputElements(tag); return resultElems; } @@ -563,12 +562,12 @@ public class DoFnTester implements AutoCloseable { } @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { throwUnsupportedOutputFromBundleMethods(); } @Override - public void sideOutput(TupleTag tag, T output) { + public void output(TupleTag tag, T output) { throwUnsupportedOutputFromBundleMethods(); } @@ -683,21 +682,21 @@ public class DoFnTester implements AutoCloseable { @Override public void output(OutputT output) { - sideOutput(mainOutputTag, output); + output(mainOutputTag, output); } @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { - sideOutputWithTimestamp(mainOutputTag, output, timestamp); + outputWithTimestamp(mainOutputTag, output, timestamp); } @Override - public void sideOutput(TupleTag tag, T output) { - sideOutputWithTimestamp(tag, output, element.getTimestamp()); + public void output(TupleTag tag, T output) { + outputWithTimestamp(tag, output, element.getTimestamp()); } @Override - public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { getMutableOutput(tag) .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane())); } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 3de845b..e3777ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.transforms.display.DisplayData.ItemSpec; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.MethodWithExtraParameters; @@ -103,7 +104,7 @@ import org.apache.beam.sdk.values.TypedPValue; *

<p>Each of the calls to any of the {@link DoFn DoFn's} processing
 * methods can produce zero or more output elements. All of the
 * of output elements from all of the {@link DoFn} instances
- * are included in the output {@link PCollection}.
+ * are included in an output {@link PCollection}.
 *
 * <p>For example:
 *
@@ -180,20 +181,20 @@ import org.apache.beam.sdk.values.TypedPValue;
 * }}));
 * } </pre>
 *
- * <h2>Side Outputs</h2>
+ * <h2>Additional Outputs</h2>
 *
 * <p>Optionally, a {@link ParDo} transform can produce multiple
 * output {@link PCollection PCollections}, both a "main output"
- * {@code PCollection} plus any number of "side output"
+ * {@code PCollection} plus any number of additional output
 * {@link PCollection PCollections}, each keyed by a distinct {@link TupleTag},
 * and bundled in a {@link PCollectionTuple}. The {@link TupleTag TupleTags}
 * to be used for the output {@link PCollectionTuple} are specified by
- * invoking {@link SingleOutput#withOutputTags}. Unconsumed side outputs do not
+ * invoking {@link SingleOutput#withOutputTags}. Unconsumed outputs do not
 * necessarily need to be explicitly specified, even if the {@link DoFn}
 * generates them. Within the {@link DoFn}, an element is added to the
 * main output {@link PCollection} as normal, using
- * {@link DoFn.Context#output}, while an element is added to a side output
- * {@link PCollection} using {@link DoFn.Context#sideOutput}. For example:
+ * {@link DoFn.Context#output(Object)}, while an element is added to any additional output
+ * {@link PCollection} using {@link DoFn.Context#output(TupleTag, Object)}. For example:
 *
 * <pre> {@code
  * PCollection words = ...;
@@ -201,7 +202,7 @@ import org.apache.beam.sdk.values.TypedPValue;
  * // plus the lengths of words that are above the cut off.
  * // Also select words starting with "MARKER".
  * final int wordLengthCutOff = 10;
- * // Create tags to use for the main and side outputs.
+ * // Create tags to use for the main and additional outputs.
  * final TupleTag wordsBelowCutOffTag =
  *     new TupleTag(){};
  * final TupleTag wordLengthsAboveCutOffTag =
@@ -212,7 +213,7 @@ import org.apache.beam.sdk.values.TypedPValue;
  *     words.apply(
  *         ParDo
  *         .of(new DoFn() {
- *             // Create a tag for the unconsumed side output.
+ *             // Create a tag for the unconsumed output.
  *             final TupleTag specialWordsTag =
  *                 new TupleTag(){};
  *            {@literal @}ProcessElement
@@ -222,19 +223,19 @@ import org.apache.beam.sdk.values.TypedPValue;
  *                 // Emit this short word to the main output.
  *                 c.output(word);
  *               } else {
- *                 // Emit this long word's length to a side output.
- *                 c.sideOutput(wordLengthsAboveCutOffTag, word.length());
+ *                 // Emit this long word's length to a specified output.
+ *                 c.output(wordLengthsAboveCutOffTag, word.length());
  *               }
  *               if (word.startsWith("MARKER")) {
- *                 // Emit this word to a different side output.
- *                 c.sideOutput(markedWordsTag, word);
+ *                 // Emit this word to a different specified output.
+ *                 c.output(markedWordsTag, word);
  *               }
  *               if (word.startsWith("SPECIAL")) {
- *                 // Emit this word to the unconsumed side output.
- *                 c.sideOutput(specialWordsTag, word);
+ *                 // Emit this word to the unconsumed output.
+ *                 c.output(specialWordsTag, word);
  *               }
  *             }})
- *             // Specify the main and consumed side output tags of the
+ *             // Specify the main and consumed output tags of the
  *             // PCollectionTuple result:
  *         .withOutputTags(wordsBelowCutOffTag,
  *             TupleTagList.of(wordLengthsAboveCutOffTag)
@@ -254,9 +255,9 @@ import org.apache.beam.sdk.values.TypedPValue;
  * elements of the main output {@link PCollection PCollection<OutputT>} is
  * inferred from the concrete type of the {@link DoFn DoFn<InputT, OutputT>}.
  *
- * <p>By default, the {@link Coder Coder<SideOutputT>} for the elements of
- * a side output {@link PCollection PCollection<SideOutputT>} is inferred
- * from the concrete type of the corresponding {@link TupleTag TupleTag<SideOutputT>}.
+ * <p>By default, the {@link Coder Coder<AdditionalOutputT>} for the elements of
+ * an output {@link PCollection PCollection<AdditionalOutputT>} is inferred
+ * from the concrete type of the corresponding {@link TupleTag TupleTag<AdditionalOutputT>}.
 * To be successful, the {@link TupleTag} should be created as an instance
 * of a trivial anonymous subclass, with {@code {}} suffixed to the
 * constructor call. Such uses block Java's generic type parameter
@@ -265,12 +266,12 @@ import org.apache.beam.sdk.values.TypedPValue;
 *
 * <pre> {@code
  * // A TupleTag to use for a side input can be written concisely:
  * final TupleTag sideInputag = new TupleTag<>();
- * // A TupleTag to use for a side output should be written with "{}",
+ * // A TupleTag to use for an output should be written with "{}",
  * // and explicit generic parameter type:
- * final TupleTag sideOutputTag = new TupleTag(){};
+ * final TupleTag additionalOutputTag = new TupleTag(){};
  * } 
 * This style of {@code TupleTag} instantiation is used in the example of
- * multiple side outputs, above.
+ * {@link ParDo ParDos} that produce multiple outputs, above.
 *
 * <h2>Serializability of {@link DoFn DoFns}</h2>
 *
@@ -358,7 +359,7 @@ import org.apache.beam.sdk.values.TypedPValue;
 * that state across Java processes. All information should be
 * communicated to {@link DoFn} instances via main and side inputs and
 * serialized state, and all output should be communicated from a
- * {@link DoFn} instance via main and side outputs, in the absence of
+ * {@link DoFn} instance via output {@link PCollection PCollections}, in the absence of
 * external communication mechanisms written by user code.
 *
 * <h2>Fault Tolerance</h2>
 *
@@ -602,14 +603,14 @@ public class ParDo {
  /**
   * Returns a new multi-output {@link ParDo} {@link PTransform} that's like this {@link
-   * PTransform} but with the specified main and side output tags. Does not modify this {@link
+   * PTransform} but with the specified output tags. Does not modify this {@link
   * PTransform}.
   *
-   * <p>See the discussion of Side Outputs above for more explanation.
+   * <p>
See the discussion of Additional Outputs above for more explanation. */ public MultiOutput withOutputTags( - TupleTag mainOutputTag, TupleTagList sideOutputTags) { - return new MultiOutput<>(fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData); + TupleTag mainOutputTag, TupleTagList additionalOutputTags) { + return new MultiOutput<>(fn, sideInputs, mainOutputTag, additionalOutputTags, fnDisplayData); } @Override @@ -671,11 +672,9 @@ public class ParDo { } /** - * A {@link PTransform} that, when applied to a - * {@code PCollection}, invokes a user-specified - * {@code DoFn} on all its elements, which can emit elements - * to any of the {@link PTransform}'s main and side output - * {@code PCollection}s, which are bundled into a result + * A {@link PTransform} that, when applied to a {@code PCollection}, invokes a + * user-specified {@code DoFn} on all its elements, which can emit elements to + * any of the {@link PTransform}'s output {@code PCollection}s, which are bundled into a result * {@code PCollectionTuple}. * * @param the type of the (main) input {@code PCollection} elements @@ -685,7 +684,7 @@ public class ParDo { extends PTransform, PCollectionTuple> { private final List> sideInputs; private final TupleTag mainOutputTag; - private final TupleTagList sideOutputTags; + private final TupleTagList additionalOutputTags; private final DisplayData.ItemSpec> fnDisplayData; private final DoFn fn; @@ -693,11 +692,11 @@ public class ParDo { DoFn fn, List> sideInputs, TupleTag mainOutputTag, - TupleTagList sideOutputTags, - DisplayData.ItemSpec> fnDisplayData) { + TupleTagList additionalOutputTags, + ItemSpec> fnDisplayData) { this.sideInputs = sideInputs; this.mainOutputTag = mainOutputTag; - this.sideOutputTags = sideOutputTags; + this.additionalOutputTags = additionalOutputTags; this.fn = SerializableUtils.clone(fn); this.fnDisplayData = fnDisplayData; } @@ -730,7 +729,7 @@ public class ParDo { .addAll(sideInputs) .build(), mainOutputTag, - sideOutputTags, + additionalOutputTags, fnDisplayData); } @@ -745,7 +744,7 @@ public class ParDo { PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal( input.getPipeline(), - TupleTagList.of(mainOutputTag).and(sideOutputTags.getAll()), + TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()), input.getWindowingStrategy(), input.isBounded()); @@ -794,8 +793,8 @@ public class ParDo { return mainOutputTag; } - public TupleTagList getSideOutputTags() { - return sideOutputTags; + public TupleTagList getAdditionalOutputTags() { + return additionalOutputTags; } public List> getSideInputs() { http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java index 2031bc9..595d18c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java @@ -169,7 +169,7 @@ public class Partition extends PTransform, PCollectionList> if (0 <= partition && partition < numPartitions) { @SuppressWarnings("unchecked") TupleTag typedTag = (TupleTag) outputTags.get(partition); - c.sideOutput(typedTag, input); + c.output(typedTag, input); } else { throw new IndexOutOfBoundsException( "Partition function returned out of bounds index: " 
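
For orientation while reading the rest of this diff, here is a minimal, self-contained sketch of what the renamed multi-output API looks like from user code once this commit is applied. It is illustrative only: the class name, tags, and input data below are invented for the example and do not appear in the commit, and running it assumes a runner (for example the DirectRunner) is on the classpath.

// Hypothetical example; names here are illustrative, not part of the commit.
import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class TaggedOutputExample {

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    final int wordLengthCutOff = 10;

    // Tags for the main output and one additional (formerly "side") output.
    // The anonymous-subclass idiom lets the additional output's coder be inferred.
    final TupleTag<String> shortWordsTag = new TupleTag<String>() {};
    final TupleTag<Integer> longWordLengthsTag = new TupleTag<Integer>() {};

    PCollectionTuple results =
        pipeline
            .apply(Create.of(Arrays.asList("hi", "internationalization", "beam")))
            .apply(
                ParDo.of(
                        new DoFn<String, String>() {
                          @ProcessElement
                          public void processElement(ProcessContext c) {
                            String word = c.element();
                            if (word.length() <= wordLengthCutOff) {
                              // Main output, unchanged by this commit.
                              c.output(word);
                            } else {
                              // Previously c.sideOutput(longWordLengthsTag, ...).
                              c.output(longWordLengthsTag, word.length());
                            }
                          }
                        })
                    .withOutputTags(shortWordsTag, TupleTagList.of(longWordLengthsTag)));

    // Each output is retrieved from the resulting PCollectionTuple by its tag.
    PCollection<String> shortWords = results.get(shortWordsTag);
    PCollection<Integer> longWordLengths = results.get(longWordLengthsTag);

    pipeline.run();
  }
}

The only user-visible change made by this commit is the spelling: c.sideOutput(tag, value) becomes c.output(tag, value) (and likewise sideOutputWithTimestamp becomes outputWithTimestamp), while getSideOutputTags on ParDo.MultiOutput becomes getAdditionalOutputTags.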
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
index 0ab26ca..ce67e94 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
@@ -37,8 +37,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
 * {@link PTransform} taking
 * or producing multiple PCollection inputs or outputs that can be of
 * different types, for instance a
- * {@link ParDo} with side
- * outputs.
+ * {@link ParDo} with multiple outputs.
 *
 * <p>A {@link PCollectionTuple} can be created and accessed like follows:
 * <pre> {@code
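
As a rough, self-contained sketch of how a PCollectionTuple is typically built and read back (the pipeline and names below are invented for illustration and are not part of the commit):

// Hypothetical snippet; names are illustrative only.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

public class PCollectionTupleExample {

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    PCollection<String> names = pipeline.apply("Names", Create.of("ada", "grace"));
    PCollection<Integer> counts = pipeline.apply("Counts", Create.of(1, 2, 3));

    // Tags used purely as keys here, so the concise diamond form is enough.
    TupleTag<String> namesTag = new TupleTag<>();
    TupleTag<Integer> countsTag = new TupleTag<>();

    // Bundle heterogeneously typed PCollections under their tags...
    PCollectionTuple tuple = PCollectionTuple.of(namesTag, names).and(countsTag, counts);

    // ...and retrieve them by tag with the static type preserved.
    PCollection<String> namesAgain = tuple.get(namesTag);
    PCollection<Integer> countsAgain = tuple.get(countsTag);

    pipeline.run();
  }
}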
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
index a6b63ab..37d41f7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java
@@ -31,25 +31,23 @@ import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.PropertyNames;
 
 /**
- * A {@link TupleTag} is a typed tag to use as the key of a
- * heterogeneously typed tuple, like {@link PCollectionTuple}.
- * Its generic type parameter allows tracking
- * the static type of things stored in tuples.
+ * A {@link TupleTag} is a typed tag to use as the key of a heterogeneously typed tuple, like {@link
+ * PCollectionTuple}. Its generic type parameter allows tracking the static type of things stored in
+ * tuples.
  *
- * <p>To aid in assigning default {@link org.apache.beam.sdk.coders.Coder Coders} for results of
- * side outputs of {@link ParDo}, an output
- * {@link TupleTag} should be instantiated with an extra {@code {}} so
- * it is an instance of an anonymous subclass without generic type
- * parameters. Input {@link TupleTag TupleTags} require no such extra
- * instantiation (although it doesn't hurt). For example:
+ * <p>To aid in assigning default {@link org.apache.beam.sdk.coders.Coder Coders} for results of a
+ * {@link ParDo}, an output {@link TupleTag} should be instantiated with an extra {@code {}} so it
+ * is an instance of an anonymous subclass without generic type parameters. Input {@link TupleTag
+ * TupleTags} require no such extra instantiation (although it doesn't hurt). For example:
 *
- * <pre> {@code
+ * <pre>{@code
 * TupleTag inputTag = new TupleTag<>();
 * TupleTag outputTag = new TupleTag(){};
- * } </pre>
+ * } </pre>
+ *
* - * @param the type of the elements or values of the tagged thing, - * e.g., a {@code PCollection}. + * @param the type of the elements or values of the tagged thing, e.g., a {@code + * PCollection}. */ public class TupleTag implements Serializable { /** http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java index b4ce941..5aeff5e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.transforms.ParDo; /** * A {@link TupleTagList} is an immutable list of heterogeneously * typed {@link TupleTag TupleTags}. A {@link TupleTagList} is used, for instance, to - * specify the tags of the side outputs of a + * specify the tags of the additional outputs of a * {@link ParDo}. * *

A {@link TupleTagList} can be created and accessed like follows: http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java index d353835..54af747 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java @@ -148,14 +148,14 @@ public abstract class TypedPValue extends PValueBase implements PValue { return new CoderOrFailure<>(registry.getDefaultCoder(token), null); } catch (CannotProvideCoderException exc) { inferFromTokenException = exc; - // Attempt to detect when the token came from a TupleTag used for a ParDo side output, + // Attempt to detect when the token came from a TupleTag used for a ParDo output, // and provide a better error message if so. Unfortunately, this information is not // directly available from the TypeDescriptor, so infer based on the type of the PTransform // and the error message itself. if (transform instanceof ParDo.MultiOutput && exc.getReason() == ReasonCode.TYPE_ERASURE) { inferFromTokenException = new CannotProvideCoderException(exc.getMessage() - + " If this error occurs for a side output of the producing ParDo, verify that the " + + " If this error occurs for an output of the producing ParDo, verify that the " + "TupleTag for this output is constructed with proper type information (see " + "TupleTag Javadoc) or explicitly set the Coder to use if this is not possible."); } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 3555db3..afe384d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -221,7 +221,7 @@ public class MetricsTest implements Serializable { values.update(element); gauge.set(12L); c.output(element); - c.sideOutput(output2, element); + c.output(output2, element); } }) .withOutputTags(output1, TupleTagList.of(output2))); http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index b429eab..589c744 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -153,15 +153,15 @@ public class ParDoTest implements Serializable { State state = State.NOT_SET_UP; final List> sideInputViews = new ArrayList<>(); - final List> sideOutputTupleTags = new ArrayList<>(); + final List> additionalOutputTupleTags = new ArrayList<>(); public TestDoFn() { } public TestDoFn(List> sideInputViews, - List> sideOutputTupleTags) { + List> additionalOutputTupleTags) { this.sideInputViews.addAll(sideInputViews); - 
this.sideOutputTupleTags.addAll(sideOutputTupleTags); + this.additionalOutputTupleTags.addAll(additionalOutputTupleTags); } @Setup @@ -197,9 +197,9 @@ public class ParDoTest implements Serializable { private void outputToAll(Context c, String value) { c.output(value); - for (TupleTag sideOutputTupleTag : sideOutputTupleTags) { - c.sideOutput(sideOutputTupleTag, - sideOutputTupleTag.getId() + ": " + value); + for (TupleTag additionalOutputTupleTag : additionalOutputTupleTags) { + c.output(additionalOutputTupleTag, + additionalOutputTupleTag.getId() + ": " + value); } } @@ -212,9 +212,9 @@ public class ParDoTest implements Serializable { value += ": " + sideInputValues; } c.output(value); - for (TupleTag sideOutputTupleTag : sideOutputTupleTags) { - c.sideOutput(sideOutputTupleTag, - sideOutputTupleTag.getId() + ": " + value); + for (TupleTag additionalOutputTupleTag : additionalOutputTupleTags) { + c.output(additionalOutputTupleTag, + additionalOutputTupleTag.getId() + ": " + value); } } } @@ -389,90 +389,90 @@ public class ParDoTest implements Serializable { @Test @Category(ValidatesRunner.class) - public void testParDoWithSideOutputs() { + public void testParDoWithTaggedOutput() { List inputs = Arrays.asList(3, -42, 666); TupleTag mainOutputTag = new TupleTag("main"){}; - TupleTag sideOutputTag1 = new TupleTag("side1"){}; - TupleTag sideOutputTag2 = new TupleTag("side2"){}; - TupleTag sideOutputTag3 = new TupleTag("side3"){}; - TupleTag sideOutputTagUnwritten = new TupleTag("sideUnwritten"){}; + TupleTag additionalOutputTag1 = new TupleTag("additional1"){}; + TupleTag additionalOutputTag2 = new TupleTag("additional2"){}; + TupleTag additionalOutputTag3 = new TupleTag("additional3"){}; + TupleTag additionalOutputTagUnwritten = new TupleTag("unwrittenOutput"){}; PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) .apply(ParDo .of(new TestDoFn( Arrays.>asList(), - Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3))) + Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3))) .withOutputTags( mainOutputTag, - TupleTagList.of(sideOutputTag3) - .and(sideOutputTag1) - .and(sideOutputTagUnwritten) - .and(sideOutputTag2))); + TupleTagList.of(additionalOutputTag3) + .and(additionalOutputTag1) + .and(additionalOutputTagUnwritten) + .and(additionalOutputTag2))); PAssert.that(outputs.get(mainOutputTag)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); - PAssert.that(outputs.get(sideOutputTag1)) + PAssert.that(outputs.get(additionalOutputTag1)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromSideOutput(sideOutputTag1)); - PAssert.that(outputs.get(sideOutputTag2)) + .fromOutput(additionalOutputTag1)); + PAssert.that(outputs.get(additionalOutputTag2)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromSideOutput(sideOutputTag2)); - PAssert.that(outputs.get(sideOutputTag3)) + .fromOutput(additionalOutputTag2)); + PAssert.that(outputs.get(additionalOutputTag3)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromSideOutput(sideOutputTag3)); - PAssert.that(outputs.get(sideOutputTagUnwritten)).empty(); + .fromOutput(additionalOutputTag3)); + PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty(); pipeline.run(); } @Test @Category(ValidatesRunner.class) - public void testParDoEmptyWithSideOutputs() { + public void testParDoEmptyWithTaggedOutput() { TupleTag mainOutputTag = new TupleTag("main"){}; - TupleTag sideOutputTag1 = new TupleTag("side1"){}; - TupleTag sideOutputTag2 = new TupleTag("side2"){}; - 
TupleTag sideOutputTag3 = new TupleTag("side3"){}; - TupleTag sideOutputTagUnwritten = new TupleTag("sideUnwritten"){}; + TupleTag additionalOutputTag1 = new TupleTag("additional1"){}; + TupleTag additionalOutputTag2 = new TupleTag("additional2"){}; + TupleTag additionalOutputTag3 = new TupleTag("additional3"){}; + TupleTag additionalOutputTagUnwritten = new TupleTag("unwrittenOutput"){}; PCollectionTuple outputs = pipeline .apply(Create.empty(VarIntCoder.of())) .apply(ParDo .of(new TestDoFn( Arrays.>asList(), - Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3))) + Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3))) .withOutputTags( mainOutputTag, - TupleTagList.of(sideOutputTag3).and(sideOutputTag1) - .and(sideOutputTagUnwritten).and(sideOutputTag2))); + TupleTagList.of(additionalOutputTag3).and(additionalOutputTag1) + .and(additionalOutputTagUnwritten).and(additionalOutputTag2))); List inputs = Collections.emptyList(); PAssert.that(outputs.get(mainOutputTag)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); - PAssert.that(outputs.get(sideOutputTag1)) + PAssert.that(outputs.get(additionalOutputTag1)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromSideOutput(sideOutputTag1)); - PAssert.that(outputs.get(sideOutputTag2)) + .fromOutput(additionalOutputTag1)); + PAssert.that(outputs.get(additionalOutputTag2)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromSideOutput(sideOutputTag2)); - PAssert.that(outputs.get(sideOutputTag3)) + .fromOutput(additionalOutputTag2)); + PAssert.that(outputs.get(additionalOutputTag3)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromSideOutput(sideOutputTag3)); - PAssert.that(outputs.get(sideOutputTagUnwritten)).empty(); + .fromOutput(additionalOutputTag3)); + PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty(); pipeline.run(); } @Test @Category(ValidatesRunner.class) - public void testParDoWithEmptySideOutputs() { + public void testParDoWithEmptyTaggedOutput() { TupleTag mainOutputTag = new TupleTag("main"){}; - TupleTag sideOutputTag1 = new TupleTag("side1"){}; - TupleTag sideOutputTag2 = new TupleTag("side2"){}; + TupleTag additionalOutputTag1 = new TupleTag("additional1"){}; + TupleTag additionalOutputTag2 = new TupleTag("additional2"){}; PCollectionTuple outputs = pipeline .apply(Create.empty(VarIntCoder.of())) @@ -480,12 +480,12 @@ public class ParDoTest implements Serializable { .of(new TestNoOutputDoFn()) .withOutputTags( mainOutputTag, - TupleTagList.of(sideOutputTag1).and(sideOutputTag2))); + TupleTagList.of(additionalOutputTag1).and(additionalOutputTag2))); PAssert.that(outputs.get(mainOutputTag)).empty(); - PAssert.that(outputs.get(sideOutputTag1)).empty(); - PAssert.that(outputs.get(sideOutputTag2)).empty(); + PAssert.that(outputs.get(additionalOutputTag1)).empty(); + PAssert.that(outputs.get(additionalOutputTag2)).empty(); pipeline.run(); } @@ -493,12 +493,12 @@ public class ParDoTest implements Serializable { @Test @Category(ValidatesRunner.class) - public void testParDoWithOnlySideOutputs() { + public void testParDoWithOnlyTaggedOutput() { List inputs = Arrays.asList(3, -42, 666); final TupleTag mainOutputTag = new TupleTag("main"){}; - final TupleTag sideOutputTag = new TupleTag("side"){}; + final TupleTag additionalOutputTag = new TupleTag("additional"){}; PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) @@ -506,29 +506,29 @@ public class ParDoTest implements Serializable { .of(new DoFn(){ @ProcessElement public void 
processElement(ProcessContext c) { - c.sideOutput(sideOutputTag, c.element()); + c.output(additionalOutputTag, c.element()); }}) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); + .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))); PAssert.that(outputs.get(mainOutputTag)).empty(); - PAssert.that(outputs.get(sideOutputTag)).containsInAnyOrder(inputs); + PAssert.that(outputs.get(additionalOutputTag)).containsInAnyOrder(inputs); pipeline.run(); } @Test @Category(NeedsRunner.class) - public void testParDoWritingToUndeclaredSideOutput() { + public void testParDoWritingToUndeclaredTag() { List inputs = Arrays.asList(3, -42, 666); - TupleTag sideTag = new TupleTag("side"){}; + TupleTag notOutputTag = new TupleTag("additional"){}; PCollection output = pipeline .apply(Create.of(inputs)) .apply(ParDo.of(new TestDoFn( Arrays.>asList(), - Arrays.asList(sideTag)))); + Arrays.asList(notOutputTag)))); PAssert.that(output) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); @@ -539,7 +539,7 @@ public class ParDoTest implements Serializable { @Test // TODO: The exception thrown is runner-specific, even if the behavior is general @Category(NeedsRunner.class) - public void testParDoUndeclaredSideOutputLimit() { + public void testParDoUndeclaredTagLimit() { PCollection input = pipeline.apply(Create.of(Arrays.asList(3))); @@ -548,13 +548,13 @@ public class ParDoTest implements Serializable { .apply("Success1000", ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { - TupleTag specialSideTag = new TupleTag(){}; - c.sideOutput(specialSideTag, "side"); - c.sideOutput(specialSideTag, "side"); - c.sideOutput(specialSideTag, "side"); + TupleTag specialOutputTag = new TupleTag(){}; + c.output(specialOutputTag, "special"); + c.output(specialOutputTag, "special"); + c.output(specialOutputTag, "special"); for (int i = 0; i < 998; i++) { - c.sideOutput(new TupleTag(){}, "side"); + c.output(new TupleTag(){}, "tag" + i); } }})); pipeline.run(); @@ -565,12 +565,12 @@ public class ParDoTest implements Serializable { @ProcessElement public void processElement(ProcessContext c) { for (int i = 0; i < 1000; i++) { - c.sideOutput(new TupleTag(){}, "side"); + c.output(new TupleTag(){}, "output" + i); } }})); thrown.expect(RuntimeException.class); - thrown.expectMessage("the number of side outputs has exceeded a limit"); + thrown.expectMessage("the number of outputs has exceeded a limit"); pipeline.run(); } @@ -647,7 +647,7 @@ public class ParDoTest implements Serializable { List inputs = Arrays.asList(3, -42, 666); final TupleTag mainOutputTag = new TupleTag("main"){}; - final TupleTag sideOutputTag = new TupleTag("sideOutput"){}; + final TupleTag additionalOutputTag = new TupleTag("output"){}; PCollectionView sideInput1 = pipeline .apply("CreateSideInput1", Create.of(11)) @@ -668,7 +668,7 @@ public class ParDoTest implements Serializable { .withSideInputs(sideInput1) .withSideInputs(sideInputUnread) .withSideInputs(sideInput2) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); + .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))); PAssert.that(outputs.get(mainOutputTag)) .satisfies(ParDoTest.HasExpectedOutput @@ -685,7 +685,7 @@ public class ParDoTest implements Serializable { List inputs = Arrays.asList(3, -42, 666); final TupleTag mainOutputTag = new TupleTag("main"){}; - final TupleTag sideOutputTag = new TupleTag("sideOutput"){}; + final TupleTag additionalOutputTag = new TupleTag("output"){}; PCollectionView 
    PCollectionView sideInput1 = pipeline
        .apply("CreateSideInput1", Create.of(11))
@@ -706,7 +706,7 @@ public class ParDoTest implements Serializable {
        .withSideInputs(sideInput1)
        .withSideInputs(sideInputUnread)
        .withSideInputs(sideInput2)
-       .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+       .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));

    PAssert.that(outputs.get(mainOutputTag))
        .satisfies(ParDoTest.HasExpectedOutput
@@ -853,37 +853,37 @@ public class ParDoTest implements Serializable {
  @Test
  public void testParDoMultiNameBasedDoFnWithTrimmerSuffix() {
    assertThat(
-       ParDo.of(new SideOutputDummyFn(null)).withOutputTags(null, null).getName(),
-       containsString("ParMultiDo(SideOutputDummy)"));
+       ParDo.of(new TaggedOutputDummyFn(null)).withOutputTags(null, null).getName(),
+       containsString("ParMultiDo(TaggedOutputDummy)"));
  }

  @Test
- public void testParDoWithSideOutputsName() {
+ public void testParDoWithTaggedOutputName() {
    pipeline.enableAbandonedNodeEnforcement(false);

    TupleTag mainOutputTag = new TupleTag("main"){};
-   TupleTag sideOutputTag1 = new TupleTag("side1"){};
-   TupleTag sideOutputTag2 = new TupleTag("side2"){};
-   TupleTag sideOutputTag3 = new TupleTag("side3"){};
-   TupleTag sideOutputTagUnwritten = new TupleTag("sideUnwritten"){};
+   TupleTag additionalOutputTag1 = new TupleTag("output1"){};
+   TupleTag additionalOutputTag2 = new TupleTag("output2"){};
+   TupleTag additionalOutputTag3 = new TupleTag("output3"){};
+   TupleTag additionalOutputTagUnwritten = new TupleTag("unwrittenOutput"){};

    PCollectionTuple outputs = pipeline
        .apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput")
        .apply("MyParDo", ParDo
            .of(new TestDoFn(
                Arrays.>asList(),
-               Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
+               Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3)))
            .withOutputTags(
                mainOutputTag,
-               TupleTagList.of(sideOutputTag3).and(sideOutputTag1)
-                   .and(sideOutputTagUnwritten).and(sideOutputTag2)));
+               TupleTagList.of(additionalOutputTag3).and(additionalOutputTag1)
+                   .and(additionalOutputTagUnwritten).and(additionalOutputTag2)));

    assertEquals("MyParDo.main", outputs.get(mainOutputTag).getName());
-   assertEquals("MyParDo.side1", outputs.get(sideOutputTag1).getName());
-   assertEquals("MyParDo.side2", outputs.get(sideOutputTag2).getName());
-   assertEquals("MyParDo.side3", outputs.get(sideOutputTag3).getName());
-   assertEquals("MyParDo.sideUnwritten",
-       outputs.get(sideOutputTagUnwritten).getName());
+   assertEquals("MyParDo.output1", outputs.get(additionalOutputTag1).getName());
+   assertEquals("MyParDo.output2", outputs.get(additionalOutputTag2).getName());
+   assertEquals("MyParDo.output3", outputs.get(additionalOutputTag3).getName());
+   assertEquals("MyParDo.unwrittenOutput",
+       outputs.get(additionalOutputTagUnwritten).getName());
  }

  @Test
@@ -892,29 +892,29 @@ public class ParDoTest implements Serializable {
    PCollection longs = pipeline.apply(CountingInput.unbounded());

    TupleTag mainOut = new TupleTag<>();
-   final TupleTag sideOutOne = new TupleTag<>();
-   final TupleTag sideOutTwo = new TupleTag<>();
+   final TupleTag valueAsString = new TupleTag<>();
+   final TupleTag valueAsInt = new TupleTag<>();

    DoFn fn = new DoFn() {
      @ProcessElement
      public void processElement(ProcessContext cxt) {
        cxt.output(cxt.element());
-       cxt.sideOutput(sideOutOne, Long.toString(cxt.element()));
-       cxt.sideOutput(sideOutTwo, Long.valueOf(cxt.element()).intValue());
+       cxt.output(valueAsString, Long.toString(cxt.element()));
+       cxt.output(valueAsInt, Long.valueOf(cxt.element()).intValue());
      }
    };

    ParDo.MultiOutput parDo =
-       ParDo.of(fn).withOutputTags(mainOut, TupleTagList.of(sideOutOne).and(sideOutTwo));
+       ParDo.of(fn).withOutputTags(mainOut, TupleTagList.of(valueAsString).and(valueAsInt));
    PCollectionTuple firstApplication = longs.apply("first", parDo);
    PCollectionTuple secondApplication = longs.apply("second", parDo);
    assertThat(firstApplication, not(equalTo(secondApplication)));
    assertThat(
        firstApplication.getAll().keySet(),
-       Matchers.>containsInAnyOrder(mainOut, sideOutOne, sideOutTwo));
+       Matchers.>containsInAnyOrder(mainOut, valueAsString, valueAsInt));
    assertThat(
        secondApplication.getAll().keySet(),
-       Matchers.>containsInAnyOrder(mainOut, sideOutOne, sideOutTwo));
+       Matchers.>containsInAnyOrder(mainOut, valueAsString, valueAsInt));
  }

  @Test
@@ -1017,28 +1017,28 @@ public class ParDoTest implements Serializable {
    }
  }

- private static class SideOutputDummyFn extends DoFn {
-   private TupleTag sideTag;
-   public SideOutputDummyFn(TupleTag sideTag) {
-     this.sideTag = sideTag;
+ private static class TaggedOutputDummyFn extends DoFn {
+   private TupleTag dummyOutputTag;
+   public TaggedOutputDummyFn(TupleTag dummyOutputTag) {
+     this.dummyOutputTag = dummyOutputTag;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(1);
-     c.sideOutput(sideTag, new TestDummy());
+     c.output(dummyOutputTag, new TestDummy());
    }
  }

  private static class MainOutputDummyFn extends DoFn {
-   private TupleTag sideTag;
-   public MainOutputDummyFn(TupleTag sideTag) {
-     this.sideTag = sideTag;
+   private TupleTag intOutputTag;
+   public MainOutputDummyFn(TupleTag intOutputTag) {
+     this.intOutputTag = intOutputTag;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(new TestDummy());
-     c.sideOutput(sideTag, 1);
+     c.output(intOutputTag, 1);
    }
  }
@@ -1112,7 +1112,7 @@ public class ParDoTest implements Serializable {
      implements SerializableFunction, Void>, Serializable {
    private final List inputs;
    private final List sideInputs;
-   private final String sideOutput;
+   private final String additionalOutput;
    private final boolean ordered;

    public static HasExpectedOutput forInput(List inputs) {
@@ -1125,11 +1125,11 @@
    private HasExpectedOutput(List inputs,
        List sideInputs,
-       String sideOutput,
+       String additionalOutput,
        boolean ordered) {
      this.inputs = inputs;
      this.sideInputs = sideInputs;
-     this.sideOutput = sideOutput;
+     this.additionalOutput = additionalOutput;
      this.ordered = ordered;
    }
@@ -1138,18 +1138,18 @@
      for (Integer sideInputValue : sideInputValues) {
        sideInputs.add(sideInputValue);
      }
-     return new HasExpectedOutput(inputs, sideInputs, sideOutput, ordered);
+     return new HasExpectedOutput(inputs, sideInputs, additionalOutput, ordered);
    }

-   public HasExpectedOutput fromSideOutput(TupleTag sideOutputTag) {
-     return fromSideOutput(sideOutputTag.getId());
+   public HasExpectedOutput fromOutput(TupleTag outputTag) {
+     return fromOutput(outputTag.getId());
    }
-   public HasExpectedOutput fromSideOutput(String sideOutput) {
-     return new HasExpectedOutput(inputs, sideInputs, sideOutput, ordered);
+   public HasExpectedOutput fromOutput(String outputId) {
+     return new HasExpectedOutput(inputs, sideInputs, outputId, ordered);
    }

    public HasExpectedOutput inOrder() {
-     return new HasExpectedOutput(inputs, sideInputs, sideOutput, true);
+     return new HasExpectedOutput(inputs, sideInputs, additionalOutput, true);
    }

    @Override
@@ -1174,17 +1174,17 @@ public class ParDoTest implements Serializable {
      sideInputsSuffix = ": " + sideInputs;
    }

-   String sideOutputPrefix;
-   if (sideOutput.isEmpty()) {
-     sideOutputPrefix = "";
+   String additionalOutputPrefix;
+   if (additionalOutput.isEmpty()) {
+     additionalOutputPrefix = "";
    } else {
-     sideOutputPrefix = sideOutput + ": ";
+     additionalOutputPrefix = additionalOutput + ": ";
    }

    List expectedProcesseds = new ArrayList<>();
    for (Integer input : inputs) {
      expectedProcesseds.add(
-         sideOutputPrefix + "processing: " + input + sideInputsSuffix);
+         additionalOutputPrefix + "processing: " + input + sideInputsSuffix);
    }
    String[] expectedProcessedsArray =
        expectedProcesseds.toArray(new String[expectedProcesseds.size()]);
@@ -1196,10 +1196,10 @@ public class ParDoTest implements Serializable {
    assertEquals(starteds.size(), finisheds.size());
    for (String started : starteds) {
-     assertEquals(sideOutputPrefix + "started", started);
+     assertEquals(additionalOutputPrefix + "started", started);
    }
    for (String finished : finisheds) {
-     assertEquals(sideOutputPrefix + "finished", finished);
+     assertEquals(additionalOutputPrefix + "finished", finished);
    }

    return null;
@@ -1208,15 +1208,15 @@ public class ParDoTest implements Serializable {
  @Test
  @Category(NeedsRunner.class)
- public void testSideOutputUnknownCoder() throws Exception {
+ public void testTaggedOutputUnknownCoder() throws Exception {
    PCollection input = pipeline
        .apply(Create.of(Arrays.asList(1, 2, 3)));

    final TupleTag mainOutputTag = new TupleTag("main");
-   final TupleTag sideOutputTag = new TupleTag("unknownSide");
-   input.apply(ParDo.of(new SideOutputDummyFn(sideOutputTag))
-       .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+   final TupleTag additionalOutputTag = new TupleTag("unknownSide");
+   input.apply(ParDo.of(new TaggedOutputDummyFn(additionalOutputTag))
+       .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));

    thrown.expect(IllegalStateException.class);
    thrown.expectMessage("Unable to return a default Coder");
@@ -1224,26 +1224,27 @@ public class ParDoTest implements Serializable {
  }

  @Test
- public void testSideOutputUnregisteredExplicitCoder() throws Exception {
+ public void testTaggedOutputUnregisteredExplicitCoder() throws Exception {
    pipeline.enableAbandonedNodeEnforcement(false);

    PCollection input = pipeline
        .apply(Create.of(Arrays.asList(1, 2, 3)));

    final TupleTag mainOutputTag = new TupleTag("main");
-   final TupleTag sideOutputTag = new TupleTag("unregisteredSide");
-   ParDo.MultiOutput pardo = ParDo.of(new SideOutputDummyFn(sideOutputTag))
-       .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag));
+   final TupleTag additionalOutputTag = new TupleTag("unregisteredSide");
+   ParDo.MultiOutput pardo =
+       ParDo.of(new TaggedOutputDummyFn(additionalOutputTag))
+           .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag));
    PCollectionTuple outputTuple = input.apply(pardo);

-   outputTuple.get(sideOutputTag).setCoder(new TestDummyCoder());
+   outputTuple.get(additionalOutputTag).setCoder(new TestDummyCoder());

-   outputTuple.get(sideOutputTag).apply(View.asSingleton());
+   outputTuple.get(additionalOutputTag).apply(View.asSingleton());

-   assertEquals(new TestDummyCoder(), outputTuple.get(sideOutputTag).getCoder());
-   outputTuple.get(sideOutputTag).finishSpecifyingOutput(input, pardo); // Check for crashes
+   assertEquals(new TestDummyCoder(), outputTuple.get(additionalOutputTag).getCoder());
+   outputTuple.get(additionalOutputTag).finishSpecifyingOutput(input, pardo); // Check for crashes
    assertEquals(new TestDummyCoder(),
-       outputTuple.get(sideOutputTag).getCoder()); // Check for corruption
+       outputTuple.get(additionalOutputTag).getCoder()); // Check for corruption
  }

  @Test
@@ -1254,9 +1255,11 @@ public class ParDoTest implements Serializable {
        .apply(Create.of(Arrays.asList(1, 2, 3)));

    final TupleTag mainOutputTag = new TupleTag("unregisteredMain");
-   final TupleTag sideOutputTag = new TupleTag("side") {};
-   PCollectionTuple outputTuple = input.apply(ParDo.of(new MainOutputDummyFn(sideOutputTag))
-       .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+   final TupleTag additionalOutputTag = new TupleTag("additionalOutput") {};
+   PCollectionTuple outputTuple =
+       input.apply(
+           ParDo.of(new MainOutputDummyFn(additionalOutputTag))
+               .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));

    outputTuple.get(mainOutputTag).setCoder(new TestDummyCoder());
@@ -1265,13 +1268,13 @@ public class ParDoTest implements Serializable {
  @Test
  @Category(NeedsRunner.class)
- public void testMainOutputApplySideOutputNoCoder() {
+ public void testMainOutputApplyTaggedOutputNoCoder() {
    // Regression test: applying a transform to the main output
    // should not cause a crash based on lack of a coder for the
-   // side output.
+   // additional output.
    final TupleTag mainOutputTag = new TupleTag("main");
-   final TupleTag sideOutputTag = new TupleTag("side");
+   final TupleTag additionalOutputTag = new TupleTag("additionalOutput");
    PCollectionTuple tuple = pipeline
        .apply(Create.of(new TestDummy())
            .withCoder(TestDummyCoder.of()))
@@ -1282,14 +1285,14 @@ public class ParDoTest implements Serializable {
              public void processElement(ProcessContext context) {
                TestDummy element = context.element();
                context.output(element);
-               context.sideOutput(sideOutputTag, element);
+               context.output(additionalOutputTag, element);
              }
            })
-           .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
+           .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))
        );

    // Before fix, tuple.get(mainOutputTag).apply(...) would indirectly trigger
-   // tuple.get(sideOutputTag).finishSpecifyingOutput(), which would crash
+   // tuple.get(additionalOutputTag).finishSpecifyingOutput(), which would crash
    // on a missing coder.
    tuple.get(mainOutputTag)
        .setCoder(TestDummyCoder.of())
@@ -1300,7 +1303,7 @@ public class ParDoTest implements Serializable {
          }
        }));

-   tuple.get(sideOutputTag).setCoder(TestDummyCoder.of());
+   tuple.get(additionalOutputTag).setCoder(TestDummyCoder.of());

    pipeline.run();
  }
@@ -1328,13 +1331,13 @@ public class ParDoTest implements Serializable {
  @Test
  @Category(NeedsRunner.class)
- public void testParDoSideOutputWithTimestamp() {
+ public void testParDoTaggedOutputWithTimestamp() {
    PCollection input =
        pipeline.apply(Create.of(Arrays.asList(3, 42, 6)));

    final TupleTag mainOutputTag = new TupleTag("main"){};
-   final TupleTag sideOutputTag = new TupleTag("side"){};
+   final TupleTag additionalOutputTag = new TupleTag("additional"){};

    PCollection output =
        input
@@ -1342,11 +1345,11 @@ public class ParDoTest implements Serializable {
            new DoFn() {
              @ProcessElement
              public void processElement(ProcessContext c) {
-               c.sideOutputWithTimestamp(
-                   sideOutputTag, c.element(), new Instant(c.element().longValue()));
+               c.outputWithTimestamp(
+                   additionalOutputTag, c.element(), new Instant(c.element().longValue()));
              }
-           }).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)))
-       .get(sideOutputTag)
+           }).withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)))
+       .get(additionalOutputTag)
        .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO)))
        .apply(ParDo.of(new TestFormatTimestampDoFn()));
@@ -1914,7 +1917,7 @@ public class ParDoTest implements Serializable {
  @Test
  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
- public void testValueStateSideOutput() {
+ public void testValueStateTaggedOutput() {
    final String stateId = "foo";

    final TupleTag evenTag = new TupleTag() {};
@@ -1934,7 +1937,7 @@ public class ParDoTest implements Serializable {
            if (currentValue % 2 == 0) {
              c.output(currentValue);
            } else {
-             c.sideOutput(oddTag, currentValue);
+             c.output(oddTag, currentValue);
            }
            state.write(currentValue + 1);
          }

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index a122f67..9e8c12e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -218,12 +218,12 @@ public class SplittableDoFnTest {
  private static class SDFWithSideInputsAndOutputs extends DoFn {
    private final PCollectionView sideInput;
-   private final TupleTag sideOutput;
+   private final TupleTag additionalOutput;

    private SDFWithSideInputsAndOutputs(
-       PCollectionView sideInput, TupleTag sideOutput) {
+       PCollectionView sideInput, TupleTag additionalOutput) {
      this.sideInput = sideInput;
-     this.sideOutput = sideOutput;
+     this.additionalOutput = additionalOutput;
    }

    @ProcessElement
@@ -231,7 +231,7 @@ public class SplittableDoFnTest {
      checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
      String side = c.sideInput(sideInput);
      c.output("main:" + side + ":" + c.element());
-     c.sideOutput(sideOutput, "side:" + side + ":" + c.element());
+     c.output(additionalOutput, "additional:" + side + ":" + c.element());
    }

    @GetInitialRestriction
@@ -247,21 +247,22 @@ public class SplittableDoFnTest {
    PCollectionView sideInput =
        p.apply("side input", Create.of("foo")).apply(View.asSingleton());

    TupleTag mainOutputTag = new TupleTag<>("main");
-   TupleTag sideOutputTag = new TupleTag<>("side");
+   TupleTag additionalOutputTag = new TupleTag<>("additional");

    PCollectionTuple res =
        p.apply("input", Create.of(0, 1, 2))
            .apply(
-               ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, sideOutputTag))
+               ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, additionalOutputTag))
                    .withSideInputs(sideInput)
-                   .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+                   .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
    res.get(mainOutputTag).setCoder(StringUtf8Coder.of());
-   res.get(sideOutputTag).setCoder(StringUtf8Coder.of());
+   res.get(additionalOutputTag).setCoder(StringUtf8Coder.of());

    PAssert.that(res.get(mainOutputTag))
        .containsInAnyOrder(Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2"));
-   PAssert.that(res.get(sideOutputTag))
-       .containsInAnyOrder(Arrays.asList("side:foo:0", "side:foo:1", "side:foo:2"));
+   PAssert.that(res.get(additionalOutputTag))
+       .containsInAnyOrder(
+           Arrays.asList("additional:foo:0", "additional:foo:1", "additional:foo:2"));

    p.run();
  }

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 0a0abd6..9df0512 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -82,7 +82,7 @@ public final class PCollectionTupleTest implements Serializable {
    TupleTag mainOutputTag = new TupleTag("main") {};
    TupleTag emptyOutputTag = new TupleTag("empty") {};
-   final TupleTag sideOutputTag = new TupleTag("side") {};
+   final TupleTag additionalOutputTag = new TupleTag("extra") {};

    PCollection mainInput = pipeline
        .apply(Create.of(inputs));
@@ -91,14 +91,14 @@ public final class PCollectionTupleTest implements Serializable {
        .of(new DoFn() {
          @ProcessElement
          public void processElement(ProcessContext c) {
-           c.sideOutput(sideOutputTag, c.element());
+           c.output(additionalOutputTag, c.element());
          }})
-       .withOutputTags(emptyOutputTag, TupleTagList.of(sideOutputTag)));
+       .withOutputTags(emptyOutputTag, TupleTagList.of(additionalOutputTag)));
    assertNotNull("outputs.getPipeline()", outputs.getPipeline());
    outputs = outputs.and(mainOutputTag, mainInput);

    PAssert.that(outputs.get(mainOutputTag)).containsInAnyOrder(inputs);
-   PAssert.that(outputs.get(sideOutputTag)).containsInAnyOrder(inputs);
+   PAssert.that(outputs.get(additionalOutputTag)).containsInAnyOrder(inputs);
    PAssert.that(outputs.get(emptyOutputTag)).empty();

    pipeline.run();
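
For reference, the pattern the hunks above converge on: DoFn.Context#sideOutput(TupleTag, T) and #sideOutputWithTimestamp(...) become overloads of output(...) and outputWithTimestamp(...), while ParDo#withOutputTags is unchanged. Below is a minimal sketch of a multi-output ParDo written against the renamed methods; the pipeline and names used here (p, words, shortWordsTag, longWordsTag) are illustrative assumptions for this sketch, not code from this commit, and the usual org.apache.beam.sdk.* imports are assumed.

    // Hypothetical example (not part of this commit): short words go to the
    // main output, long words go to an additional tagged output.
    Pipeline p = Pipeline.create();
    PCollection<String> words = p.apply(Create.of("a", "bb", "cccc"));

    final TupleTag<String> shortWordsTag = new TupleTag<String>("shortWords") {};
    final TupleTag<String> longWordsTag = new TupleTag<String>("longWords") {};

    PCollectionTuple results =
        words.apply(
            ParDo.of(
                    new DoFn<String, String>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        if (c.element().length() <= 2) {
                          c.output(c.element()); // main output, unchanged by the rename
                        } else {
                          // Formerly c.sideOutput(longWordsTag, c.element());
                          c.output(longWordsTag, c.element());
                        }
                      }
                    })
                .withOutputTags(shortWordsTag, TupleTagList.of(longWordsTag)));

    results.get(longWordsTag); // the tagged output as a PCollection<String>
    p.run();

The old sideOutput name now appears only on deleted (-) lines in this diff; code written against this revision should use the output(tag, value) overload shown in the sketch.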