Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6D234200BA8 for ; Mon, 24 Oct 2016 18:11:06 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6B9A7160AD7; Mon, 24 Oct 2016 16:11:06 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 18230160AEB for ; Mon, 24 Oct 2016 18:11:04 +0200 (CEST) Received: (qmail 33822 invoked by uid 500); 24 Oct 2016 16:11:04 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 33813 invoked by uid 99); 24 Oct 2016 16:11:04 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Oct 2016 16:11:04 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id D0E19C059C for ; Mon, 24 Oct 2016 16:11:03 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id xu9BXIOhyZvU for ; Mon, 24 Oct 2016 16:10:59 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 42E5F5FC55 for ; Mon, 24 Oct 2016 16:10:57 +0000 (UTC) Received: (qmail 32127 invoked by uid 99); 24 Oct 2016 16:10:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Oct 2016 16:10:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 08FB3F16B8; Mon, 24 Oct 2016 16:10:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Mon, 24 Oct 2016 16:11:06 -0000 Message-Id: <73ecd227e18a4fbebcf83ff5206fb070@git.apache.org> In-Reply-To: <04e6314fca9540ac8eef9adb47a362b1@git.apache.org> References: <04e6314fca9540ac8eef9adb47a362b1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/14] incubator-beam git commit: Port direct runner to use new DoFn directly archived-at: Mon, 24 Oct 2016 16:11:06 -0000 Port direct runner to use new DoFn directly Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1919d8b3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1919d8b3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1919d8b3 Branch: refs/heads/master Commit: 1919d8b3a850bd146137652546da851ee461cd28 Parents: f0c8d30 Author: Kenneth Knowles Authored: Thu Oct 20 20:55:00 2016 -0700 Committer: Kenneth Knowles Committed: Sun Oct 23 21:04:17 2016 -0700 ---------------------------------------------------------------------- .../runners/direct/DoFnLifecycleManager.java | 42 +++++++++-------- .../beam/runners/direct/ParDoEvaluator.java | 3 +- .../direct/ParDoMultiEvaluatorFactory.java | 6 +-- .../direct/ParDoSingleEvaluatorFactory.java | 5 +- ...leManagerRemovingTransformEvaluatorTest.java | 16 +++---- .../direct/DoFnLifecycleManagerTest.java | 12 ++--- .../direct/DoFnLifecycleManagersTest.java | 48 ++++++++++++++++---- .../direct/ParDoMultiEvaluatorFactoryTest.java | 11 +++++ .../direct/ParDoSingleEvaluatorFactoryTest.java | 11 +++++ .../beam/runners/direct/SplittableDoFnTest.java | 8 +++- .../org/apache/beam/sdk/transforms/OldDoFn.java | 23 ++++++++-- 11 files changed, 130 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java index 0e15c18..23460b6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java @@ -26,7 +26,9 @@ import java.util.Collection; import java.util.Iterator; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn.Setup; +import org.apache.beam.sdk.transforms.DoFn.Teardown; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.util.SerializableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,49 +37,49 @@ import org.slf4j.LoggerFactory; * Manages {@link DoFn} setup, teardown, and serialization. * *

