flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/3] flink git commit: [FLINK-4910] Introduce safety net for closing file system streams
Date Tue, 22 Nov 2016 23:25:53 GMT
Repository: flink
Updated Branches:
  refs/heads/master c590912c9 -> 4c23879a5


[FLINK-4910] Introduce safety net for closing file system streams

This closes #2691.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba8ed263
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba8ed263
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba8ed263

Branch: refs/heads/master
Commit: ba8ed263695d16eacb4bdfdf195dd22c83bb53ed
Parents: c590912
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Mon Oct 24 17:49:54 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Nov 22 23:16:52 2016 +0100

----------------------------------------------------------------------
 .../common/operators/CollectionExecutor.java    |  31 +--
 .../apache/flink/core/fs/CloseableRegistry.java |  52 +++++
 .../flink/core/fs/ClosingFSDataInputStream.java |  97 ++++++++++
 .../core/fs/ClosingFSDataOutputStream.java      | 102 ++++++++++
 .../flink/core/fs/FSDataInputStreamWrapper.java |  96 +++++++++
 .../core/fs/FSDataOutputStreamWrapper.java      |  76 ++++++++
 .../org/apache/flink/core/fs/FileSystem.java    |  88 ++++++---
 .../core/fs/SafetyNetCloseableRegistry.java     | 181 +++++++++++++++++
 .../core/fs/SafetyNetWrapperFileSystem.java     | 150 ++++++++++++++
 .../flink/core/fs/WrappingProxyCloseable.java   |  30 +++
 .../flink/util/AbstractCloseableRegistry.java   | 114 +++++++++++
 .../java/org/apache/flink/util/IOUtils.java     |  15 +-
 .../org/apache/flink/util/WrappingProxy.java    |  25 +++
 .../apache/flink/util/WrappingProxyUtil.java    |  33 ++++
 .../apache/flink/core/fs/FileSystemTest.java    |  29 +--
 .../core/fs/SafetyNetCloseableRegistryTest.java | 193 +++++++++++++++++++
 .../flink/runtime/filecache/FileCache.java      |  42 ++--
 .../state/AbstractKeyedStateBackend.java        |   5 +-
 .../flink/runtime/state/ClosableRegistry.java   | 108 -----------
 .../state/DefaultOperatorStateBackend.java      |   5 +-
 .../state/StateInitializationContextImpl.java   |  15 +-
 .../StateSnapshotContextSynchronousImpl.java    |   5 +-
 .../state/filesystem/FileStateHandle.java       |  18 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   5 +
 .../streaming/runtime/tasks/StreamTask.java     |   6 +-
 .../StateInitializationContextImplTest.java     |   6 +-
 ...StateSnapshotContextSynchronousImplTest.java |   4 +-
 .../util/AbstractStreamOperatorTestHarness.java |  10 +-
 .../test/checkpointing/RescalingITCase.java     |   1 +
 29 files changed, 1324 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index a6fc17e..07f48fc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -18,20 +18,6 @@
 
 package org.apache.flink.api.common.operators;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -58,6 +44,7 @@ import org.apache.flink.api.common.operators.util.TypeComparable;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.metrics.MetricGroup;
@@ -65,6 +52,20 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Visitor;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 /**
  * Execution utility for serial, local, collection-based executions of Flink programs.
  */
