flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [1/2] flink git commit: [FLINK-7524] Remove potentially blocking behaviour from AbstractCloseableRegistry.
Date Mon, 25 Sep 2017 15:47:50 GMT
Repository: flink
Updated Branches:
  refs/heads/master 1ebd44a63 -> 5af463a9c


[FLINK-7524] Remove potentially blocking behaviour from AbstractCloseableRegistry.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0073204b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0073204b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0073204b

Branch: refs/heads/master
Commit: 0073204b257860ad104cde29d3795b3c633f4759
Parents: 1ebd44a
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Mon Sep 4 12:30:41 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Mon Sep 25 16:04:06 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/core/fs/CloseableRegistry.java |   9 +-
 .../core/fs/SafetyNetCloseableRegistry.java     |  35 ++-
 .../flink/util/AbstractCloseableRegistry.java   |  92 ++++++--
 .../apache/flink/util/WrappingProxyUtil.java    |   8 +-
 .../core/fs/AbstractCloseableRegistryTest.java  | 223 +++++++++++++++++++
 .../flink/core/fs/CloseableRegistryTest.java    |  59 +++++
 .../core/fs/SafetyNetCloseableRegistryTest.java | 211 +++++-------------
 ...StateSnapshotContextSynchronousImplTest.java |   4 +-
 8 files changed, 449 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
index 0d4ea0c..29f363c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -21,8 +21,9 @@ package org.apache.flink.core.fs;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.AbstractCloseableRegistry;
 
+import javax.annotation.Nonnull;
+
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -39,16 +40,16 @@ public class CloseableRegistry extends AbstractCloseableRegistry<Closeable,
Obje
 	private static final Object DUMMY = new Object();
 
 	public CloseableRegistry() {
-		super(new HashMap<Closeable, Object>());
+		super(new HashMap<>());
 	}
 
 	@Override