{@link DoFnLifecycleManager} is similar to a {@link ThreadLocal} storing a {@link DoFn}, but - * calls the {@link DoFn} {@link Setup} the first time the {@link DoFn} is obtained and {@link - * Teardown} whenever the {@link DoFn} is removed, and provides a method for clearing all cached - * {@link DoFn DoFns}. + * calls the {@link DoFn} {@link Setup @Setup} method the first time the {@link DoFn} is obtained + * and {@link Teardown @Teardown} whenever the {@link DoFn} is removed, and provides a method for + * clearing all cached {@link DoFn DoFns}. */ class DoFnLifecycleManager { private static final Logger LOG = LoggerFactory.getLogger(DoFnLifecycleManager.class); - public static DoFnLifecycleManager of(OldDoFn original) { + public static DoFnLifecycleManager of(DoFn original) { return new DoFnLifecycleManager(original); } - private final LoadingCache> outstanding; + private final LoadingCache> outstanding; - private DoFnLifecycleManager(OldDoFn original) { + private DoFnLifecycleManager(DoFn original) { this.outstanding = CacheBuilder.newBuilder().build(new DeserializingCacheLoader(original)); } - public OldDoFn get() throws Exception { + public DoFn get() throws Exception { Thread currentThread = Thread.currentThread(); return outstanding.get(currentThread); } public void remove() throws Exception { Thread currentThread = Thread.currentThread(); - OldDoFn fn = outstanding.asMap().remove(currentThread); - fn.teardown(); + DoFn fn = outstanding.asMap().remove(currentThread); + DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown(); } /** * Remove all {@link DoFn DoFns} from this {@link DoFnLifecycleManager}. Returns all exceptions * that were thrown while calling the remove methods. * - *

If the returned Collection is nonempty, an exception was thrown from at least one - * {@link DoFn#teardown()} method, and the {@link PipelineRunner} should throw an exception. + *

If the returned Collection is nonempty, an exception was thrown from at least one {@link + * DoFn.Teardown @Teardown} method, and the {@link PipelineRunner} should throw an exception. */ public Collection removeAll() throws Exception { - Iterator> fns = outstanding.asMap().values().iterator(); + Iterator> fns = outstanding.asMap().values().iterator(); Collection thrown = new ArrayList<>(); while (fns.hasNext()) { - OldDoFn fn = fns.next(); + DoFn fn = fns.next(); fns.remove(); try { - fn.teardown(); + DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown(); } catch (Exception e) { thrown.add(e); } @@ -85,18 +87,18 @@ class DoFnLifecycleManager { return thrown; } - private class DeserializingCacheLoader extends CacheLoader> { + private class DeserializingCacheLoader extends CacheLoader> { private final byte[] original; - public DeserializingCacheLoader(OldDoFn original) { + public DeserializingCacheLoader(DoFn original) { this.original = SerializableUtils.serializeToByteArray(original); } @Override - public OldDoFn load(Thread key) throws Exception { - OldDoFn fn = (OldDoFn) SerializableUtils.deserializeFromByteArray(original, + public DoFn load(Thread key) throws Exception { + DoFn fn = (DoFn) SerializableUtils.deserializeFromByteArray(original, "DoFn Copy in thread " + key.getName()); - fn.setup(); + DoFnInvokers.INSTANCE.invokerFor(fn).invokeSetup(); return fn; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index a59fb4d..b524dfa 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.direct; import com.google.common.collect.ImmutableList; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -45,7 +46,7 @@ class ParDoEvaluator implements TransformEvaluator { DirectStepContext stepContext, CommittedBundle inputBundle, AppliedPTransform, ?, ?> application, - Object fn, + Serializable fn, // may be OldDoFn or DoFn List> sideInputs, TupleTag mainOutputTag, List> sideOutputTags, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java index d909e8b..02469ff 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java @@ -24,7 +24,7 @@ import java.util.Map; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.values.PCollection; @@ -50,7 +50,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { public DoFnLifecycleManager load(AppliedPTransform key) throws Exception { BoundMulti bound = (BoundMulti) key.getTransform(); - return DoFnLifecycleManager.of(bound.getFn()); + return DoFnLifecycleManager.of(bound.getNewFn()); } }); } @@ -87,7 +87,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { stepContext, inputBundle, application, - (OldDoFn) fnLocal.get(), + (DoFn) fnLocal.get(), application.getTransform().getSideInputs(), application.getTransform().getMainOutputTag(), application.getTransform().getSideOutputTags().getAll(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java index 1a06ea6..0584e41 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java @@ -25,7 +25,6 @@ import java.util.Collections; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo.Bound; import org.apache.beam.sdk.values.PCollection; @@ -52,7 +51,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { public DoFnLifecycleManager load(AppliedPTransform key) throws Exception { Bound bound = (Bound) key.getTransform(); - return DoFnLifecycleManager.of(bound.getFn()); + return DoFnLifecycleManager.of(bound.getNewFn()); } }); } @@ -92,7 +91,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { stepContext, inputBundle, application, - (OldDoFn) fnLocal.get(), + fnLocal.get(), application.getTransform().getSideInputs(), mainOutputTag, Collections.>emptyList(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java index 2e4fee2..9e2732e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java @@ -27,7 +27,7 @@ import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.List; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.WindowedValue; import org.hamcrest.Matchers; import org.junit.Before; @@ -50,7 +50,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest { @Test public void delegatesToUnderlying() throws Exception { RecordingTransformEvaluator underlying = new RecordingTransformEvaluator(); - OldDoFn original = lifecycleManager.get(); + DoFn original = lifecycleManager.get(); TransformEvaluator evaluator = DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager); WindowedValue first = WindowedValue.valueInGlobalWindow(new Object()); @@ -67,7 +67,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest { @Test public void removesOnExceptionInProcessElement() throws Exception { ThrowingTransformEvaluator underlying = new ThrowingTransformEvaluator(); - OldDoFn original = lifecycleManager.get(); + DoFn original = lifecycleManager.get(); assertThat(original, not(nullValue())); TransformEvaluator evaluator = DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager); @@ -75,7 +75,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest { try { evaluator.processElement(WindowedValue.valueInGlobalWindow(new Object())); } catch (Exception e) { - assertThat(lifecycleManager.get(), not(Matchers.>theInstance(original))); + assertThat(lifecycleManager.get(), not(Matchers.>theInstance(original))); return; } fail("Expected ThrowingTransformEvaluator to throw on method call"); @@ -84,7 +84,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest { @Test public void removesOnExceptionInFinishBundle() throws Exception { ThrowingTransformEvaluator underlying = new ThrowingTransformEvaluator(); - OldDoFn original = lifecycleManager.get(); + DoFn original = lifecycleManager.get(); // the LifecycleManager is set when the evaluator starts assertThat(original, not(nullValue())); TransformEvaluator evaluator = @@ -94,7 +94,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest { evaluator.finishBundle(); } catch (Exception e) { assertThat(lifecycleManager.get(), - Matchers.not(Matchers.>theInstance(original))); + Matchers.not(Matchers.>theInstance(original))); return; } fail("Expected ThrowingTransformEvaluator to throw on method call"); @@ -134,8 +134,8 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest { } - private static class TestFn extends OldDoFn { - @Override + private static class TestFn extends DoFn { + @ProcessElement public void processElement(ProcessContext c) throws Exception { } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java index 1f0af99..aef9d29 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java @@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.hamcrest.Matchers; import org.junit.Test; import org.junit.runner.RunWith; @@ -101,7 +101,7 @@ public class DoFnLifecycleManagerTest { assertThat(obtained.setupCalled, is(true)); assertThat(obtained.teardownCalled, is(true)); - assertThat(mgr.get(), not(Matchers.>theInstance(obtained))); + assertThat(mgr.get(), not(Matchers.>theInstance(obtained))); } @Test @@ -142,11 +142,11 @@ public class DoFnLifecycleManagerTest { } - private static class TestFn extends OldDoFn { + private static class TestFn extends DoFn { boolean setupCalled = false; boolean teardownCalled = false; - @Override + @Setup public void setup() { checkState(!setupCalled); checkState(!teardownCalled); @@ -154,11 +154,11 @@ public class DoFnLifecycleManagerTest { setupCalled = true; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { } - @Override + @Teardown public void teardown() { checkState(setupCalled); checkState(!teardownCalled); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java index 39a4a9d..a19ff99 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagersTest.java @@ -18,12 +18,15 @@ package org.apache.beam.runners.direct; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Collection; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.UserCodeException; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -51,9 +54,15 @@ public class DoFnLifecycleManagersTest { third.get(); final Collection> suppressions = new ArrayList<>(); - suppressions.add(new ThrowableMessageMatcher("foo")); - suppressions.add(new ThrowableMessageMatcher("bar")); - suppressions.add(new ThrowableMessageMatcher("baz")); + suppressions.add(allOf( + instanceOf(UserCodeException.class), + new CausedByMatcher(new ThrowableMessageMatcher("foo")))); + suppressions.add(allOf( + instanceOf(UserCodeException.class), + new CausedByMatcher(new ThrowableMessageMatcher("bar")))); + suppressions.add(allOf( + instanceOf(UserCodeException.class), + new CausedByMatcher(new ThrowableMessageMatcher("baz")))); thrown.expect( new BaseMatcher() { @@ -90,18 +99,18 @@ public class DoFnLifecycleManagersTest { DoFnLifecycleManagers.removeAllFromManagers(ImmutableList.of(first, second, third)); } - private static class ThrowsInCleanupFn extends OldDoFn { + private static class ThrowsInCleanupFn extends DoFn { private final String message; private ThrowsInCleanupFn(String message) { this.message = message; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { } - @Override + @Teardown public void teardown() throws Exception { throw new Exception(message); } @@ -130,9 +139,32 @@ public class DoFnLifecycleManagersTest { } } + private static class CausedByMatcher extends BaseMatcher { + private final Matcher causeMatcher; + + public CausedByMatcher( + Matcher causeMatcher) { + this.causeMatcher = causeMatcher; + } - private static class EmptyFn extends OldDoFn { @Override + public boolean matches(Object item) { + if (!(item instanceof UserCodeException)) { + return false; + } + UserCodeException that = (UserCodeException) item; + return causeMatcher.matches(that.getCause()); + } + + @Override + public void describeTo(Description description) { + description.appendText("a throwable with a cause ").appendDescriptionOf(causeMatcher); + } + } + + + private static class EmptyFn extends DoFn { + @ProcessElement public void processElement(ProcessContext c) throws Exception { } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java index 88e1484..8b0070b 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java @@ -57,6 +57,7 @@ import org.apache.beam.sdk.values.TupleTagList; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -236,6 +237,11 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING))); } + /** + * This test ignored, as today testing of GroupByKey is all the state that needs testing. + * This should be ported to state when ready. + */ + @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") @Test public void finishBundleWithStatePutsStateInResult() throws Exception { TestPipeline p = TestPipeline.create(); @@ -321,6 +327,11 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { containsInAnyOrder("foo", "bara", "bazam")); } + /** + * This test ignored, as today testing of GroupByKey is all the state that needs testing. + * This should be ported to state when ready. + */ + @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") @Test public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception { TestPipeline p = TestPipeline.create(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java index 6a02e40..e562b28 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java @@ -54,6 +54,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -166,6 +167,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { assertThat(result.getAggregatorChanges(), equalTo(mutator)); } + /** + * This test ignored, as today testing of GroupByKey is all the state that needs testing. + * This should be ported to state when ready. + */ + @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") @Test public void finishBundleWithStatePutsStateInResult() throws Exception { TestPipeline p = TestPipeline.create(); @@ -238,6 +244,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { containsInAnyOrder("foo", "bara", "bazam")); } + /** + * This test ignored, as today testing of GroupByKey is all the state that needs testing. + * This should be ported to state when ready. + */ + @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") @Test public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception { TestPipeline p = TestPipeline.create(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java index 84a0cd9..c164ce6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java @@ -34,7 +34,6 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -46,6 +45,7 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.MutableDateTime; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -140,6 +140,9 @@ public class SplittableDoFnTest { } } + @Ignore( + "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available in DoFn; " + + "It must be implemented as a primitive.") @Test public void testPairWithIndexBasic() throws ClassNotFoundException { Pipeline p = TestPipeline.create(); @@ -164,6 +167,9 @@ public class SplittableDoFnTest { p.run(); } + @Ignore( + "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available in DoFn; " + + "It must be implemented as a primitive.") @Test public void testPairWithIndexWindowedTimestamped() throws ClassNotFoundException { // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1919d8b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index a9f26a4..f16e0b3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -73,6 +73,10 @@ import org.joda.time.Instant; public abstract class OldDoFn implements Serializable, HasDisplayData { public DoFn toDoFn() { + DoFn doFn = DoFnAdapters.getDoFn(this); + if (doFn != null) { + return doFn; + } if (this instanceof RequiresWindowAccess) { // No parameters as it just accesses `this` return new AdaptedRequiresWindowAccessDoFn(); @@ -553,8 +557,7 @@ public abstract class OldDoFn implements Serializable, HasDispl private final DoFn.ProcessContext newContext; - public AdaptedProcessContext( - DoFn.ProcessContext newContext) { + public AdaptedProcessContext(DoFn.ProcessContext newContext) { this.newContext = newContext; } @@ -632,21 +635,31 @@ public abstract class OldDoFn implements Serializable, HasDispl private class AdaptedDoFn extends DoFn { + @Setup + public void setup() throws Exception { + OldDoFn.this.setup(); + } + @StartBundle - public void startBundle(DoFn.Context c) throws Exception { + public void startBundle(Context c) throws Exception { OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c)); } @ProcessElement - public void processElement(DoFn.ProcessContext c) throws Exception { + public void processElement(ProcessContext c) throws Exception { OldDoFn.this.processElement(OldDoFn.this.new AdaptedProcessContext(c)); } @FinishBundle - public void finishBundle(DoFn.Context c) throws Exception { + public void finishBundle(Context c) throws Exception { OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c)); } + @Teardown + public void teardown() throws Exception { + OldDoFn.this.teardown(); + } + @Override public Duration getAllowedTimestampSkew() { return OldDoFn.this.getAllowedTimestampSkew();