Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 99E34200C68 for ; Wed, 19 Apr 2017 03:12:28 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9805E160BB5; Wed, 19 Apr 2017 01:12:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E9027160BB2 for ; Wed, 19 Apr 2017 03:12:27 +0200 (CEST) Received: (qmail 89064 invoked by uid 500); 19 Apr 2017 01:12:27 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 88884 invoked by uid 99); 19 Apr 2017 01:12:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Apr 2017 01:12:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A1AB6DFDAC; Wed, 19 Apr 2017 01:12:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jkff@apache.org To: commits@beam.apache.org Date: Wed, 19 Apr 2017 01:12:27 -0000 Message-Id: <810bd74bfb76482f88c0fb6bbaf64173@git.apache.org> In-Reply-To: <1f3496f22cd34135813c6c528028423e@git.apache.org> References: <1f3496f22cd34135813c6c528028423e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/7] beam git commit: Separates side input test and side output test archived-at: Wed, 19 Apr 2017 01:12:28 -0000 Separates side input test and side output test Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a51bdd26 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a51bdd26 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a51bdd26 Branch: refs/heads/master Commit: a51bdd266f9c877cb407de986a465fc9c7de76ff Parents: a9bcc8b Author: Eugene Kirpichov Authored: Sat Apr 15 16:38:35 2017 -0700 Committer: Eugene Kirpichov Committed: Tue Apr 18 18:02:06 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/transforms/SplittableDoFnTest.java | 63 ++++++++++++++------ 1 file changed, 44 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a51bdd26/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index 9e8c12e..30329f4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -61,7 +62,7 @@ import org.junit.runners.JUnit4; * Tests for sideInput = p.apply("side input", Create.of("foo")).apply(View.asSingleton()); - TupleTag mainOutputTag = new TupleTag<>("main"); - TupleTag additionalOutputTag = new TupleTag<>("additional"); + + PCollection res = + p.apply("input", Create.of(0, 1, 2)) + .apply(ParDo.of(new SDFWithSideInput(sideInput)).withSideInputs(sideInput)); + + PAssert.that(res).containsInAnyOrder(Arrays.asList("foo:0", "foo:1", "foo:2")); + + p.run(); + } + + private static class SDFWithAdditionalOutput extends DoFn { + private final TupleTag additionalOutput; + + private SDFWithAdditionalOutput(TupleTag additionalOutput) { + this.additionalOutput = additionalOutput; + } + + @ProcessElement + public void process(ProcessContext c, OffsetRangeTracker tracker) { + checkState(tracker.tryClaim(tracker.currentRestriction().getFrom())); + c.output("main:" + c.element()); + c.output(additionalOutput, "additional:" + c.element()); + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction(Integer value) { + return new OffsetRange(0, 1); + } + } + + @Test + @Category({ValidatesRunner.class, UsesSplittableParDo.class}) + public void testAdditionalOutput() throws Exception { + TupleTag mainOutputTag = new TupleTag("main") {}; + TupleTag additionalOutputTag = new TupleTag("additional") {}; PCollectionTuple res = p.apply("input", Create.of(0, 1, 2)) .apply( - ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, additionalOutputTag)) - .withSideInputs(sideInput) + ParDo.of(new SDFWithAdditionalOutput(additionalOutputTag)) .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))); - res.get(mainOutputTag).setCoder(StringUtf8Coder.of()); - res.get(additionalOutputTag).setCoder(StringUtf8Coder.of()); PAssert.that(res.get(mainOutputTag)) - .containsInAnyOrder(Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2")); + .containsInAnyOrder(Arrays.asList("main:0", "main:1", "main:2")); PAssert.that(res.get(additionalOutputTag)) - .containsInAnyOrder( - Arrays.asList("additional:foo:0", "additional:foo:1", "additional:foo:2")); + .containsInAnyOrder(Arrays.asList("additional:0", "additional:1", "additional:2")); p.run(); }