Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3B132200B27 for ; Wed, 8 Jun 2016 04:35:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 39050160A4F; Wed, 8 Jun 2016 02:35:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0CDC9160A36 for ; Wed, 8 Jun 2016 04:35:18 +0200 (CEST) Received: (qmail 36297 invoked by uid 500); 8 Jun 2016 02:35:18 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 36288 invoked by uid 99); 8 Jun 2016 02:35:18 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Jun 2016 02:35:18 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id AC0F6C02CA for ; Wed, 8 Jun 2016 02:35:17 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id brex82UlwTHm for ; Wed, 8 Jun 2016 02:35:16 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 926695FAD8 for ; Wed, 8 Jun 2016 02:35:14 +0000 (UTC) Received: (qmail 36185 invoked by uid 99); 8 Jun 2016 02:35:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Jun 2016 02:35:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9C0A6E0200; Wed, 8 Jun 2016 02:35:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Wed, 08 Jun 2016 02:35:13 -0000 Message-Id: <8eccae3e0731439ab19a4783e5a43a81@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-beam git commit: Relocate Immutability Enforcement Tests archived-at: Wed, 08 Jun 2016 02:35:20 -0000 Repository: incubator-beam Updated Branches: refs/heads/master 787b3351a -> 9a8cb95db Relocate Immutability Enforcement Tests These tests are of runner behavior rather than the model, and should be tested as a runner test. Stop wrapping IllegalMutationExceptions to surface failures due to model invariant violations directly, rather than going through a PipelineExecutionException. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de00bd82 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de00bd82 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de00bd82 Branch: refs/heads/master Commit: de00bd8269d03ed6eee8fa51c6f7c803e384fd50 Parents: 2173000 Author: Thomas Groh Authored: Tue May 24 18:05:16 2016 -0700 Committer: Thomas Groh Committed: Tue Jun 7 15:14:20 2016 -0700 ---------------------------------------------------------------------- .../ImmutabilityCheckingBundleFactory.java | 22 ++- .../ImmutabilityCheckingBundleFactoryTest.java | 11 +- .../direct/InProcessPipelineRunnerTest.java | 138 ++++++++++++++++++- .../apache/beam/sdk/transforms/ParDoTest.java | 136 +----------------- 4 files changed, 148 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de00bd82/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java index 92a57dd..2a965ed 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.MutationDetector; import org.apache.beam.sdk.util.MutationDetectors; -import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import 
org.apache.beam.sdk.values.PCollection; @@ -111,17 +110,16 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory { try { detector.verifyUnmodified(); } catch (IllegalMutationException exn) { - throw UserCodeException.wrap( - new IllegalMutationException( - String.format( - "PTransform %s mutated value %s after it was output (new value was %s)." - + " Values must not be mutated in any way after being output.", - underlying.getPCollection().getProducingTransformInternal().getFullName(), - exn.getSavedValue(), - exn.getNewValue()), - exn.getSavedValue(), - exn.getNewValue(), - exn)); + throw new IllegalMutationException( + String.format( + "PTransform %s mutated value %s after it was output (new value was %s)." + + " Values must not be mutated in any way after being output.", + underlying.getPCollection().getProducingTransformInternal().getFullName(), + exn.getSavedValue(), + exn.getNewValue()), + exn.getSavedValue(), + exn.getNewValue(), + exn); } } return underlying.commit(synchronizedProcessingTime); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de00bd82/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java index 2e7847d..20670ca 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertThat; import 
org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; @@ -32,7 +31,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.IllegalMutationException; -import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -168,8 +166,7 @@ public class ImmutabilityCheckingBundleFactoryTest { root.add(WindowedValue.valueInGlobalWindow(array)); array[1] = 2; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage("Values must not be mutated in any way after being output"); CommittedBundle committed = root.commit(Instant.now()); } @@ -191,8 +188,7 @@ public class ImmutabilityCheckingBundleFactoryTest { keyed.add(windowedArray); array[0] = Byte.MAX_VALUE; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage("Values must not be mutated in any way after being output"); CommittedBundle committed = keyed.commit(Instant.now()); } @@ -212,8 +208,7 @@ public class ImmutabilityCheckingBundleFactoryTest { intermediate.add(windowedArray); array[2] = -3; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage("Values must not be mutated in any way after being output"); CommittedBundle committed = intermediate.commit(Instant.now()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de00bd82/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java index 9314f5e..5c26ac3 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -37,6 +38,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -44,6 +46,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; import com.google.common.collect.ImmutableMap; + import com.fasterxml.jackson.annotation.JsonValue; import org.junit.Rule; @@ -54,6 +57,8 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.io.Serializable; +import java.util.Arrays; +import java.util.List; import java.util.Map; /** @@ -63,6 +68,14 @@ import java.util.Map; public class InProcessPipelineRunnerTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); + private Pipeline getPipeline() { + PipelineOptions opts = PipelineOptionsFactory.create(); + 
opts.setRunner(InProcessPipelineRunner.class); + + Pipeline p = Pipeline.create(opts); + return p; + } + @Test public void wordCountShouldSucceed() throws Throwable { Pipeline p = getPipeline(); @@ -192,11 +205,126 @@ public class InProcessPipelineRunnerTest implements Serializable { } - private Pipeline getPipeline() { - PipelineOptions opts = PipelineOptionsFactory.create(); - opts.setRunner(InProcessPipelineRunner.class); + /** + * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the + * {@link InProcessPipelineRunner}. + */ + @Test + public void testMutatingOutputThenOutputDoFnError() throws Exception { + Pipeline pipeline = getPipeline(); - Pipeline p = Pipeline.create(opts); - return p; + pipeline + .apply(Create.of(42)) + .apply(ParDo.of(new DoFn>() { + @Override public void processElement(ProcessContext c) { + List outputList = Arrays.asList(1, 2, 3, 4); + c.output(outputList); + outputList.set(0, 37); + c.output(outputList); + } + })); + + thrown.expect(IllegalMutationException.class); + thrown.expectMessage("output"); + thrown.expectMessage("must not be mutated"); + pipeline.run(); + } + + /** + * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the + * {@link InProcessPipelineRunner}. + */ + @Test + public void testMutatingOutputThenTerminateDoFnError() throws Exception { + Pipeline pipeline = getPipeline(); + + pipeline + .apply(Create.of(42)) + .apply(ParDo.of(new DoFn>() { + @Override public void processElement(ProcessContext c) { + List outputList = Arrays.asList(1, 2, 3, 4); + c.output(outputList); + outputList.set(0, 37); + } + })); + + thrown.expect(IllegalMutationException.class); + thrown.expectMessage("output"); + thrown.expectMessage("must not be mutated"); + pipeline.run(); + } + + /** + * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails + * in the {@link InProcessPipelineRunner}. 
+ */ + @Test + public void testMutatingOutputCoderDoFnError() throws Exception { + Pipeline pipeline = getPipeline(); + + pipeline + .apply(Create.of(42)) + .apply(ParDo.of(new DoFn() { + @Override public void processElement(ProcessContext c) { + byte[] outputArray = new byte[]{0x1, 0x2, 0x3}; + c.output(outputArray); + outputArray[0] = 0xa; + c.output(outputArray); + } + })); + + thrown.expect(IllegalMutationException.class); + thrown.expectMessage("output"); + thrown.expectMessage("must not be mutated"); + pipeline.run(); + } + + /** + * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the + * {@link InProcessPipelineRunner}. + */ + @Test + public void testMutatingInputDoFnError() throws Exception { + Pipeline pipeline = getPipeline(); + + pipeline + .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)) + .withCoder(ListCoder.of(VarIntCoder.of()))) + .apply(ParDo.of(new DoFn, Integer>() { + @Override public void processElement(ProcessContext c) { + List inputList = c.element(); + inputList.set(0, 37); + c.output(12); + } + })); + + thrown.expect(IllegalMutationException.class); + thrown.expectMessage("Input"); + thrown.expectMessage("must not be mutated"); + pipeline.run(); + } + + /** + * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails + * in the {@link InProcessPipelineRunner}. 
+ */ + @Test + public void testMutatingInputCoderDoFnError() throws Exception { + Pipeline pipeline = getPipeline(); + + pipeline + .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6})) + .apply(ParDo.of(new DoFn() { + @Override public void processElement(ProcessContext c) { + byte[] inputArray = c.element(); + inputArray[0] = 0xa; + c.output(13); + } + })); + + thrown.expect(IllegalMutationException.class); + thrown.expectMessage("Input"); + thrown.expectMessage("must not be mutated"); + pipeline.run(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de00bd82/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 5f0f8ec..03ecf6f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -24,24 +24,19 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.include import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray; - import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.isA; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; -import static org.hamcrest.core.AnyOf.anyOf; -import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static 
org.junit.Assert.assertThat; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -53,7 +48,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.DisplayDataMatchers; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -1420,132 +1414,6 @@ public class ParDoTest implements Serializable { thrown.expectMessage("WindowFn attempted to access input timestamp when none was available"); pipeline.run(); } - - /** - * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the - * {@link DirectPipelineRunner}. 
- */ - @Test - public void testMutatingOutputThenOutputDoFnError() throws Exception { - Pipeline pipeline = TestPipeline.create(); - - pipeline - .apply(Create.of(42)) - .apply(ParDo.of(new DoFn<Integer, List<Integer>>() { - @Override public void processElement(ProcessContext c) { - List<Integer> outputList = Arrays.asList(1, 2, 3, 4); - c.output(outputList); - outputList.set(0, 37); - c.output(outputList); - } - })); - - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalMutationException.class)); - thrown.expectMessage("output"); - thrown.expectMessage("must not be mutated"); - pipeline.run(); - } - - /** - * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the - * {@link DirectPipelineRunner}. - */ - @Test - public void testMutatingOutputThenTerminateDoFnError() throws Exception { - Pipeline pipeline = TestPipeline.create(); - - pipeline - .apply(Create.of(42)) - .apply(ParDo.of(new DoFn<Integer, List<Integer>>() { - @Override public void processElement(ProcessContext c) { - List<Integer> outputList = Arrays.asList(1, 2, 3, 4); - c.output(outputList); - outputList.set(0, 37); - } - })); - - thrown.expect(IllegalMutationException.class); - thrown.expectMessage("output"); - thrown.expectMessage("must not be mutated"); - pipeline.run(); - } - - /** - * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails - * in the {@link DirectPipelineRunner}. 
- */ - @Test - public void testMutatingOutputCoderDoFnError() throws Exception { - Pipeline pipeline = TestPipeline.create(); - - pipeline - .apply(Create.of(42)) - .apply(ParDo.of(new DoFn<Integer, byte[]>() { - @Override public void processElement(ProcessContext c) { - byte[] outputArray = new byte[]{0x1, 0x2, 0x3}; - c.output(outputArray); - outputArray[0] = 0xa; - c.output(outputArray); - } - })); - - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalMutationException.class)); - thrown.expectMessage("output"); - thrown.expectMessage("must not be mutated"); - pipeline.run(); - } - - /** - * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the - * {@link DirectPipelineRunner}. 
- */ - @Test - public void testMutatingInputCoderDoFnError() throws Exception { - Pipeline pipeline = TestPipeline.create(); - - pipeline - .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6})) - .apply(ParDo.of(new DoFn<byte[], Integer>() { - @Override public void processElement(ProcessContext c) { - byte[] inputArray = c.element(); - inputArray[0] = 0xa; - c.output(13); - } - })); - - thrown.expect(IllegalMutationException.class); - thrown.expectMessage("input"); - thrown.expectMessage("must not be mutated"); - pipeline.run(); - } - @Test public void testDoFnDisplayData() { DoFn fn = new DoFn() {