@@ -571,7 +572,7 @@ public class CollectionExecutor {
 
 		public CompletedFuture(Path entry) {
 			try{
-				LocalFileSystem fs = (LocalFileSystem) entry.getFileSystem();
+				LocalFileSystem fs = (LocalFileSystem) FileSystem.getUnguardedFileSystem(entry.toUri());
 				result = entry.isAbsolute() ? new Path(entry.toUri().getPath()): new Path(fs.getWorkingDirectory(),entry);
 			} catch (Exception e){
 				throw new RuntimeException("DistributedCache supports only local files for Collection Environments");

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
new file mode 100644
index 0000000..81ba7ab
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.AbstractCloseableRegistry;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class allows registering instances of {@link Closeable}, which are all closed if this registry is closed.
+ * <p>
+ * Registering to an already closed registry will throw an exception and close the provided {@link Closeable}.
+ * <p>
+ * All methods in this class are thread-safe.
+ */
+public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Object> {
+
+	private static final Object DUMMY = new Object();
+
+	public CloseableRegistry() {
+		super(new HashMap<Closeable, Object>());
+	}
+
+	@Override
+	protected void doRegister(Closeable closeable, Map<Closeable, Object> closeableMap) throws IOException {
+		closeableMap.put(closeable, DUMMY);
+	}
+
+	@Override
+	protected void doUnRegister(Closeable closeable, Map<Closeable, Object> closeableMap) {
+		closeableMap.remove(closeable);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
new file mode 100644
index 0000000..23ac4f2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link FSDataInputStream} that is used to
+ * implement a safety net against unclosed streams.
+ * <p>
+ * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
+ */
+public class ClosingFSDataInputStream
+		extends FSDataInputStreamWrapper
+		implements WrappingProxyCloseable<FSDataInputStream> {
+
+	private final SafetyNetCloseableRegistry registry;
+	private final String debugInfo;
+
+	private volatile boolean closed;
+
+	private ClosingFSDataInputStream(
+			FSDataInputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException {
+		super(delegate);
+		this.registry = Preconditions.checkNotNull(registry);
+		this.debugInfo = Preconditions.checkNotNull(debugInfo);
+		this.closed = false;
+	}
+
+	public boolean isClosed() {
+		return closed;
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (!closed) {
+			closed = true;
+			registry.unregisterClosable(this);
+			inputStream.close();
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return inputStream.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+
+		if (this == obj) {
+			return true;
+		}
+
+		if (obj instanceof ClosingFSDataInputStream) {
+			return inputStream.equals(((ClosingFSDataInputStream) obj).inputStream);
+		}
+
+		return false;
+	}
+
+	@Override
+	public String toString() {
+		return "ClosingFSDataInputStream(" + inputStream.toString() + ") : " + debugInfo;
+	}
+
+	public static ClosingFSDataInputStream wrapSafe(
+			FSDataInputStream delegate, SafetyNetCloseableRegistry registry) throws IOException{
+		return wrapSafe(delegate, registry, "");
+	}
+
+	public static ClosingFSDataInputStream wrapSafe(
+			FSDataInputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException{
+
+		ClosingFSDataInputStream inputStream = new ClosingFSDataInputStream(delegate, registry, debugInfo);
+		registry.registerClosable(inputStream);
+		return inputStream;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
new file mode 100644
index 0000000..120ca67
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link FSDataOutputStream} that is used to
+ * implement a safety net against unclosed streams.
+ * <p>
+ * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
+ */
+public class ClosingFSDataOutputStream
+		extends FSDataOutputStreamWrapper
+		implements WrappingProxyCloseable<FSDataOutputStream> {
+
+	private final SafetyNetCloseableRegistry registry;
+	private final String debugString;
+
+	private volatile boolean closed;
+
+	public ClosingFSDataOutputStream(
+			FSDataOutputStream delegate, SafetyNetCloseableRegistry registry) throws IOException {
+		this(delegate, registry, "");
+	}
+
+	private ClosingFSDataOutputStream(
+			FSDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugString) throws IOException {
+		super(delegate);
+		this.registry = Preconditions.checkNotNull(registry);
+		this.debugString = Preconditions.checkNotNull(debugString);
+		this.closed = false;
+	}
+
+	public boolean isClosed() {
+		return closed;
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (!closed) {
+			closed = true;
+			registry.unregisterClosable(this);
+			outputStream.close();
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return outputStream.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+
+		if (this == obj) {
+			return true;
+		}
+
+		if (obj instanceof ClosingFSDataOutputStream) {
+			return outputStream.equals(((ClosingFSDataOutputStream) obj).outputStream);
+		}
+
+		return false;
+	}
+
+	@Override
+	public String toString() {
+		return "ClosingFSDataOutputStream(" + outputStream.toString() + ") : " + debugString;
+	}
+
+	public static ClosingFSDataOutputStream wrapSafe(
+			FSDataOutputStream delegate, SafetyNetCloseableRegistry registry) throws IOException {
+		return wrapSafe(delegate, registry, "");
+	}
+
+	public static ClosingFSDataOutputStream wrapSafe(
+			FSDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException {
+
+		ClosingFSDataOutputStream inputStream = new ClosingFSDataOutputStream(delegate, registry, debugInfo);
+		registry.registerClosable(inputStream);
+		return inputStream;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
new file mode 100644
index 0000000..507b756
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.IOException;
+
+/**
+ * Simple forwarding wrapper around {@link FSDataInputStream}
+ */
+public class FSDataInputStreamWrapper extends FSDataInputStream implements WrappingProxy<FSDataInputStream> {
+
+	protected final FSDataInputStream inputStream;
+
+	public FSDataInputStreamWrapper(FSDataInputStream inputStream) {
+		this.inputStream = Preconditions.checkNotNull(inputStream);
+	}
+
+	@Override
+	public void seek(long desired) throws IOException {
+		inputStream.seek(desired);
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return inputStream.getPos();
+	}
+
+	@Override
+	public int read() throws IOException {
+		return inputStream.read();
+	}
+
+	@Override
+	public int read(byte[] b) throws IOException {
+		return inputStream.read(b);
+	}
+
+	@Override
+	public int read(byte[] b, int off, int len) throws IOException {
+		return inputStream.read(b, off, len);
+	}
+
+	@Override
+	public long skip(long n) throws IOException {
+		return inputStream.skip(n);
+	}
+
+	@Override
+	public int available() throws IOException {
+		return inputStream.available();
+	}
+
+	@Override
+	public void close() throws IOException {
+		inputStream.close();
+	}
+
+	@Override
+	public void mark(int readlimit) {
+		inputStream.mark(readlimit);
+	}
+
+	@Override
+	public void reset() throws IOException {
+		inputStream.reset();
+	}
+
+	@Override
+	public boolean markSupported() {
+		return inputStream.markSupported();
+	}
+
+	@Override
+	public FSDataInputStream getWrappedDelegate() {
+		return inputStream;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
new file mode 100644
index 0000000..36ebe10
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.IOException;
+
+/**
+ * Simple forwarding wrapper around {@link FSDataOutputStream}
+ */
+public class FSDataOutputStreamWrapper extends FSDataOutputStream implements WrappingProxy<FSDataOutputStream> {
+
+	protected final FSDataOutputStream outputStream;
+
+	public FSDataOutputStreamWrapper(FSDataOutputStream outputStream) {
+		this.outputStream = Preconditions.checkNotNull(outputStream);
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return outputStream.getPos();
+	}
+
+	@Override
+	public void flush() throws IOException {
+		outputStream.flush();
+	}
+
+	@Override
+	public void sync() throws IOException {
+		outputStream.sync();
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		outputStream.write(b);
+	}
+
+	@Override
+	public void write(byte[] b) throws IOException {
+		outputStream.write(b);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		outputStream.write(b, off, len);
+	}
+
+	@Override
+	public void close() throws IOException {
+		outputStream.close();
+	}
+
+	@Override
+	public FSDataOutputStream getWrappedDelegate() {
+		return outputStream;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 1844d64..5a608b5 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -25,6 +25,14 @@
 
 package org.apache.flink.core.fs;
 
+import org.apache.flink.annotation.Public;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -34,11 +42,6 @@ import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.flink.annotation.Public;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.OperatingSystem;
-
 /**
  * An abstract base class for a fairly generic file system. It
  * may be implemented as a distributed file system, or as a local
@@ -47,6 +50,8 @@ import org.apache.flink.util.OperatingSystem;
 @Public
 public abstract class FileSystem {
 
+	private static final InheritableThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new InheritableThreadLocal<>();
+
 	private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem";
 
 	private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
@@ -55,6 +60,39 @@ public abstract class FileSystem {
 
 	private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
 
+	private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
+
+	/**
+	 * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the beginning of the task's
+	 * main thread.
+	 */
+	public static void createFileSystemCloseableRegistryForTask() {
+		SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
+		if (null != oldRegistry) {
+			IOUtils.closeQuietly(oldRegistry);
+			LOG.warn("Found existing SafetyNetCloseableRegistry. Closed and replaced it.");
+		}
+		SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
+		REGISTRIES.set(newRegistry);
+	}
+
+	/**
+	 * Dispose the SafetyNetCloseableRegistry for a Task. This method should be called at the end of the task's
+	 * main thread or when the task should be canceled.
+	 */
+	public static void disposeFileSystemCloseableRegistryForTask() {
+		SafetyNetCloseableRegistry registry = REGISTRIES.get();
+		if (null != registry) {
+			LOG.info("Ensuring all FileSystem streams are closed");
+			REGISTRIES.remove();
+			IOUtils.closeQuietly(registry);
+		}
+	}
+
+	private static FileSystem wrapWithSafetyNetWhenInTask(FileSystem fs) {
+		SafetyNetCloseableRegistry reg = REGISTRIES.get();
+		return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs;
+	}
 
 	/** Object used to protect calls to specific methods.*/
 	private static final Object SYNCHRONIZATION_OBJECT = new Object();
@@ -63,7 +101,7 @@ public abstract class FileSystem {
 	 * Enumeration for write modes.
 	 *
 	 */
-	public static enum WriteMode {
+	public enum WriteMode {
 
 		/** Creates write path if it does not exist. Does not overwrite existing files and directories. */
 		NO_OVERWRITE,
@@ -214,18 +252,7 @@ public abstract class FileSystem {
 		}
 	}
 
-	/**
-	 * Returns a reference to the {@link FileSystem} instance for accessing the
-	 * file system identified by the given {@link URI}.
-	 *
-	 * @param uri
-	 *        the {@link URI} identifying the file system
-	 * @return a reference to the {@link FileSystem} instance for accessing the file system identified by the given
-	 *         {@link URI}.
-	 * @throws IOException
-	 *         thrown if a reference to the file system instance could not be obtained
-	 */
-	public static FileSystem get(URI uri) throws IOException {
+	public static FileSystem getUnguardedFileSystem(URI uri) throws IOException {
 		FileSystem fs;
 
 		URI asked = uri;
@@ -238,13 +265,13 @@ public abstract class FileSystem {
 					}
 
 					uri = new URI(defaultScheme.getScheme(), null, defaultScheme.getHost(),
-						defaultScheme.getPort(), uri.getPath(), null, null);
+							defaultScheme.getPort(), uri.getPath(), null, null);
 
 				} catch (URISyntaxException e) {
 					try {
 						if (defaultScheme.getScheme().equals("file")) {
 							uri = new URI("file", null,
-								new Path(new File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null);
+									new Path(new File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null);
 						}
 					} catch (URISyntaxException ex) {
 						// we tried to repair it, but could not. report the scheme error
@@ -255,8 +282,8 @@ public abstract class FileSystem {
 
 			if(uri.getScheme() == null) {
 				throw new IOException("The URI '" + uri + "' is invalid.\n" +
-					"The fs.default-scheme = " + defaultScheme + ", the requested URI = " + asked +
-					", and the final URI = " + uri + ".");
+						"The fs.default-scheme = " + defaultScheme + ", the requested URI = " + asked +
+						", and the final URI = " + uri + ".");
 			}
 
 			if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
@@ -294,7 +321,7 @@ public abstract class FileSystem {
 				} else {
 					// we can not read from this file system.
 					throw new IOException("No file system found with scheme " + uri.getScheme()
-						+ ", referenced in file URI '" + uri.toString() + "'.");
+							+ ", referenced in file URI '" + uri.toString() + "'.");
 				}
 			} else {
 				// we end up here if we have a file system with build-in flink support.
@@ -316,6 +343,21 @@ public abstract class FileSystem {
 	}
 
 	/**
+	 * Returns a reference to the {@link FileSystem} instance for accessing the
+	 * file system identified by the given {@link URI}.
+	 *
+	 * @param uri
+	 *        the {@link URI} identifying the file system
+	 * @return a reference to the {@link FileSystem} instance for accessing the file system identified by the given
+	 *         {@link URI}.
+	 * @throws IOException
+	 *         thrown if a reference to the file system instance could not be obtained
+	 */
+	public static FileSystem get(URI uri) throws IOException {
+		return wrapWithSafetyNetWhenInTask(getUnguardedFileSystem(uri));
+	}
+
+	/**
 	 * Returns a boolean indicating whether a scheme has built-in Flink support.
 	 *
 	 * @param scheme

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
new file mode 100644
index 0000000..de4fb30
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.AbstractCloseableRegistry;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This implementation of an {@link AbstractCloseableRegistry} registers {@link WrappingProxyCloseable}. When
+ * the proxy becomes subject to GC, this registry takes care of closing unclosed {@link Closeable}s.
+ * <p>
+ * Phantom references are used to track when {@link org.apache.flink.util.WrappingProxy}s of {@link Closeable} have
+ * been GC'ed. The registry then closes the wrapped {@link Closeable} to avoid resource leaks.
+ * <p>
+ * Other than that, it works like a normal {@link CloseableRegistry}.
+ * <p>
+ * All methods in this class are thread-safe.
+ */
+public class SafetyNetCloseableRegistry extends
+		AbstractCloseableRegistry<WrappingProxyCloseable<? extends Closeable>,
+				SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SafetyNetCloseableRegistry.class);
+	private final ReferenceQueue<WrappingProxyCloseable<? extends Closeable>> referenceQueue;
+	private final Thread reaperThread;
+
+	public SafetyNetCloseableRegistry() {
+		super(new IdentityHashMap<Closeable, PhantomDelegatingCloseableRef>());
+		this.referenceQueue = new ReferenceQueue<>();
+		this.reaperThread = new CloseableReaperThread();
+		reaperThread.start();
+	}
+
+	@Override
+	protected void doRegister(
+			WrappingProxyCloseable<? extends Closeable> wrappingProxyCloseable,
+			Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) throws IOException {
+
+		Closeable innerCloseable = WrappingProxyUtil.stripProxy(wrappingProxyCloseable.getWrappedDelegate());
+
+		if (null == innerCloseable) {
+			return;
+		}
+
+		PhantomDelegatingCloseableRef phantomRef =
+				new PhantomDelegatingCloseableRef(wrappingProxyCloseable, referenceQueue);
+
+		closeableMap.put(innerCloseable, phantomRef);
+	}
+
+	@Override
+	protected void doUnRegister(
+			WrappingProxyCloseable<? extends Closeable> closeable,
+			Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) {
+
+		Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate());
+
+		if (null == innerCloseable) {
+			return;
+		}
+
+		closeableMap.remove(innerCloseable);
+	}
+
+	/**
+	 * Phantom reference to {@link WrappingProxyCloseable}.
+	 */
+	static final class PhantomDelegatingCloseableRef
+			extends PhantomReference<WrappingProxyCloseable<? extends Closeable>>
+			implements Closeable {
+
+		private final Closeable innerCloseable;
+		private final String debugString;
+
+		public PhantomDelegatingCloseableRef(
+				WrappingProxyCloseable<? extends Closeable> referent,
+				ReferenceQueue<? super WrappingProxyCloseable<? extends Closeable>> q) {
+
+			super(referent, q);
+			this.innerCloseable = Preconditions.checkNotNull(WrappingProxyUtil.stripProxy(referent));
+			this.debugString = referent.toString();
+		}
+
+		public Closeable getInnerCloseable() {
+			return innerCloseable;
+		}
+
+		public String getDebugString() {
+			return debugString;
+		}
+
+		@Override
+		public void close() throws IOException {
+			innerCloseable.close();
+		}
+	}
+
+	/**
+	 * Reaper thread that collects and closes leaked resources whose wrapping proxies have been garbage collected.
+	 */
+	final class CloseableReaperThread extends Thread {
+
+		public CloseableReaperThread() {
+			super("CloseableReaperThread");
+			this.running = false;
+		}
+
+		private volatile boolean running;
+
+		@Override
+		public void run() {
+			this.running = true;
+			try {
+				List<PhantomDelegatingCloseableRef> closeableList = new LinkedList<>();
+				while (running) {
+					PhantomDelegatingCloseableRef oldRef = (PhantomDelegatingCloseableRef) referenceQueue.remove();
+					synchronized (getSynchronizationLock()) {
+						do {
+							closeableList.add(oldRef);
+							closeableToRef.remove(oldRef.getInnerCloseable());
+						}
+						while ((oldRef = (PhantomDelegatingCloseableRef) referenceQueue.poll()) != null);
+					}
+
+					// close outside the synchronized block in case this is blocking
+					for (PhantomDelegatingCloseableRef closeableRef : closeableList) {
+						IOUtils.closeQuietly(closeableRef);
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Closing unclosed resource: " + closeableRef.getDebugString());
+						}
+					}
+
+					closeableList.clear();
+				}
+			} catch (InterruptedException e) {
+				// done
+			}
+		}
+
+		@Override
+		public void interrupt() {
+			this.running = false;
+			super.interrupt();
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		super.close();
+		reaperThread.interrupt();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
new file mode 100644
index 0000000..bf30b4f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * This is a {@link WrappingProxy} around {@link FileSystem} which (i) wraps all opened streams as
+ * {@link ClosingFSDataInputStream} or {@link ClosingFSDataOutputStream} and (ii) registers them to
+ * a {@link SafetyNetCloseableRegistry}.
+ *
+ * Streams obtained from this file system are therefore managed by the {@link SafetyNetCloseableRegistry} to prevent
+ * resource leaks from unclosed streams.
+ */
+public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingProxy<FileSystem> {
+
+	private final SafetyNetCloseableRegistry registry;
+	private final FileSystem unsafeFileSystem;
+
+	public SafetyNetWrapperFileSystem(FileSystem unsafeFileSystem, SafetyNetCloseableRegistry registry) {
+		this.registry = Preconditions.checkNotNull(registry);
+		this.unsafeFileSystem = Preconditions.checkNotNull(unsafeFileSystem);
+	}
+
+	@Override
+	public Path getWorkingDirectory() {
+		return unsafeFileSystem.getWorkingDirectory();
+	}
+
+	@Override
+	public Path getHomeDirectory() {
+		return unsafeFileSystem.getHomeDirectory();
+	}
+
+	@Override
+	public URI getUri() {
+		return unsafeFileSystem.getUri();
+	}
+
+	@Override
+	public void initialize(URI name) throws IOException {
+		unsafeFileSystem.initialize(name);
+	}
+
+	@Override
+	public FileStatus getFileStatus(Path f) throws IOException {
+		return unsafeFileSystem.getFileStatus(f);
+	}
+
+	@Override
+	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+		return unsafeFileSystem.getFileBlockLocations(file, start, len);
+	}
+
+	@Override
+	public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+		FSDataInputStream innerStream = unsafeFileSystem.open(f, bufferSize);
+		return ClosingFSDataInputStream.wrapSafe(innerStream, registry, String.valueOf(f));
+	}
+
+	@Override
+	public FSDataInputStream open(Path f) throws IOException {
+		FSDataInputStream innerStream = unsafeFileSystem.open(f);
+		return ClosingFSDataInputStream.wrapSafe(innerStream, registry, String.valueOf(f));
+	}
+
+	@Override
+	public long getDefaultBlockSize() {
+		return unsafeFileSystem.getDefaultBlockSize();
+	}
+
+	@Override
+	public FileStatus[] listStatus(Path f) throws IOException {
+		return unsafeFileSystem.listStatus(f);
+	}
+
+	@Override
+	public boolean exists(Path f) throws IOException {
+		return unsafeFileSystem.exists(f);
+	}
+
+	@Override
+	public boolean delete(Path f, boolean recursive) throws IOException {
+		return unsafeFileSystem.delete(f, recursive);
+	}
+
+	@Override
+	public boolean mkdirs(Path f) throws IOException {
+		return unsafeFileSystem.mkdirs(f);
+	}
+
+	@Override
+	public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
+			throws IOException {
+
+		FSDataOutputStream innerStream = unsafeFileSystem.create(f, overwrite, bufferSize, replication, blockSize);
+		return ClosingFSDataOutputStream.wrapSafe(innerStream, registry, String.valueOf(f));
+	}
+
+	@Override
+	public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+		FSDataOutputStream innerStream = unsafeFileSystem.create(f, overwrite);
+		return ClosingFSDataOutputStream.wrapSafe(innerStream, registry, String.valueOf(f));
+	}
+
+	@Override
+	public boolean rename(Path src, Path dst) throws IOException {
+		return unsafeFileSystem.rename(src, dst);
+	}
+
+	@Override
+	public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
+		return unsafeFileSystem.initOutPathLocalFS(outPath, writeMode, createDirectory);
+	}
+
+	@Override
+	public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
+		return unsafeFileSystem.initOutPathDistFS(outPath, writeMode, createDirectory);
+	}
+
+	@Override
+	public boolean isDistributedFS() {
+		return unsafeFileSystem.isDistributedFS();
+	}
+
+	@Override
+	public FileSystem getWrappedDelegate() {
+		return unsafeFileSystem;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
new file mode 100644
index 0000000..b74fc78
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.WrappingProxy;
+
+import java.io.Closeable;
+
+/**
+ * {@link WrappingProxy} for {@link Closeable} that is also closeable.
+ */
+public interface WrappingProxyCloseable<T extends Closeable> extends Closeable, WrappingProxy<T> {
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
new file mode 100644
index 0000000..7c0291c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * This is the abstract base class for registries with which instances of {@link Closeable} can be registered. All
+ * registered instances are closed when the registry is closed.
+ * <p>
+ * Registering with an already closed registry closes the provided {@link Closeable} and throws an {@link IOException}.
+ * <p>
+ * All methods in this class are thread-safe.
+ *
+ * @param <C> Type of the closeable this registers
+ * @param <T> Type for potential meta data associated with the registering closeables
+ */
+public abstract class AbstractCloseableRegistry<C extends Closeable, T> implements Closeable {
+
+	protected final Map<Closeable, T> closeableToRef;
+	private boolean closed;
+
+	public AbstractCloseableRegistry(Map<Closeable, T> closeableToRef) {
+		this.closeableToRef = closeableToRef;
+		this.closed = false;
+	}
+
+	/**
+	 * Registers a {@link Closeable} with the registry. In case the registry is already closed, this method closes
+	 * the passed {@link Closeable} and throws an {@link IOException}. This method does not return a value; passing
+	 * {@code null} is a no-op.
+	 *
+	 * @param closeable Closeable to register
+	 * @throws IOException exception when the registry was closed before
+	 */
+	public final void registerClosable(C closeable) throws IOException {
+
+		if (null == closeable) {
+			return;
+		}
+
+		synchronized (getSynchronizationLock()) {
+			if (closed) {
+				IOUtils.closeQuietly(closeable);
+				throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
+			}
+
+			doRegister(closeable, closeableToRef);
+		}
+	}
+
+	/**
+	 * Removes a {@link Closeable} from the registry.
+	 *
+	 * @param closeable instance to remove from the registry; passing {@code null} is a no-op. This method returns
+	 *                  nothing, so callers cannot tell whether the instance was actually registered.
+	 */
+	public final void unregisterClosable(C closeable) {
+
+		if (null == closeable) {
+			return;
+		}
+
+		synchronized (getSynchronizationLock()) {
+			doUnRegister(closeable, closeableToRef);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		synchronized (getSynchronizationLock()) {
+
+			for (Closeable closeable : closeableToRef.keySet()) {
+				IOUtils.closeQuietly(closeable);
+			}
+
+			closeableToRef.clear();
+
+			closed = true;
+		}
+	}
+
+	public boolean isClosed() {
+		synchronized (getSynchronizationLock()) {
+			return closed;
+		}
+	}
+
+	protected final Object getSynchronizationLock() {
+		return closeableToRef;
+	}
+
+	protected abstract void doUnRegister(C closeable, Map<Closeable, T> closeableMap);
+
+	protected abstract void doRegister(C closeable, Map<Closeable, T> closeableMap) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 12d70ce..9810271 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.util;
 
+import org.slf4j.Logger;
+
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.Socket;
 
-import org.slf4j.Logger;
-
 /**
  * An utility class for I/O related functionality.
  * 
@@ -213,6 +214,16 @@ public final class IOUtils {
 			}
 		}
 	}
+
+	public static void closeQuietly(Closeable closeable) {
+		try {
+			if (closeable != null) {
+				closeable.close();
+			}
+		} catch (IOException ignored) {
+
+		}
+	}
 	
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java
new file mode 100644
index 0000000..82fcf04
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+public interface WrappingProxy<T> {
+
+	T getWrappedDelegate();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
new file mode 100644
index 0000000..0f62abd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+public final class WrappingProxyUtil {
+
+	private WrappingProxyUtil() {
+		throw new AssertionError();
+	}
+
+	public static <T> T stripProxy(T object) {
+		while (object instanceof WrappingProxy) {
+			object = ((WrappingProxy<T>) object).getWrappedDelegate();
+		}
+		return object;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
index 04ebc0e..1bde2fb 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java
@@ -17,32 +17,37 @@
  */
 package org.apache.flink.core.fs;
 
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.util.WrappingProxyUtil;
+import org.junit.Test;
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.junit.Test;
-import static org.junit.Assert.*;
+
+import static org.junit.Assert.assertTrue;
 
 public class FileSystemTest {
+
 	@Test
 	public void testGet() throws URISyntaxException, IOException {
 		String scheme = "file";
-		
-		assertTrue(FileSystem.get(new URI(scheme + ":///test/test")) instanceof LocalFileSystem);
-		
+
+		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":///test/test"))) instanceof LocalFileSystem);
+
 		try {
 			FileSystem.get(new URI(scheme + "://test/test"));
 		} catch (IOException ioe) {
 			assertTrue(ioe.getMessage().startsWith("Found local file path with authority '"));
 		}
 
-		assertTrue(FileSystem.get(new URI(scheme + ":/test/test")) instanceof LocalFileSystem);
-		
-		assertTrue(FileSystem.get(new URI(scheme + ":test/test")) instanceof LocalFileSystem);
+		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":/test/test"))) instanceof LocalFileSystem);
 
-		assertTrue(FileSystem.get(new URI("/test/test")) instanceof LocalFileSystem);
-		
-		assertTrue(FileSystem.get(new URI("test/test")) instanceof LocalFileSystem);
+		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":test/test"))) instanceof LocalFileSystem);
+
+		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("/test/test"))) instanceof LocalFileSystem);
+
+		assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("test/test"))) instanceof LocalFileSystem);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
new file mode 100644
index 0000000..6628407
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SafetyNetCloseableRegistryTest {
+
+	private ProducerThread[] streamOpenThreads;
+	private SafetyNetCloseableRegistry closeableRegistry;
+	private AtomicInteger unclosedCounter;
+
+	@Before
+	public void setup() {
+		this.closeableRegistry = new SafetyNetCloseableRegistry();
+		this.unclosedCounter = new AtomicInteger(0);
+		this.streamOpenThreads = new ProducerThread[10];
+		for (int i = 0; i < streamOpenThreads.length; ++i) {
+			streamOpenThreads[i] = new ProducerThread(closeableRegistry, unclosedCounter, Integer.MAX_VALUE);
+		}
+	}
+
+	private void startThreads(int maxStreams) {
+		for (ProducerThread t : streamOpenThreads) {
+			t.setMaxStreams(maxStreams);
+			t.start();
+		}
+	}
+
+	private void joinThreads() throws InterruptedException {
+		for (Thread t : streamOpenThreads) {
+			t.join();
+		}
+	}
+
+	@Test
+	public void testClose() throws Exception {
+
+		startThreads(Integer.MAX_VALUE);
+
+		for (int i = 0; i < 5; ++i) {
+			System.gc();
+			Thread.sleep(40);
+		}
+
+		closeableRegistry.close();
+
+		joinThreads();
+
+		Assert.assertEquals(0, unclosedCounter.get());
+
+		try {
+
+			WrappingProxyCloseable<Closeable> testCloseable = new WrappingProxyCloseable<Closeable>() {
+				@Override
+				public Closeable getWrappedDelegate() {
+					return this;
+				}
+
+				@Override
+				public void close() throws IOException {
+					unclosedCounter.incrementAndGet();
+				}
+			};
+
+			closeableRegistry.registerClosable(testCloseable);
+
+			Assert.fail("Closed registry should not accept closeables!");
+
+		} catch (IOException expected) {
+			//expected
+		}
+
+		Assert.assertEquals(1, unclosedCounter.get());
+	}
+
+	@Test
+	public void testSafetyNetClose() throws Exception {
+
+		startThreads(20);
+
+		joinThreads();
+
+		for (int i = 0; i < 5 && unclosedCounter.get() > 0; ++i) {
+			System.gc();
+			Thread.sleep(50);
+		}
+
+		Assert.assertEquals(0, unclosedCounter.get());
+		closeableRegistry.close();
+	}
+
+	private static final class ProducerThread extends Thread {
+
+		private final SafetyNetCloseableRegistry registry;
+		private final AtomicInteger refCount;
+		private int maxStreams;
+
+		public ProducerThread(SafetyNetCloseableRegistry registry, AtomicInteger refCount, int maxStreams) {
+			this.registry = registry;
+			this.refCount = refCount;
+			this.maxStreams = maxStreams;
+		}
+
+		public int getMaxStreams() {
+			return maxStreams;
+		}
+
+		public void setMaxStreams(int maxStreams) {
+			this.maxStreams = maxStreams;
+		}
+
+		@Override
+		public void run() {
+			try {
+				int count = 0;
+				while (maxStreams > 0) {
+					String debug = Thread.currentThread().getName() + " " + count;
+					TestStream testStream = new TestStream(refCount);
+					refCount.incrementAndGet();
+					ClosingFSDataInputStream pis = ClosingFSDataInputStream.wrapSafe(testStream, registry, debug); //reference dies here
+
+					try {
+						Thread.sleep(2);
+					} catch (InterruptedException e) {
+
+					}
+
+					if (maxStreams != Integer.MAX_VALUE) {
+						--maxStreams;
+					}
+					++count;
+				}
+			} catch (Exception ex) {
+
+			}
+		}
+	}
+
+	private static final class TestStream extends FSDataInputStream {
+
+		private AtomicInteger refCount;
+
+		public TestStream(AtomicInteger refCount) {
+			this.refCount = refCount;
+		}
+
+		@Override
+		public void seek(long desired) throws IOException {
+
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public int read() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public void close() throws IOException {
+			if (refCount != null) {
+				refCount.decrementAndGet();
+				refCount = null;
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index b5bdcaf..d79be05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -18,19 +18,8 @@
 
 package org.apache.flink.runtime.filecache;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
@@ -40,13 +29,23 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.IOUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 /**
  * The FileCache is used to create the local files for the registered cache files when a task is deployed.
  * The files will be removed when the task is unregistered after a 5 second delay.
@@ -236,8 +235,10 @@ public class FileCache {
 	// ------------------------------------------------------------------------
 
 	public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
-		FileSystem sFS = sourcePath.getFileSystem();
-		FileSystem tFS = targetPath.getFileSystem();
+		// TODO rewrite this to make it participate in the closable registry and the lifecycle of a task.
+		// we unwrap the file system to get raw streams without safety net
+		FileSystem sFS = FileSystem.getUnguardedFileSystem(sourcePath.toUri());
+		FileSystem tFS = FileSystem.getUnguardedFileSystem(targetPath.toUri());
 		if (!tFS.exists(targetPath)) {
 			if (sFS.getFileStatus(sourcePath).isDir()) {
 				tFS.mkdirs(targetPath);
@@ -253,16 +254,11 @@ public class FileCache {
 					copy(content.getPath(), new Path(localPath), executable);
 				}
 			} else {
-				try {
-					FSDataOutputStream lfsOutput = tFS.create(targetPath, false);
-					FSDataInputStream fsInput = sFS.open(sourcePath);
+				try (FSDataOutputStream lfsOutput = tFS.create(targetPath, false); FSDataInputStream fsInput = sFS.open(sourcePath)) {
 					IOUtils.copyBytes(fsInput, lfsOutput);
 					//noinspection ResultOfMethodCallIgnored
 					new File(targetPath.toString()).setExecutable(executable);
-					// closing the FSDataOutputStream
-					lfsOutput.close();
-				}
-				catch (IOException ioe) {
+				} catch (IOException ioe) {
 					LOG.error("could not copy file to local file cache.", ioe);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index e5d9b2b..ae71c7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.util.Preconditions;
 
@@ -80,7 +81,7 @@ public abstract class AbstractKeyedStateBackend<K>
 	protected final TaskKvStateRegistry kvStateRegistry;
 
 	/** Registry for all opened streams, so they can be closed if the task using this backend is closed */
-	protected ClosableRegistry cancelStreamRegistry;
+	protected CloseableRegistry cancelStreamRegistry;
 
 	protected final ClassLoader userCodeClassLoader;
 
@@ -96,7 +97,7 @@ public abstract class AbstractKeyedStateBackend<K>
 		this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
-		this.cancelStreamRegistry = new ClosableRegistry();
+		this.cancelStreamRegistry = new CloseableRegistry();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
deleted file mode 100644
index b5f7dad..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.commons.io.IOUtils;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * This class allows to register instances of {@link Closeable}, which are all closed if this registry is closed.
- * <p>
- * Registering to an already closed registry will throw an exception and close the provided {@link Closeable}
- * <p>
- * All methods in this class are thread-safe.
- */
-public class ClosableRegistry implements Closeable {
-
-	private final Set<Closeable> registeredCloseables;
-	private boolean closed;
-
-	public ClosableRegistry() {
-		this.registeredCloseables = new HashSet<>();
-		this.closed = false;
-	}
-
-	/**
-	 * Registers a {@link Closeable} with the registry. In case the registry is already closed, this method throws an
-	 * {@link IllegalStateException} and closes the passed {@link Closeable}.
-	 *
-	 * @param closeable Closable tor register
-	 * @return true if the the Closable was newly added to the registry
-	 * @throws IOException exception when the registry was closed before
-	 */
-	public boolean registerClosable(Closeable closeable) throws IOException {
-
-		if (null == closeable) {
-			return false;
-		}
-
-		synchronized (getSynchronizationLock()) {
-			if (closed) {
-				IOUtils.closeQuietly(closeable);
-				throw new IOException("Cannot register Closable, registry is already closed. Closed passed closable.");
-			}
-
-			return registeredCloseables.add(closeable);
-		}
-	}
-
-	/**
-	 * Removes a {@link Closeable} from the registry.
-	 *
-	 * @param closeable instance to remove from the registry.
-	 * @return true, if the instance was actually registered and now removed
-	 */
-	public boolean unregisterClosable(Closeable closeable) {
-
-		if (null == closeable) {
-			return false;
-		}
-
-		synchronized (getSynchronizationLock()) {
-			return registeredCloseables.remove(closeable);
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		synchronized (getSynchronizationLock()) {
-
-			for (Closeable closeable : registeredCloseables) {
-				IOUtils.closeQuietly(closeable);
-			}
-
-			registeredCloseables.clear();
-			closed = true;
-		}
-	}
-
-	public boolean isClosed() {
-		synchronized (getSynchronizationLock()) {
-			return closed;
-		}
-	}
-
-	private Object getSynchronizationLock() {
-		return registeredCloseables;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 2f5d3cb..5b47362 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataInputView;
@@ -49,7 +50,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	
 	private final Map<String, PartitionableListState<?>> registeredStates;
 	private final Collection<OperatorStateHandle> restoreSnapshots;
-	private final ClosableRegistry closeStreamOnCancelRegistry;
+	private final CloseableRegistry closeStreamOnCancelRegistry;
 	private final JavaSerializer<Serializable> javaSerializer;
 
 	/**
@@ -65,7 +66,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		this.javaSerializer = new JavaSerializer<>(userClassLoader);
 		this.restoreSnapshots = restoreSnapshots;
 		this.registeredStates = new HashMap<>();
-		this.closeStreamOnCancelRegistry = new ClosableRegistry();
+		this.closeStreamOnCancelRegistry = new CloseableRegistry();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index 8fbde05..b131d14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.util.Preconditions;
 
@@ -36,7 +37,7 @@ import java.util.Iterator;
 public class StateInitializationContextImpl implements StateInitializationContext {
 
 	/** Closable registry to participate in the operator's cancel/close methods */
-	private final ClosableRegistry closableRegistry;
+	private final CloseableRegistry closableRegistry;
 
 	/** Signal whether any state to restore was found */
 	private final boolean restored;
@@ -55,7 +56,7 @@ public class StateInitializationContextImpl implements StateInitializationContex
 			KeyedStateStore keyedStateStore,
 			Collection<KeyGroupsStateHandle> keyGroupsStateHandles,
 			Collection<OperatorStateHandle> operatorStateHandles,
-			ClosableRegistry closableRegistry) {
+			CloseableRegistry closableRegistry) {
 
 		this.restored = restored;
 		this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
@@ -87,7 +88,7 @@ public class StateInitializationContextImpl implements StateInitializationContex
 		return keyGroupsStateHandles;
 	}
 
-	public ClosableRegistry getClosableRegistry() {
+	public CloseableRegistry getClosableRegistry() {
 		return closableRegistry;
 	}
 
@@ -137,14 +138,14 @@ public class StateInitializationContextImpl implements StateInitializationContex
 	private static class KeyGroupStreamIterator implements Iterator<KeyGroupStatePartitionStreamProvider> {
 
 		private final Iterator<KeyGroupsStateHandle> stateHandleIterator;
-		private final ClosableRegistry closableRegistry;
+		private final CloseableRegistry closableRegistry;
 
 		private KeyGroupsStateHandle currentStateHandle;
 		private FSDataInputStream currentStream;
 		private Iterator<Tuple2<Integer, Long>> currentOffsetsIterator;
 
 		public KeyGroupStreamIterator(
-				Iterator<KeyGroupsStateHandle> stateHandleIterator, ClosableRegistry closableRegistry) {
+				Iterator<KeyGroupsStateHandle> stateHandleIterator, CloseableRegistry closableRegistry) {
 
 			this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);
 			this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
@@ -200,7 +201,7 @@ public class StateInitializationContextImpl implements StateInitializationContex
 		private final String stateName; //TODO since we only support a single named state in raw, this could be dropped
 
 		private final Iterator<OperatorStateHandle> stateHandleIterator;
-		private final ClosableRegistry closableRegistry;
+		private final CloseableRegistry closableRegistry;
 
 		private OperatorStateHandle currentStateHandle;
 		private FSDataInputStream currentStream;
@@ -210,7 +211,7 @@ public class StateInitializationContextImpl implements StateInitializationContex
 		public OperatorStateStreamIterator(
 				String stateName,
 				Iterator<OperatorStateHandle> stateHandleIterator,
-				ClosableRegistry closableRegistry) {
+				CloseableRegistry closableRegistry) {
 
 			this.stateName = Preconditions.checkNotNull(stateName);
 			this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index d632529..ce8a6c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -42,7 +43,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 	 * Registry for opened streams to participate in the lifecycle of the stream task. Hence, this registry should be 
 	 * obtained from and managed by the stream task.
 	 */
-	private final ClosableRegistry closableRegistry;
+	private final CloseableRegistry closableRegistry;
 
 	private KeyedStateCheckpointOutputStream keyedStateCheckpointOutputStream;
 	private OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream;
@@ -62,7 +63,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 			long checkpointTimestamp,
 			CheckpointStreamFactory streamFactory,
 			KeyGroupRange keyGroupRange,
-			ClosableRegistry closableRegistry) {
+			CloseableRegistry closableRegistry) {
 
 		this.checkpointId = checkpointId;
 		this.checkpointTimestamp = checkpointTimestamp;

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 29e905c..b61c52d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -43,9 +43,6 @@ public class FileStateHandle implements StreamStateHandle {
 	/** The size of the state in the file */
 	private final long stateSize;
 
-	/** Cached file system handle */
-	private transient FileSystem fs;
-
 	/**
 	 * Creates a new file state for the given file path.
 	 *
@@ -79,13 +76,17 @@ public class FileStateHandle implements StreamStateHandle {
 	 */
 	@Override
 	public void discardState() throws Exception {
-		getFileSystem().delete(filePath, false);
+
+		FileSystem fs = getFileSystem();
+
+		fs.delete(filePath, false);
 
 		// send a call to delete the checkpoint directory containing the file. This will
 		// fail (and be ignored) when some files still exist
 		try {
-			getFileSystem().delete(filePath.getParent(), false);
-		} catch (IOException ignored) {}
+			fs.delete(filePath.getParent(), false);
+		} catch (IOException ignored) {
+		}
 	}
 
 	/**
@@ -106,10 +107,7 @@ public class FileStateHandle implements StreamStateHandle {
 	 * @throws IOException Thrown if the file system cannot be accessed.
 	 */
 	private FileSystem getFileSystem() throws IOException {
-		if (fs == null) {
-			fs = FileSystem.get(filePath.toUri());
-		}
-		return fs;
+		return FileSystem.get(filePath.toUri());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3254fc1..c794f56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -538,6 +539,9 @@ public class Task implements Runnable, TaskActions {
 			//  check for canceling as a shortcut
 			// ----------------------------
 
+			// init closeable registry for this task
+			FileSystem.createFileSystemCloseableRegistryForTask();
+
 			// first of all, get a user-code classloader
 			// this may involve downloading the job's JAR files and/or classes
 			LOG.info("Loading JAR files for task " + taskNameWithSubtask);
@@ -758,6 +762,7 @@ public class Task implements Runnable, TaskActions {
 
 				// remove all files in the distributed cache
 				removeCachedFiles(distributedCacheEntries, fileCache);
+				FileSystem.disposeFileSystemCloseableRegistryForTask();
 
 				notifyFinalState();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4aaad71..6595901 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -36,7 +37,6 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -161,7 +161,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 
 	/** The currently active background materialization threads */
-	private final ClosableRegistry cancelables = new ClosableRegistry();
+	private final CloseableRegistry cancelables = new CloseableRegistry();
 
 	/** Flag to mark the task "in operation", in which case check
 	 * needs to be initialized to true, so that early cancel() before invoke() behaves correctly */
@@ -949,7 +949,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
-	public ClosableRegistry getCancelables() {
+	public CloseableRegistry getCancelables() {
 		return cancelables;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index 75c2261..cd94076 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.ClosableRegistry;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
@@ -58,7 +58,7 @@ public class StateInitializationContextImplTest {
 	static final int NUM_HANDLES = 10;
 
 	private StateInitializationContextImpl initializationContext;
-	private ClosableRegistry closableRegistry;
+	private CloseableRegistry closableRegistry;
 
 	private int writtenKeyGroups;
 	private Set<Integer> writtenOperatorStates;
@@ -70,7 +70,7 @@ public class StateInitializationContextImplTest {
 		this.writtenKeyGroups = 0;
 		this.writtenOperatorStates = new HashSet<>();
 
-		this.closableRegistry = new ClosableRegistry();
+		this.closableRegistry = new CloseableRegistry();
 		OperatorStateStore stateStore = mock(OperatorStateStore.class);
 
 		ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(64);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 0ee839e..2b2df4c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
@@ -35,7 +35,7 @@ public class StateSnapshotContextSynchronousImplTest {
 
 	@Before
 	public void setUp() throws Exception {
-		ClosableRegistry closableRegistry = new ClosableRegistry();
+		CloseableRegistry closableRegistry = new CloseableRegistry();
 		CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(1024);
 		KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
 		this.snapshotContext = new StateSnapshotContextSynchronousImpl(42, 4711, streamFactory, keyGroupRange, closableRegistry);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 23a31d5..830cd6f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
@@ -32,7 +33,6 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -65,7 +65,9 @@ import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Base class for {@code AbstractStreamOperator} test harnesses.
@@ -86,7 +88,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
 	final Environment environment;
 
-	ClosableRegistry closableRegistry;
+	CloseableRegistry closableRegistry;
 
 	// use this as default for tests
 	protected AbstractStateBackend stateBackend = new MemoryStateBackend();
@@ -115,7 +117,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		this.config = new StreamConfig(underlyingConfig);
 		this.config.setCheckpointingEnabled(true);
 		this.executionConfig = new ExecutionConfig();
-		this.closableRegistry = new ClosableRegistry();
+		this.closableRegistry = new CloseableRegistry();
 		this.checkpointLock = new Object();
 
 		environment = new MockEnvironment(

http://git-wip-us.apache.org/repos/asf/flink/blob/ba8ed263/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 5a64173..09de67f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -114,6 +114,7 @@ public class RescalingITCase extends TestLogger {
 	public static void teardown() {
 		if (cluster != null) {
 			cluster.shutdown();
+			cluster.awaitTermination();
 		}
 	}
 


Mime
View raw message