-	protected void doRegister(Closeable closeable, Map<Closeable, Object> closeableMap)
throws IOException {
+	protected void doRegister(@Nonnull Closeable closeable, @Nonnull Map<Closeable, Object>
closeableMap) {
 		closeableMap.put(closeable, DUMMY);
 	}
 
 	@Override
-	protected void doUnRegister(Closeable closeable, Map<Closeable, Object> closeableMap)
{
+	protected void doUnRegister(@Nonnull Closeable closeable, @Nonnull Map<Closeable, Object>
closeableMap) {
 		closeableMap.remove(closeable);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index 8b28fa2..6097334 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -27,7 +27,8 @@ import org.apache.flink.util.WrappingProxyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.Nonnull;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.ref.PhantomReference;
@@ -53,19 +54,17 @@ public class SafetyNetCloseableRegistry extends
 
 	private static final Logger LOG = LoggerFactory.getLogger(SafetyNetCloseableRegistry.class);
 
-	/** Lock for accessing reaper thread and registry count */
+	/** Lock for atomic modifications to reaper thread and registry count */
 	private static final Object REAPER_THREAD_LOCK = new Object();
 
 	/** Singleton reaper thread takes care of all registries in VM */
-	@GuardedBy("REAPER_THREAD_LOCK")
 	private static CloseableReaperThread REAPER_THREAD = null;
 
 	/** Global count of all instances of SafetyNetCloseableRegistry */
-	@GuardedBy("REAPER_THREAD_LOCK")
 	private static int GLOBAL_SAFETY_NET_REGISTRY_COUNT = 0;
 
-	public SafetyNetCloseableRegistry() {
-		super(new IdentityHashMap<Closeable, PhantomDelegatingCloseableRef>());
+	SafetyNetCloseableRegistry() {
+		super(new IdentityHashMap<>());
 
 		synchronized (REAPER_THREAD_LOCK) {
 			if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
@@ -79,8 +78,8 @@ public class SafetyNetCloseableRegistry extends
 
 	@Override
 	protected void doRegister(
-			WrappingProxyCloseable<? extends Closeable> wrappingProxyCloseable,
-			Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) throws IOException {
+			@Nonnull WrappingProxyCloseable<? extends Closeable> wrappingProxyCloseable,
+			@Nonnull Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) {
 
 		assert Thread.holdsLock(getSynchronizationLock());
 
@@ -100,8 +99,8 @@ public class SafetyNetCloseableRegistry extends
 
 	@Override
 	protected void doUnRegister(
-			WrappingProxyCloseable<? extends Closeable> closeable,
-			Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) {
+		@Nonnull WrappingProxyCloseable<? extends Closeable> closeable,
+		@Nonnull Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) {
 
 		assert Thread.holdsLock(getSynchronizationLock());
 
@@ -131,7 +130,7 @@ public class SafetyNetCloseableRegistry extends
 	}
 
 	@VisibleForTesting
-	public static boolean isReaperThreadRunning() {
+	static boolean isReaperThreadRunning() {
 		synchronized (REAPER_THREAD_LOCK) {
 			return null != REAPER_THREAD && REAPER_THREAD.isAlive();
 		}
@@ -148,10 +147,10 @@ public class SafetyNetCloseableRegistry extends
 		private final SafetyNetCloseableRegistry closeableRegistry;
 		private final String debugString;
 
-		public PhantomDelegatingCloseableRef(
-				WrappingProxyCloseable<? extends Closeable> referent,
-				SafetyNetCloseableRegistry closeableRegistry,
-				ReferenceQueue<? super WrappingProxyCloseable<? extends Closeable>> q) {
+		PhantomDelegatingCloseableRef(
+			WrappingProxyCloseable<? extends Closeable> referent,
+			SafetyNetCloseableRegistry closeableRegistry,
+			ReferenceQueue<? super WrappingProxyCloseable<? extends Closeable>> q) {
 
 			super(referent, q);
 			this.innerCloseable = Preconditions.checkNotNull(WrappingProxyUtil.stripProxy(referent));
@@ -159,15 +158,13 @@ public class SafetyNetCloseableRegistry extends
 			this.debugString = referent.toString();
 		}
 
-		public String getDebugString() {
+		String getDebugString() {
 			return debugString;
 		}
 
 		@Override
 		public void close() throws IOException {
-			synchronized (closeableRegistry.getSynchronizationLock()) {
-				closeableRegistry.closeableToRef.remove(innerCloseable);
-			}
+			closeableRegistry.removeCloseableInternal(innerCloseable);
 			innerCloseable.close();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index f949779..4527b5e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -19,9 +19,15 @@
 package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -38,11 +44,20 @@ import java.util.Map;
 @Internal
 public abstract class AbstractCloseableRegistry<C extends Closeable, T> implements
Closeable {
 
-	protected final Map<Closeable, T> closeableToRef;
+	/** Lock that guards state of this registry. **/
+	private final Object lock;
+
+	/** Map from tracked Closeables to some associated meta data. */
+	@GuardedBy("lock")
+	private final Map<Closeable, T> closeableToRef;
+
+	/** Indicates if this registry is closed. */
+	@GuardedBy("lock")
 	private boolean closed;
 
-	public AbstractCloseableRegistry(Map<Closeable, T> closeableToRef) {
-		this.closeableToRef = closeableToRef;
+	public AbstractCloseableRegistry(@Nonnull Map<Closeable, T> closeableToRef) {
+		this.lock = new Object();
+		this.closeableToRef = Preconditions.checkNotNull(closeableToRef);
 		this.closed = false;
 	}
 
@@ -51,7 +66,6 @@ public abstract class AbstractCloseableRegistry<C extends Closeable,
T> implemen
 	 * {@link IllegalStateException} and closes the passed {@link Closeable}.
 	 *
 	 * @param closeable Closeable tor register
-	 * 
 	 * @throws IOException exception when the registry was closed before
 	 */
 	public final void registerClosable(C closeable) throws IOException {
@@ -61,13 +75,14 @@ public abstract class AbstractCloseableRegistry<C extends Closeable,
T> implemen
 		}
 
 		synchronized (getSynchronizationLock()) {
-			if (closed) {
-				IOUtils.closeQuietly(closeable);
-				throw new IOException("Cannot register Closeable, registry is already closed. Closing
argument.");
+			if (!closed) {
+				doRegister(closeable, closeableToRef);
+				return;
 			}
-
-			doRegister(closeable, closeableToRef);
 		}
+
+		IOUtils.closeQuietly(closeable);
+		throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
 	}
 
 	/**
@@ -88,18 +103,22 @@ public abstract class AbstractCloseableRegistry<C extends Closeable,
T> implemen
 
 	@Override
 	public void close() throws IOException {
+		Collection<Closeable> toCloseCopy;
+
 		synchronized (getSynchronizationLock()) {
 
 			if (closed) {
 				return;
 			}
 
-			IOUtils.closeAllQuietly(closeableToRef.keySet());
+			closed = true;
 
-			closeableToRef.clear();
+			toCloseCopy = new ArrayList<>(closeableToRef.keySet());
 
-			closed = true;
+			closeableToRef.clear();
 		}
+
+		IOUtils.closeAllQuietly(toCloseCopy);
 	}
 
 	public boolean isClosed() {
@@ -108,11 +127,54 @@ public abstract class AbstractCloseableRegistry<C extends Closeable,
T> implemen
 		}
 	}
 
+	/**
+	 * Does the actual registration of the closeable with the registry map. This should not
do any long running or
+	 * potentially blocking operations as it is executed under the registry's lock.
+	 */
+	protected abstract void doRegister(@Nonnull C closeable, @Nonnull Map<Closeable, T>
closeableMap);
+
+	/**
+	 * Does the actual un-registration of the closeable from the registry map. This should not
do any long running or
+	 * potentially blocking operations as it is executed under the registry's lock.
+	 */
+	protected abstract void doUnRegister(@Nonnull C closeable, @Nonnull Map<Closeable, T>
closeableMap);
+
+	/**
+	 * Returns the lock on which manipulations to members closeableToRef and closed must
be synchronized.
+	 */
 	protected final Object getSynchronizationLock() {
-		return closeableToRef;
+		return lock;
 	}
 
-	protected abstract void doUnRegister(C closeable, Map<Closeable, T> closeableMap);
+	/**
+	 * Adds a mapping to the registry map, respecting locking.
+	 */
+	protected final void addCloseableInternal(Closeable closeable, T metaData) {
+		synchronized (getSynchronizationLock()) {
+			closeableToRef.put(closeable, metaData);
+		}
+	}
 
-	protected abstract void doRegister(C closeable, Map<Closeable, T> closeableMap) throws
IOException;
+	/**
+	 * Removes a mapping from the registry map, respecting locking.
+	 */
+	protected final void removeCloseableInternal(Closeable closeable) {
+		synchronized (getSynchronizationLock()) {
+			closeableToRef.remove(closeable);
+		}
+	}
+
+	@VisibleForTesting
+	public final int getNumberOfRegisteredCloseables() {
+		synchronized (getSynchronizationLock()) {
+			return closeableToRef.size();
+		}
+	}
+
+	@VisibleForTesting
+	public final boolean isCloseableRegistered(Closeable c) {
+		synchronized (getSynchronizationLock()) {
+			return closeableToRef.containsKey(c);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
index 6a79913..7493d76 100644
--- a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
@@ -27,10 +27,16 @@ public final class WrappingProxyUtil {
 		throw new AssertionError();
 	}
 
+	@SuppressWarnings("unchecked")
 	public static <T> T stripProxy(T object) {
-		while (object instanceof WrappingProxy) {
+
+		T previous = null;
+
+		while (object instanceof WrappingProxy && previous != object) {
+			previous = object;
 			object = ((WrappingProxy<T>) object).getWrappedDelegate();
 		}
+
 		return object;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
new file mode 100644
index 0000000..41b69c8
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.util.AbstractCloseableRegistry;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
+
+	protected ProducerThread[] streamOpenThreads;
+	protected AbstractCloseableRegistry<C, T> closeableRegistry;
+	protected AtomicInteger unclosedCounter;
+
+	protected abstract C createCloseable();
+
+	protected abstract AbstractCloseableRegistry<C, T> createRegistry();
+
+	protected abstract ProducerThread<C, T> createProducerThread(
+		AbstractCloseableRegistry<C, T> registry,
+		AtomicInteger unclosedCounter,
+		int maxStreams);
+
+	public void setup(int maxStreams) {
+		Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
+		this.closeableRegistry = createRegistry();
+		this.unclosedCounter = new AtomicInteger(0);
+		this.streamOpenThreads = new ProducerThread[10];
+		for (int i = 0; i < streamOpenThreads.length; ++i) {
+			streamOpenThreads[i] = createProducerThread(closeableRegistry, unclosedCounter, maxStreams);
+		}
+	}
+
+	protected void startThreads() {
+		for (ProducerThread t : streamOpenThreads) {
+			t.start();
+		}
+	}
+
+	protected void joinThreads() throws InterruptedException {
+		for (Thread t : streamOpenThreads) {
+			t.join();
+		}
+	}
+
+	@Test
+	public void testClose() throws Exception {
+
+		setup(Integer.MAX_VALUE);
+		startThreads();
+
+		for (int i = 0; i < 5; ++i) {
+			System.gc();
+			Thread.sleep(40);
+		}
+
+		closeableRegistry.close();
+
+		joinThreads();
+
+		Assert.assertEquals(0, unclosedCounter.get());
+		Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
+
+		final C testCloseable = spy(createCloseable());
+
+		try {
+
+			closeableRegistry.registerClosable(testCloseable);
+
+			Assert.fail("Closed registry should not accept closeables!");
+
+		} catch (IOException expected) {
+			//expected
+		}
+
+		Assert.assertEquals(0, unclosedCounter.get());
+		Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
+		verify(testCloseable).close();
+	}
+
+	@Test
+	public void testNonBlockingClose() throws Exception {
+		setup(Integer.MAX_VALUE);
+
+		final OneShotLatch waitRegistryClosedLatch = new OneShotLatch();
+		final OneShotLatch blockCloseLatch = new OneShotLatch();
+
+		final C spyCloseable = spy(createCloseable());
+
+		doAnswer(invocationOnMock -> {
+			invocationOnMock.callRealMethod();
+			waitRegistryClosedLatch.trigger();
+			blockCloseLatch.await();
+			return null;
+		}).when(spyCloseable).close();
+
+		closeableRegistry.registerClosable(spyCloseable);
+
+		Assert.assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables());
+
+		Thread closer = new Thread(() -> {
+			try {
+				closeableRegistry.close();
+			} catch (IOException ignore) {
+
+			}
+		});
+
+		closer.start();
+		waitRegistryClosedLatch.await();
+
+		final C testCloseable = spy(createCloseable());
+
+		try {
+			closeableRegistry.registerClosable(testCloseable);
+			Assert.fail("Closed registry should not accept closeables!");
+		}catch (IOException ignore) {
+		}
+
+		blockCloseLatch.trigger();
+		closer.join();
+
+		verify(spyCloseable).close();
+		verify(testCloseable).close();
+		Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables());
+	}
+
+	protected static abstract class ProducerThread<C extends Closeable, T> extends Thread
{
+
+		protected final AbstractCloseableRegistry<C, T> registry;
+		protected final AtomicInteger refCount;
+		protected final int maxStreams;
+		protected int numStreams;
+
+		public ProducerThread(AbstractCloseableRegistry<C, T> registry, AtomicInteger refCount,
int maxStreams) {
+			this.registry = registry;
+			this.refCount = refCount;
+			this.maxStreams = maxStreams;
+			this.numStreams = 0;
+		}
+
+		protected abstract void createAndRegisterStream() throws IOException;
+
+		@Override
+		public void run() {
+			try {
+				while (numStreams < maxStreams) {
+
+					createAndRegisterStream();
+
+					try {
+						Thread.sleep(2);
+					} catch (InterruptedException ignored) {}
+
+					if (maxStreams != Integer.MAX_VALUE) {
+						++numStreams;
+					}
+				}
+			} catch (Exception ex) {
+				// ignored
+			}
+		}
+	}
+
+	protected static final class TestStream extends FSDataInputStream {
+
+		protected AtomicInteger refCount;
+
+		public TestStream(AtomicInteger refCount) {
+			this.refCount = refCount;
+			refCount.incrementAndGet();
+		}
+
+		@Override
+		public void seek(long desired) throws IOException {
+
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public int read() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public synchronized void close() throws IOException {
+			if (refCount != null) {
+				refCount.decrementAndGet();
+				refCount = null;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
new file mode 100644
index 0000000..eb8d1f4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.AbstractCloseableRegistry;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CloseableRegistryTest extends AbstractCloseableRegistryTest<Closeable, Object>
{
+
+	@Override
+	protected Closeable createCloseable() {
+		return new Closeable() {
+			@Override
+			public void close() throws IOException {
+
+			}
+		};
+	}
+
+	@Override
+	protected AbstractCloseableRegistry<Closeable, Object> createRegistry() {
+
+		return new CloseableRegistry();
+	}
+
+	@Override
+	protected ProducerThread<Closeable, Object> createProducerThread(
+		AbstractCloseableRegistry<Closeable, Object> registry,
+		AtomicInteger unclosedCounter,
+		int maxStreams) {
+
+		return new ProducerThread<Closeable, Object>(registry, unclosedCounter, maxStreams)
{
+			@Override
+			protected void createAndRegisterStream() throws IOException {
+				TestStream testStream = new TestStream(unclosedCounter);
+				registry.registerClosable(testStream);
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 05ee894..4ceda50 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -19,7 +19,9 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.util.AbstractCloseableRegistry;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -30,41 +32,71 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class SafetyNetCloseableRegistryTest {
+public class SafetyNetCloseableRegistryTest
+	extends AbstractCloseableRegistryTest<WrappingProxyCloseable<? extends Closeable>,
+	SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> {
 
 	@Rule
 	public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
-	private ProducerThread[] streamOpenThreads;
-	private SafetyNetCloseableRegistry closeableRegistry;
-	private AtomicInteger unclosedCounter;
+	@Override
+	protected WrappingProxyCloseable<? extends Closeable> createCloseable() {
+		return new WrappingProxyCloseable<Closeable>() {
 
-	public void setup() {
-		Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
-		this.closeableRegistry = new SafetyNetCloseableRegistry();
-		this.unclosedCounter = new AtomicInteger(0);
-		this.streamOpenThreads = new ProducerThread[10];
-		for (int i = 0; i < streamOpenThreads.length; ++i) {
-			streamOpenThreads[i] = new ProducerThread(closeableRegistry, unclosedCounter, Integer.MAX_VALUE);
-		}
+			@Override
+			public void close() throws IOException {
+
+			}
+
+			@Override
+			public Closeable getWrappedDelegate() {
+				return this;
+			}
+		};
 	}
 
-	@After
-	public void tearDown() {
-		Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
+	@Override
+	protected AbstractCloseableRegistry<
+		WrappingProxyCloseable<? extends Closeable>,
+		SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> createRegistry() {
+
+		return new SafetyNetCloseableRegistry();
 	}
 
-	private void startThreads(int maxStreams) {
-		for (ProducerThread t : streamOpenThreads) {
-			t.setMaxStreams(maxStreams);
-			t.start();
-		}
+	@Override
+	protected AbstractCloseableRegistryTest.ProducerThread<
+		WrappingProxyCloseable<? extends Closeable>,
+		SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> createProducerThread(
+		AbstractCloseableRegistry<
+			WrappingProxyCloseable<? extends Closeable>,
+			SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> registry,
+		AtomicInteger unclosedCounter,
+		int maxStreams) {
+
+		return new AbstractCloseableRegistryTest.ProducerThread
+			<WrappingProxyCloseable<? extends Closeable>,
+				SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef>(registry, unclosedCounter,
maxStreams) {
+
+			int count = 0;
+
+			@Override
+			protected void createAndRegisterStream() throws IOException {
+				String debug = Thread.currentThread().getName() + " " + count;
+				TestStream testStream = new TestStream(refCount);
+
+				// this method automatically registers the stream with the given registry.
+				@SuppressWarnings("unused")
+				ClosingFSDataInputStream pis = ClosingFSDataInputStream.wrapSafe(
+					testStream, (SafetyNetCloseableRegistry) registry,
+					debug); //reference dies here
+				++count;
+			}
+		};
 	}
 
-	private void joinThreads() throws InterruptedException {
-		for (Thread t : streamOpenThreads) {
-			t.join();
-		}
+	@After
+	public void tearDown() {
+		Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
 	}
 
 	@Test
@@ -133,51 +165,9 @@ public class SafetyNetCloseableRegistryTest {
 	}
 
 	@Test
-	public void testClose() throws Exception {
-
-		setup();
-		startThreads(Integer.MAX_VALUE);
-
-		for (int i = 0; i < 5; ++i) {
-			System.gc();
-			Thread.sleep(40);
-		}
-
-		closeableRegistry.close();
-
-		joinThreads();
-
-		Assert.assertEquals(0, unclosedCounter.get());
-
-		try {
-
-			WrappingProxyCloseable<Closeable> testCloseable = new WrappingProxyCloseable<Closeable>()
{
-				@Override
-				public Closeable getWrappedDelegate() {
-					return this;
-				}
-
-				@Override
-				public void close() throws IOException {
-					unclosedCounter.incrementAndGet();
-				}
-			};
-
-			closeableRegistry.registerClosable(testCloseable);
-
-			Assert.fail("Closed registry should not accept closeables!");
-
-		} catch (IOException expected) {
-			//expected
-		}
-
-		Assert.assertEquals(1, unclosedCounter.get());
-	}
-
-	@Test
 	public void testSafetyNetClose() throws Exception {
-		setup();
-		startThreads(20);
+		setup(20);
+		startThreads();
 
 		joinThreads();
 
@@ -194,95 +184,14 @@ public class SafetyNetCloseableRegistryTest {
 	public void testReaperThreadSpawnAndStop() throws Exception {
 		Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
 
-		try (SafetyNetCloseableRegistry r1 = new SafetyNetCloseableRegistry()) {
+		try (SafetyNetCloseableRegistry ignored = new SafetyNetCloseableRegistry()) {
 			Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning());
 
-			try (SafetyNetCloseableRegistry r2 = new SafetyNetCloseableRegistry()) {
+			try (SafetyNetCloseableRegistry ignored2 = new SafetyNetCloseableRegistry()) {
 				Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning());
 			}
 			Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning());
 		}
 		Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
 	}
-
-	//------------------------------------------------------------------------------------------------------------------
-
-	private static final class ProducerThread extends Thread {
-
-		private final SafetyNetCloseableRegistry registry;
-		private final AtomicInteger refCount;
-		private int maxStreams;
-
-		public ProducerThread(SafetyNetCloseableRegistry registry, AtomicInteger refCount, int
maxStreams) {
-			this.registry = registry;
-			this.refCount = refCount;
-			this.maxStreams = maxStreams;
-		}
-
-		public int getMaxStreams() {
-			return maxStreams;
-		}
-
-		public void setMaxStreams(int maxStreams) {
-			this.maxStreams = maxStreams;
-		}
-
-		@Override
-		public void run() {
-			try {
-				int count = 0;
-				while (maxStreams > 0) {
-					String debug = Thread.currentThread().getName() + " " + count;
-					TestStream testStream = new TestStream(refCount);
-					refCount.incrementAndGet();
-
-					@SuppressWarnings("unused")
-					ClosingFSDataInputStream pis = ClosingFSDataInputStream.wrapSafe(testStream, registry,
debug); //reference dies here
-
-					try {
-						Thread.sleep(2);
-					} catch (InterruptedException ignored) {}
-
-					if (maxStreams != Integer.MAX_VALUE) {
-						--maxStreams;
-					}
-					++count;
-				}
-			} catch (Exception ex) {
-				// ignored
-			}
-		}
-	}
-
-	private static final class TestStream extends FSDataInputStream {
-
-		private AtomicInteger refCount;
-
-		public TestStream(AtomicInteger refCount) {
-			this.refCount = refCount;
-		}
-
-		@Override
-		public void seek(long desired) throws IOException {
-
-		}
-
-		@Override
-		public long getPos() throws IOException {
-			return 0;
-		}
-
-		@Override
-		public int read() throws IOException {
-			return 0;
-		}
-
-		@Override
-		public void close() throws IOException {
-			if (refCount != null) {
-				refCount.decrementAndGet();
-				refCount = null;
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 099f1f9..8934591 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -120,11 +120,11 @@ public class StateSnapshotContextSynchronousImplTest extends TestLogger
{
 
 	static final class InsightCloseableRegistry extends CloseableRegistry {
 		public int size() {
-			return closeableToRef.size();
+			return getNumberOfRegisteredCloseables();
 		}
 
 		public boolean contains(Closeable closeable) {
-			return closeableToRef.containsKey(closeable);
+			return isCloseableRegistered(closeable);
 		}
 	}
 }


Mime
View raw message