Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 31B80200BAD for ; Tue, 25 Oct 2016 20:05:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 302C0160AF3; Tue, 25 Oct 2016 18:05:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2933A160AD8 for ; Tue, 25 Oct 2016 20:05:49 +0200 (CEST) Received: (qmail 63995 invoked by uid 500); 25 Oct 2016 18:05:48 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 63980 invoked by uid 99); 25 Oct 2016 18:05:48 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Oct 2016 18:05:48 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id DF9DBC6C90 for ; Tue, 25 Oct 2016 18:05:47 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id zOOk1KkWrS5o for ; Tue, 25 Oct 2016 18:05:46 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 075015F2F0 for ; Tue, 25 Oct 2016 18:05:38 +0000 (UTC) Received: (qmail 30964 invoked by uid 99); 25 Oct 2016 17:54:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Oct 2016 17:54:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 78F5FE17B1; Tue, 25 Oct 2016 17:54:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Tue, 25 Oct 2016 17:54:54 -0000 Message-Id: <93e1f85cec0849a484a6f88f46d77066@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] incubator-beam git commit: Improve teardown behavior in DoFnLifecycleManager archived-at: Tue, 25 Oct 2016 18:05:50 -0000 Improve teardown behavior in DoFnLifecycleManager Use Cache invalidation hooks to teardown DoFns that are no longer in the cache. Ensure that remove() and removeAll() report thrown exceptions even though the exceptions are not thrown by the LoadingCache. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7239ebb0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7239ebb0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7239ebb0 Branch: refs/heads/master Commit: 7239ebb0c76f539f476cea0b44b1070e765cca41 Parents: 79bb2c2 Author: Thomas Groh Authored: Mon Oct 24 13:43:43 2016 -0700 Committer: Kenneth Knowles Committed: Tue Oct 25 10:46:43 2016 -0700 ---------------------------------------------------------------------- .../runners/direct/DoFnLifecycleManager.java | 56 +++++++++------ .../direct/DoFnLifecycleManagerTest.java | 74 ++++++++++++++++++-- 2 files changed, 104 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7239ebb0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java index 23460b6..472b28b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java @@ -21,17 +21,17 @@ package org.apache.beam.runners.direct; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import java.util.ArrayList; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import java.util.Collection; -import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.Setup; import org.apache.beam.sdk.transforms.DoFn.Teardown; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.util.SerializableUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Manages {@link DoFn} setup, teardown, and serialization. @@ -42,16 +42,18 @@ import org.slf4j.LoggerFactory; * clearing all cached {@link DoFn DoFns}. */ class DoFnLifecycleManager { - private static final Logger LOG = LoggerFactory.getLogger(DoFnLifecycleManager.class); - public static DoFnLifecycleManager of(DoFn original) { return new DoFnLifecycleManager(original); } private final LoadingCache> outstanding; + private final ConcurrentMap thrownOnTeardown; private DoFnLifecycleManager(DoFn original) { - this.outstanding = CacheBuilder.newBuilder().build(new DeserializingCacheLoader(original)); + this.outstanding = CacheBuilder.newBuilder() + .removalListener(new TeardownRemovedFnListener()) + .build(new DeserializingCacheLoader(original)); + thrownOnTeardown = new ConcurrentHashMap<>(); } public DoFn get() throws Exception { @@ -61,8 +63,15 @@ class DoFnLifecycleManager { public void remove() throws Exception { Thread currentThread = Thread.currentThread(); - DoFn fn = outstanding.asMap().remove(currentThread); - DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown(); + outstanding.invalidate(currentThread); + // Block until the invalidate is fully completed + outstanding.cleanUp(); + // Remove to try too avoid reporting the same teardown exception twice. May still double-report, + // but the second will be suppressed. + Exception thrown = thrownOnTeardown.remove(currentThread); + if (thrown != null) { + throw thrown; + } } /** @@ -73,21 +82,13 @@ class DoFnLifecycleManager { * DoFn.Teardown @Teardown} method, and the {@link PipelineRunner} should throw an exception. */ public Collection removeAll() throws Exception { - Iterator> fns = outstanding.asMap().values().iterator(); - Collection thrown = new ArrayList<>(); - while (fns.hasNext()) { - DoFn fn = fns.next(); - fns.remove(); - try { - DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown(); - } catch (Exception e) { - thrown.add(e); - } - } - return thrown; + outstanding.invalidateAll(); + // Make sure all of the teardowns are run + outstanding.cleanUp(); + return thrownOnTeardown.values(); } - private class DeserializingCacheLoader extends CacheLoader> { + private static class DeserializingCacheLoader extends CacheLoader> { private final byte[] original; public DeserializingCacheLoader(DoFn original) { @@ -102,4 +103,15 @@ class DoFnLifecycleManager { return fn; } } + + private class TeardownRemovedFnListener implements RemovalListener> { + @Override + public void onRemoval(RemovalNotification> notification) { + try { + DoFnInvokers.INSTANCE.newByteBuddyInvoker(notification.getValue()).invokeTeardown(); + } catch (Exception e) { + thrownOnTeardown.put(notification.getKey(), e); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7239ebb0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java index aef9d29..59e1e16 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java @@ -21,6 +21,7 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isA; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.theInstance; import static org.junit.Assert.assertThat; @@ -34,8 +35,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.UserCodeException; import org.hamcrest.Matchers; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -44,6 +48,8 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class DoFnLifecycleManagerTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + private TestFn fn = new TestFn(); private DoFnLifecycleManager mgr = DoFnLifecycleManager.of(fn); @@ -105,6 +111,17 @@ public class DoFnLifecycleManagerTest { } @Test + public void teardownThrowsRemoveThrows() throws Exception { + TestFn obtained = (TestFn) mgr.get(); + obtained.teardown(); + + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(IllegalStateException.class)); + thrown.expectMessage("Cannot call teardown: already torn down"); + mgr.remove(); + } + + @Test public void teardownAllOnRemoveAll() throws Exception { CountDownLatch startSignal = new CountDownLatch(1); ExecutorService executor = Executors.newCachedThreadPool(); @@ -125,6 +142,38 @@ public class DoFnLifecycleManagerTest { } } + @Test + public void removeAndRemoveAllConcurrent() throws Exception { + CountDownLatch startSignal = new CountDownLatch(1); + ExecutorService executor = Executors.newCachedThreadPool(); + List> futures = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + futures.add(executor.submit(new GetFnCallable(mgr, startSignal))); + } + startSignal.countDown(); + List fns = new ArrayList<>(); + for (Future future : futures) { + fns.add(future.get(1L, TimeUnit.SECONDS)); + } + CountDownLatch removeSignal = new CountDownLatch(1); + List> removeFutures = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + // These will reuse the threads used in the GetFns + removeFutures.add(executor.submit(new TeardownFnCallable(mgr, removeSignal))); + } + removeSignal.countDown(); + assertThat(mgr.removeAll(), Matchers.emptyIterable()); + for (Future removed : removeFutures) { + // Should not have thrown an exception. + removed.get(); + } + + for (TestFn fn : fns) { + assertThat(fn.setupCalled, is(true)); + assertThat(fn.teardownCalled, is(true)); + } + } + private static class GetFnCallable implements Callable { private final DoFnLifecycleManager mgr; private final CountDownLatch startSignal; @@ -141,6 +190,23 @@ public class DoFnLifecycleManagerTest { } } + private static class TeardownFnCallable implements Callable { + private final DoFnLifecycleManager mgr; + private final CountDownLatch startSignal; + + private TeardownFnCallable(DoFnLifecycleManager mgr, CountDownLatch startSignal) { + this.mgr = mgr; + this.startSignal = startSignal; + } + + @Override + public Void call() throws Exception { + startSignal.await(); + // Will throw an exception if the TestFn has already been removed from this thread + mgr.remove(); + return null; + } + } private static class TestFn extends DoFn { boolean setupCalled = false; @@ -148,8 +214,8 @@ public class DoFnLifecycleManagerTest { @Setup public void setup() { - checkState(!setupCalled); - checkState(!teardownCalled); + checkState(!setupCalled, "Cannot call setup: already set up"); + checkState(!teardownCalled, "Cannot call setup: already torn down"); setupCalled = true; } @@ -160,8 +226,8 @@ public class DoFnLifecycleManagerTest { @Teardown public void teardown() { - checkState(setupCalled); - checkState(!teardownCalled); + checkState(setupCalled, "Cannot call teardown: not set up"); + checkState(!teardownCalled, "Cannot call teardown: already torn down"); teardownCalled = true; }