Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 04E40200D06 for ; Mon, 25 Sep 2017 17:47:55 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 032441609E9; Mon, 25 Sep 2017 15:47:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A088A1609BB for ; Mon, 25 Sep 2017 17:47:53 +0200 (CEST) Received: (qmail 18096 invoked by uid 500); 25 Sep 2017 15:47:52 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 18086 invoked by uid 99); 25 Sep 2017 15:47:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 25 Sep 2017 15:47:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 20AC8F564E; Mon, 25 Sep 2017 15:47:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: srichter@apache.org To: commits@flink.apache.org Date: Mon, 25 Sep 2017 15:47:50 -0000 Message-Id: <769df03911914c1a82afc633aef01c16@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] flink git commit: [FLINK-7524] Remove potentially blocking behaviour from AbstractCloseableRegistry. archived-at: Mon, 25 Sep 2017 15:47:55 -0000 Repository: flink Updated Branches: refs/heads/master 1ebd44a63 -> 5af463a9c [FLINK-7524] Remove potentially blocking behaviour from AbstractCloseableRegistry. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0073204b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0073204b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0073204b Branch: refs/heads/master Commit: 0073204b257860ad104cde29d3795b3c633f4759 Parents: 1ebd44a Author: Stefan Richter Authored: Mon Sep 4 12:30:41 2017 +0200 Committer: Stefan Richter Committed: Mon Sep 25 16:04:06 2017 +0200 ---------------------------------------------------------------------- .../apache/flink/core/fs/CloseableRegistry.java | 9 +- .../core/fs/SafetyNetCloseableRegistry.java | 35 ++- .../flink/util/AbstractCloseableRegistry.java | 92 ++++++-- .../apache/flink/util/WrappingProxyUtil.java | 8 +- .../core/fs/AbstractCloseableRegistryTest.java | 223 +++++++++++++++++++ .../flink/core/fs/CloseableRegistryTest.java | 59 +++++ .../core/fs/SafetyNetCloseableRegistryTest.java | 211 +++++------------- ...StateSnapshotContextSynchronousImplTest.java | 4 +- 8 files changed, 449 insertions(+), 192 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java index 0d4ea0c..29f363c 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java @@ -21,8 +21,9 @@ package org.apache.flink.core.fs; import org.apache.flink.annotation.Internal; import org.apache.flink.util.AbstractCloseableRegistry; +import javax.annotation.Nonnull; + import java.io.Closeable; -import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -39,16 +40,16 @@ public class CloseableRegistry extends AbstractCloseableRegistry()); + super(new HashMap<>()); } @Override - protected void doRegister(Closeable closeable, Map closeableMap) throws IOException { + protected void doRegister(@Nonnull Closeable closeable, @Nonnull Map closeableMap) { closeableMap.put(closeable, DUMMY); } @Override - protected void doUnRegister(Closeable closeable, Map closeableMap) { + protected void doUnRegister(@Nonnull Closeable closeable, @Nonnull Map closeableMap) { closeableMap.remove(closeable); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java index 8b28fa2..6097334 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java @@ -27,7 +27,8 @@ import org.apache.flink.util.WrappingProxyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.GuardedBy; +import javax.annotation.Nonnull; + import java.io.Closeable; import java.io.IOException; import java.lang.ref.PhantomReference; @@ -53,19 +54,17 @@ public class SafetyNetCloseableRegistry extends private static final Logger LOG = LoggerFactory.getLogger(SafetyNetCloseableRegistry.class); - /** Lock for accessing reaper thread and registry count */ + /** Lock for atomic modifications to reaper thread and registry count */ private static final Object REAPER_THREAD_LOCK = new Object(); /** Singleton reaper thread takes care of all registries in VM */ - @GuardedBy("REAPER_THREAD_LOCK") private static CloseableReaperThread REAPER_THREAD = null; /** Global count of all instances of SafetyNetCloseableRegistry */ - @GuardedBy("REAPER_THREAD_LOCK") private static int GLOBAL_SAFETY_NET_REGISTRY_COUNT = 0; - public SafetyNetCloseableRegistry() { - super(new IdentityHashMap()); + SafetyNetCloseableRegistry() { + super(new IdentityHashMap<>()); synchronized (REAPER_THREAD_LOCK) { if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) { @@ -79,8 +78,8 @@ public class SafetyNetCloseableRegistry extends @Override protected void doRegister( - WrappingProxyCloseable wrappingProxyCloseable, - Map closeableMap) throws IOException { + @Nonnull WrappingProxyCloseable wrappingProxyCloseable, + @Nonnull Map closeableMap) { assert Thread.holdsLock(getSynchronizationLock()); @@ -100,8 +99,8 @@ public class SafetyNetCloseableRegistry extends @Override protected void doUnRegister( - WrappingProxyCloseable closeable, - Map closeableMap) { + @Nonnull WrappingProxyCloseable closeable, + @Nonnull Map closeableMap) { assert Thread.holdsLock(getSynchronizationLock()); @@ -131,7 +130,7 @@ public class SafetyNetCloseableRegistry extends } @VisibleForTesting - public static boolean isReaperThreadRunning() { + static boolean isReaperThreadRunning() { synchronized (REAPER_THREAD_LOCK) { return null != REAPER_THREAD && REAPER_THREAD.isAlive(); } @@ -148,10 +147,10 @@ public class SafetyNetCloseableRegistry extends private final SafetyNetCloseableRegistry closeableRegistry; private final String debugString; - public PhantomDelegatingCloseableRef( - WrappingProxyCloseable referent, - SafetyNetCloseableRegistry closeableRegistry, - ReferenceQueue> q) { + PhantomDelegatingCloseableRef( + WrappingProxyCloseable referent, + SafetyNetCloseableRegistry closeableRegistry, + ReferenceQueue> q) { super(referent, q); this.innerCloseable = Preconditions.checkNotNull(WrappingProxyUtil.stripProxy(referent)); @@ -159,15 +158,13 @@ public class SafetyNetCloseableRegistry extends this.debugString = referent.toString(); } - public String getDebugString() { + String getDebugString() { return debugString; } @Override public void close() throws IOException { - synchronized (closeableRegistry.getSynchronizationLock()) { - closeableRegistry.closeableToRef.remove(innerCloseable); - } + closeableRegistry.removeCloseableInternal(innerCloseable); innerCloseable.close(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java index f949779..4527b5e 100644 --- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java @@ -19,9 +19,15 @@ package org.apache.flink.util; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; /** @@ -38,11 +44,20 @@ import java.util.Map; @Internal public abstract class AbstractCloseableRegistry implements Closeable { - protected final Map closeableToRef; + /** Lock that guards state of this registry. **/ + private final Object lock; + + /** Map from tracked Closeables to some associated meta data. */ + @GuardedBy("lock") + private final Map closeableToRef; + + /** Indicates if this registry is closed. */ + @GuardedBy("lock") private boolean closed; - public AbstractCloseableRegistry(Map closeableToRef) { - this.closeableToRef = closeableToRef; + public AbstractCloseableRegistry(@Nonnull Map closeableToRef) { + this.lock = new Object(); + this.closeableToRef = Preconditions.checkNotNull(closeableToRef); this.closed = false; } @@ -51,7 +66,6 @@ public abstract class AbstractCloseableRegistry implemen * {@link IllegalStateException} and closes the passed {@link Closeable}. * * @param closeable Closeable tor register - * * @throws IOException exception when the registry was closed before */ public final void registerClosable(C closeable) throws IOException { @@ -61,13 +75,14 @@ public abstract class AbstractCloseableRegistry implemen } synchronized (getSynchronizationLock()) { - if (closed) { - IOUtils.closeQuietly(closeable); - throw new IOException("Cannot register Closeable, registry is already closed. Closing argument."); + if (!closed) { + doRegister(closeable, closeableToRef); + return; } - - doRegister(closeable, closeableToRef); } + + IOUtils.closeQuietly(closeable); + throw new IOException("Cannot register Closeable, registry is already closed. Closing argument."); } /** @@ -88,18 +103,22 @@ public abstract class AbstractCloseableRegistry implemen @Override public void close() throws IOException { + Collection toCloseCopy; + synchronized (getSynchronizationLock()) { if (closed) { return; } - IOUtils.closeAllQuietly(closeableToRef.keySet()); + closed = true; - closeableToRef.clear(); + toCloseCopy = new ArrayList<>(closeableToRef.keySet()); - closed = true; + closeableToRef.clear(); } + + IOUtils.closeAllQuietly(toCloseCopy); } public boolean isClosed() { @@ -108,11 +127,54 @@ public abstract class AbstractCloseableRegistry implemen } } + /** + * Does the actual registration of the closeable with the registry map. This should not do any long running or + * potentially blocking operations as is is executed under the registry's lock. + */ + protected abstract void doRegister(@Nonnull C closeable, @Nonnull Map closeableMap); + + /** + * Does the actual un-registration of the closeable from the registry map. This should not do any long running or + * potentially blocking operations as is is executed under the registry's lock. + */ + protected abstract void doUnRegister(@Nonnull C closeable, @Nonnull Map closeableMap); + + /** + * Returns the lock on which manipulations to members closeableToRef and closeable must be synchronized. + */ protected final Object getSynchronizationLock() { - return closeableToRef; + return lock; } - protected abstract void doUnRegister(C closeable, Map closeableMap); + /** + * Adds a mapping to the registry map, respecting locking. + */ + protected final void addCloseableInternal(Closeable closeable, T metaData) { + synchronized (getSynchronizationLock()) { + closeableToRef.put(closeable, metaData); + } + } - protected abstract void doRegister(C closeable, Map closeableMap) throws IOException; + /** + * Removes a mapping from the registry map, respecting locking. + */ + protected final void removeCloseableInternal(Closeable closeable) { + synchronized (getSynchronizationLock()) { + closeableToRef.remove(closeable); + } + } + + @VisibleForTesting + public final int getNumberOfRegisteredCloseables() { + synchronized (getSynchronizationLock()) { + return closeableToRef.size(); + } + } + + @VisibleForTesting + public final boolean isCloseableRegistered(Closeable c) { + synchronized (getSynchronizationLock()) { + return closeableToRef.containsKey(c); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java index 6a79913..7493d76 100644 --- a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java @@ -27,10 +27,16 @@ public final class WrappingProxyUtil { throw new AssertionError(); } + @SuppressWarnings("unchecked") public static T stripProxy(T object) { - while (object instanceof WrappingProxy) { + + T previous = null; + + while (object instanceof WrappingProxy && previous != object) { + previous = object; object = ((WrappingProxy) object).getWrappedDelegate(); } + return object; } } http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java new file mode 100644 index 0000000..41b69c8 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.util.AbstractCloseableRegistry; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.spy; + +public abstract class AbstractCloseableRegistryTest { + + protected ProducerThread[] streamOpenThreads; + protected AbstractCloseableRegistry closeableRegistry; + protected AtomicInteger unclosedCounter; + + protected abstract C createCloseable(); + + protected abstract AbstractCloseableRegistry createRegistry(); + + protected abstract ProducerThread createProducerThread( + AbstractCloseableRegistry registry, + AtomicInteger unclosedCounter, + int maxStreams); + + public void setup(int maxStreams) { + Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning()); + this.closeableRegistry = createRegistry(); + this.unclosedCounter = new AtomicInteger(0); + this.streamOpenThreads = new ProducerThread[10]; + for (int i = 0; i < streamOpenThreads.length; ++i) { + streamOpenThreads[i] = createProducerThread(closeableRegistry, unclosedCounter, maxStreams); + } + } + + protected void startThreads() { + for (ProducerThread t : streamOpenThreads) { + t.start(); + } + } + + protected void joinThreads() throws InterruptedException { + for (Thread t : streamOpenThreads) { + t.join(); + } + } + + @Test + public void testClose() throws Exception { + + setup(Integer.MAX_VALUE); + startThreads(); + + for (int i = 0; i < 5; ++i) { + System.gc(); + Thread.sleep(40); + } + + closeableRegistry.close(); + + joinThreads(); + + Assert.assertEquals(0, unclosedCounter.get()); + Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables()); + + final C testCloseable = spy(createCloseable()); + + try { + + closeableRegistry.registerClosable(testCloseable); + + Assert.fail("Closed registry should not accept closeables!"); + + } catch (IOException expected) { + //expected + } + + Assert.assertEquals(0, unclosedCounter.get()); + Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables()); + verify(testCloseable).close(); + } + + @Test + public void testNonBlockingClose() throws Exception { + setup(Integer.MAX_VALUE); + + final OneShotLatch waitRegistryClosedLatch = new OneShotLatch(); + final OneShotLatch blockCloseLatch = new OneShotLatch(); + + final C spyCloseable = spy(createCloseable()); + + doAnswer(invocationOnMock -> { + invocationOnMock.callRealMethod(); + waitRegistryClosedLatch.trigger(); + blockCloseLatch.await(); + return null; + }).when(spyCloseable).close(); + + closeableRegistry.registerClosable(spyCloseable); + + Assert.assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables()); + + Thread closer = new Thread(() -> { + try { + closeableRegistry.close(); + } catch (IOException ignore) { + + } + }); + + closer.start(); + waitRegistryClosedLatch.await(); + + final C testCloseable = spy(createCloseable()); + + try { + closeableRegistry.registerClosable(testCloseable); + Assert.fail("Closed registry should not accept closeables!"); + }catch (IOException ignore) { + } + + blockCloseLatch.trigger(); + closer.join(); + + verify(spyCloseable).close(); + verify(testCloseable).close(); + Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables()); + } + + protected static abstract class ProducerThread extends Thread { + + protected final AbstractCloseableRegistry registry; + protected final AtomicInteger refCount; + protected final int maxStreams; + protected int numStreams; + + public ProducerThread(AbstractCloseableRegistry registry, AtomicInteger refCount, int maxStreams) { + this.registry = registry; + this.refCount = refCount; + this.maxStreams = maxStreams; + this.numStreams = 0; + } + + protected abstract void createAndRegisterStream() throws IOException; + + @Override + public void run() { + try { + while (numStreams < maxStreams) { + + createAndRegisterStream(); + + try { + Thread.sleep(2); + } catch (InterruptedException ignored) {} + + if (maxStreams != Integer.MAX_VALUE) { + ++numStreams; + } + } + } catch (Exception ex) { + // ignored + } + } + } + + protected static final class TestStream extends FSDataInputStream { + + protected AtomicInteger refCount; + + public TestStream(AtomicInteger refCount) { + this.refCount = refCount; + refCount.incrementAndGet(); + } + + @Override + public void seek(long desired) throws IOException { + + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public int read() throws IOException { + return 0; + } + + @Override + public synchronized void close() throws IOException { + if (refCount != null) { + refCount.decrementAndGet(); + refCount = null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java new file mode 100644 index 0000000..eb8d1f4 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.util.AbstractCloseableRegistry; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +public class CloseableRegistryTest extends AbstractCloseableRegistryTest { + + @Override + protected Closeable createCloseable() { + return new Closeable() { + @Override + public void close() throws IOException { + + } + }; + } + + @Override + protected AbstractCloseableRegistry createRegistry() { + + return new CloseableRegistry(); + } + + @Override + protected ProducerThread createProducerThread( + AbstractCloseableRegistry registry, + AtomicInteger unclosedCounter, + int maxStreams) { + + return new ProducerThread(registry, unclosedCounter, maxStreams) { + @Override + protected void createAndRegisterStream() throws IOException { + TestStream testStream = new TestStream(unclosedCounter); + registry.registerClosable(testStream); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java index 05ee894..4ceda50 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java @@ -19,7 +19,9 @@ package org.apache.flink.core.fs; import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.util.AbstractCloseableRegistry; import org.apache.flink.util.ExceptionUtils; + import org.junit.After; import org.junit.Assert; import org.junit.Rule; @@ -30,41 +32,71 @@ import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; -public class SafetyNetCloseableRegistryTest { +public class SafetyNetCloseableRegistryTest + extends AbstractCloseableRegistryTest, + SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> { @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); - private ProducerThread[] streamOpenThreads; - private SafetyNetCloseableRegistry closeableRegistry; - private AtomicInteger unclosedCounter; + @Override + protected WrappingProxyCloseable createCloseable() { + return new WrappingProxyCloseable() { - public void setup() { - Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning()); - this.closeableRegistry = new SafetyNetCloseableRegistry(); - this.unclosedCounter = new AtomicInteger(0); - this.streamOpenThreads = new ProducerThread[10]; - for (int i = 0; i < streamOpenThreads.length; ++i) { - streamOpenThreads[i] = new ProducerThread(closeableRegistry, unclosedCounter, Integer.MAX_VALUE); - } + @Override + public void close() throws IOException { + + } + + @Override + public Closeable getWrappedDelegate() { + return this; + } + }; } - @After - public void tearDown() { - Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning()); + @Override + protected AbstractCloseableRegistry< + WrappingProxyCloseable, + SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> createRegistry() { + + return new SafetyNetCloseableRegistry(); } - private void startThreads(int maxStreams) { - for (ProducerThread t : streamOpenThreads) { - t.setMaxStreams(maxStreams); - t.start(); - } + @Override + protected AbstractCloseableRegistryTest.ProducerThread< + WrappingProxyCloseable, + SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> createProducerThread( + AbstractCloseableRegistry< + WrappingProxyCloseable, + SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> registry, + AtomicInteger unclosedCounter, + int maxStreams) { + + return new AbstractCloseableRegistryTest.ProducerThread + , + SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef>(registry, unclosedCounter, maxStreams) { + + int count = 0; + + @Override + protected void createAndRegisterStream() throws IOException { + String debug = Thread.currentThread().getName() + " " + count; + TestStream testStream = new TestStream(refCount); + + // this method automatically registers the stream with the given registry. + @SuppressWarnings("unused") + ClosingFSDataInputStream pis = ClosingFSDataInputStream.wrapSafe( + testStream, (SafetyNetCloseableRegistry) registry, + debug); //reference dies here + ++count; + } + }; } - private void joinThreads() throws InterruptedException { - for (Thread t : streamOpenThreads) { - t.join(); - } + @After + public void tearDown() { + Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning()); } @Test @@ -133,51 +165,9 @@ public class SafetyNetCloseableRegistryTest { } @Test - public void testClose() throws Exception { - - setup(); - startThreads(Integer.MAX_VALUE); - - for (int i = 0; i < 5; ++i) { - System.gc(); - Thread.sleep(40); - } - - closeableRegistry.close(); - - joinThreads(); - - Assert.assertEquals(0, unclosedCounter.get()); - - try { - - WrappingProxyCloseable testCloseable = new WrappingProxyCloseable() { - @Override - public Closeable getWrappedDelegate() { - return this; - } - - @Override - public void close() throws IOException { - unclosedCounter.incrementAndGet(); - } - }; - - closeableRegistry.registerClosable(testCloseable); - - Assert.fail("Closed registry should not accept closeables!"); - - } catch (IOException expected) { - //expected - } - - Assert.assertEquals(1, unclosedCounter.get()); - } - - @Test public void testSafetyNetClose() throws Exception { - setup(); - startThreads(20); + setup(20); + startThreads(); joinThreads(); @@ -194,95 +184,14 @@ public class SafetyNetCloseableRegistryTest { public void testReaperThreadSpawnAndStop() throws Exception { Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning()); - try (SafetyNetCloseableRegistry r1 = new SafetyNetCloseableRegistry()) { + try (SafetyNetCloseableRegistry ignored = new SafetyNetCloseableRegistry()) { Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning()); - try (SafetyNetCloseableRegistry r2 = new SafetyNetCloseableRegistry()) { + try (SafetyNetCloseableRegistry ignored2 = new SafetyNetCloseableRegistry()) { Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning()); } Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning()); } Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning()); } - - //------------------------------------------------------------------------------------------------------------------ - - private static final class ProducerThread extends Thread { - - private final SafetyNetCloseableRegistry registry; - private final AtomicInteger refCount; - private int maxStreams; - - public ProducerThread(SafetyNetCloseableRegistry registry, AtomicInteger refCount, int maxStreams) { - this.registry = registry; - this.refCount = refCount; - this.maxStreams = maxStreams; - } - - public int getMaxStreams() { - return maxStreams; - } - - public void setMaxStreams(int maxStreams) { - this.maxStreams = maxStreams; - } - - @Override - public void run() { - try { - int count = 0; - while (maxStreams > 0) { - String debug = Thread.currentThread().getName() + " " + count; - TestStream testStream = new TestStream(refCount); - refCount.incrementAndGet(); - - @SuppressWarnings("unused") - ClosingFSDataInputStream pis = ClosingFSDataInputStream.wrapSafe(testStream, registry, debug); //reference dies here - - try { - Thread.sleep(2); - } catch (InterruptedException ignored) {} - - if (maxStreams != Integer.MAX_VALUE) { - --maxStreams; - } - ++count; - } - } catch (Exception ex) { - // ignored - } - } - } - - private static final class TestStream extends FSDataInputStream { - - private AtomicInteger refCount; - - public TestStream(AtomicInteger refCount) { - this.refCount = refCount; - } - - @Override - public void seek(long desired) throws IOException { - - } - - @Override - public long getPos() throws IOException { - return 0; - } - - @Override - public int read() throws IOException { - return 0; - } - - @Override - public void close() throws IOException { - if (refCount != null) { - refCount.decrementAndGet(); - refCount = null; - } - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java index 099f1f9..8934591 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java @@ -120,11 +120,11 @@ public class StateSnapshotContextSynchronousImplTest extends TestLogger { static final class InsightCloseableRegistry extends CloseableRegistry { public int size() { - return closeableToRef.size(); + return getNumberOfRegisteredCloseables(); } public boolean contains(Closeable closeable) { - return closeableToRef.containsKey(closeable); + return isCloseableRegistered(closeable); } } }