Repository: flink
Updated Branches:
refs/heads/master 7407076d3 -> b88f909ce
[FLINK-1492] Fix exceptions on blob store shutdown
This closes #376
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b88f909c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b88f909c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b88f909c
Branch: refs/heads/master
Commit: b88f909ce44bbd25528afee03079d0437d5f9a5b
Parents: 7407076
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Feb 9 11:44:47 2015 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Tue Feb 10 15:32:19 2015 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/blob/BlobCache.java | 15 +++++--
.../apache/flink/runtime/blob/BlobServer.java | 46 ++++++++++++--------
.../apache/flink/runtime/blob/BlobUtils.java | 21 +++++----
3 files changed, 52 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b88f909c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 3f0fcb6..d0d9a45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -29,6 +29,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* The BLOB cache implements a local cache for content-addressable BLOBs. When requesting BLOBs through the
@@ -48,6 +49,10 @@ public final class BlobCache implements BlobService {
private final File storageDir;
+ private AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+ /** Shutdown hook thread to ensure deletion of the storage directory. */
+ private final Thread shutdownHook;
public BlobCache(InetSocketAddress serverAddress) {
this.serverAddress = serverAddress;
@@ -57,7 +62,7 @@ public final class BlobCache implements BlobService {
LOG.info("Created BLOB cache storage directory " + storageDir);
// Add shutdown hook to delete storage directory
- BlobUtils.addDeleteDirectoryShutdownHook(storageDir, LOG);
+ shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
/**
@@ -155,7 +160,11 @@ public final class BlobCache implements BlobService {
}
@Override
- public void shutdown() throws IOException{
- FileUtils.deleteDirectory(storageDir);
+ public void shutdown() throws IOException {
+ if (shutdownRequested.compareAndSet(false, true)) {
+ FileUtils.deleteDirectory(storageDir);
+
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b88f909c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index cf82a9b..068a859 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
@@ -86,7 +87,10 @@ public final class BlobServer extends Thread implements BlobService{
/**
* Indicates whether a shutdown of server component has been requested.
*/
- private volatile boolean shutdownRequested = false;
+ private AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+ /** Shutdown hook thread to ensure deletion of the storage directory. */
+ private final Thread shutdownHook;
/**
* Is the root directory for file storage
@@ -114,8 +118,7 @@ public final class BlobServer extends Thread implements BlobService{
LOG.info("Created BLOB server storage directory " + storageDir);
- // Add shutdown hook to delete storage directory
- BlobUtils.addDeleteDirectoryShutdownHook(storageDir, LOG);
+ shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
catch (IOException e) {
throw new IOException("Could not create BlobServer with random port.", e);
@@ -180,12 +183,12 @@ public final class BlobServer extends Thread implements BlobService{
try {
- while (!this.shutdownRequested) {
+ while (!this.shutdownRequested.get()) {
new BlobConnection(this.serverSocket.accept(), this).start();
}
} catch (IOException ioe) {
- if (!this.shutdownRequested && LOG.isErrorEnabled()) {
+ if (!this.shutdownRequested.get() && LOG.isErrorEnabled()) {
LOG.error("Blob server stopped working.", ioe);
}
}
@@ -196,23 +199,28 @@ public final class BlobServer extends Thread implements BlobService{
*/
@Override
public void shutdown() throws IOException {
-
- this.shutdownRequested = true;
- try {
- this.serverSocket.close();
- } catch (IOException ioe) {
+ if (shutdownRequested.compareAndSet(false, true)) {
+ try {
+ this.serverSocket.close();
+ }
+ catch (IOException ioe) {
LOG.debug("Error while closing the server socket.", ioe);
- }
- try {
- join();
- } catch (InterruptedException ie) {
- LOG.debug("Error while waiting for this thread to die.", ie);
- }
+ }
+ try {
+ join();
+ }
+ catch (InterruptedException ie) {
+ LOG.debug("Error while waiting for this thread to die.", ie);
+ }
+
+ // Clean up the storage directory
+ FileUtils.deleteDirectory(storageDir);
- // Clean up the storage directory
- FileUtils.deleteDirectory(storageDir);
+ // Remove shutdown hook to prevent resource leaks
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
- // TODO: Find/implement strategy to handle content-addressable BLOBs
+ // TODO: Find/implement strategy to handle content-addressable BLOBs
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/b88f909c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 751137a..476f481 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -197,22 +197,27 @@ public class BlobUtils {
}
/**
- * Adds a shutdown hook to the JVM to delete the given directory.
+ * Adds a shutdown hook to the JVM and returns the Thread, which has been registered.
*/
- static void addDeleteDirectoryShutdownHook(final File dir, final Logger errorLogger) {
- checkNotNull(dir);
+ static Thread addShutdownHook(final BlobService service, final Logger logger) {
+ checkNotNull(service);
+ checkNotNull(logger);
- // Add shutdown hook to delete directory
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ final Thread shutdownHook = new Thread(new Runnable() {
@Override
public void run() {
try {
- FileUtils.deleteDirectory(dir);
+ service.shutdown();
}
catch (Throwable t) {
- errorLogger.error("Error deleting directory " + dir + " during JVM shutdown: " + t.getMessage(), t);
+ logger.error("Error during shutdown of blob service via JVM shutdown hook: " + t.getMessage(), t);
}
}
- }));
+ });
+
+ // Add JVM shutdown hook to call shutdown of service
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+ return shutdownHook;
}
}
|