flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-1492] Fix exceptions on blob store shutdown
Date Tue, 10 Feb 2015 14:34:00 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7407076d3 -> b88f909ce


[FLINK-1492] Fix exceptions on blob store shutdown

This closes #376


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b88f909c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b88f909c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b88f909c

Branch: refs/heads/master
Commit: b88f909ce44bbd25528afee03079d0437d5f9a5b
Parents: 7407076
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Feb 9 11:44:47 2015 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Tue Feb 10 15:32:19 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobCache.java    | 15 +++++--
 .../apache/flink/runtime/blob/BlobServer.java   | 46 ++++++++++++--------
 .../apache/flink/runtime/blob/BlobUtils.java    | 21 +++++----
 3 files changed, 52 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b88f909c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 3f0fcb6..d0d9a45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -29,6 +29,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * The BLOB cache implements a local cache for content-addressable BLOBs. When requesting BLOBs through the
@@ -48,6 +49,10 @@ public final class BlobCache implements BlobService {
 
 	private final File storageDir;
 
+	private AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+	/** Shutdown hook thread to ensure deletion of the storage directory. */
+	private final Thread shutdownHook;
 
 	public BlobCache(InetSocketAddress serverAddress) {
 		this.serverAddress = serverAddress;
@@ -57,7 +62,7 @@ public final class BlobCache implements BlobService {
 		LOG.info("Created BLOB cache storage directory " + storageDir);
 
 		// Add shutdown hook to delete storage directory
-		BlobUtils.addDeleteDirectoryShutdownHook(storageDir, LOG);
+		shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 	}
 
 	/**
@@ -155,7 +160,11 @@ public final class BlobCache implements BlobService {
 	}
 
 	@Override
-	public void shutdown() throws IOException{
-		FileUtils.deleteDirectory(storageDir);
+	public void shutdown() throws IOException {
+		if (shutdownRequested.compareAndSet(false, true)) {
+			FileUtils.deleteDirectory(storageDir);
+
+			Runtime.getRuntime().removeShutdownHook(shutdownHook);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b88f909c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index cf82a9b..068a859 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.ServerSocket;
 import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
@@ -86,7 +87,10 @@ public final class BlobServer extends Thread implements BlobService{
 	/**
 	 * Indicates whether a shutdown of server component has been requested.
 	 */
-	private volatile boolean shutdownRequested = false;
+	private AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+	/** Shutdown hook thread to ensure deletion of the storage directory. */
+	private final Thread shutdownHook;
 
 	/**
 	 * Is the root directory for file storage
@@ -114,8 +118,7 @@ public final class BlobServer extends Thread implements BlobService{
 
 			LOG.info("Created BLOB server storage directory " + storageDir);
 
-			// Add shutdown hook to delete storage directory
-			BlobUtils.addDeleteDirectoryShutdownHook(storageDir, LOG);
+			shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 		}
 		catch (IOException e) {
 			throw new IOException("Could not create BlobServer with random port.", e);
@@ -180,12 +183,12 @@ public final class BlobServer extends Thread implements BlobService{
 
 		try {
 
-			while (!this.shutdownRequested) {
+			while (!this.shutdownRequested.get()) {
 				new BlobConnection(this.serverSocket.accept(), this).start();
 			}
 
 		} catch (IOException ioe) {
-			if (!this.shutdownRequested && LOG.isErrorEnabled()) {
+			if (!this.shutdownRequested.get() && LOG.isErrorEnabled()) {
 				LOG.error("Blob server stopped working.", ioe);
 			}
 		}
@@ -196,23 +199,28 @@ public final class BlobServer extends Thread implements BlobService{
 	 */
 	@Override
 	public void shutdown() throws IOException {
-
-		this.shutdownRequested = true;
-		try {
-			this.serverSocket.close();
-		} catch (IOException ioe) {
+		if (shutdownRequested.compareAndSet(false, true)) {
+			try {
+				this.serverSocket.close();
+			}
+			catch (IOException ioe) {
 				LOG.debug("Error while closing the server socket.", ioe);
-		}
-		try {
-			join();
-		} catch (InterruptedException ie) {
-			LOG.debug("Error while waiting for this thread to die.", ie);
-		}
+			}
+			try {
+				join();
+			}
+			catch (InterruptedException ie) {
+				LOG.debug("Error while waiting for this thread to die.", ie);
+			}
+
+			// Clean up the storage directory
+			FileUtils.deleteDirectory(storageDir);
 
-		// Clean up the storage directory
-		FileUtils.deleteDirectory(storageDir);
+			// Remove shutdown hook to prevent resource leaks
+			Runtime.getRuntime().removeShutdownHook(shutdownHook);
 
-		// TODO: Find/implement strategy to handle content-addressable BLOBs
+			// TODO: Find/implement strategy to handle content-addressable BLOBs
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/b88f909c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 751137a..476f481 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -197,22 +197,27 @@ public class BlobUtils {
 	}
 
 	/**
-	 * Adds a shutdown hook to the JVM to delete the given directory.
+	 * Adds a shutdown hook to the JVM and returns the Thread, which has been registered.
 	 */
-	static void addDeleteDirectoryShutdownHook(final File dir, final Logger errorLogger) {
-		checkNotNull(dir);
+	static Thread addShutdownHook(final BlobService service, final Logger logger) {
+		checkNotNull(service);
+		checkNotNull(logger);
 
-		// Add shutdown hook to delete directory
-		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+		final Thread shutdownHook = new Thread(new Runnable() {
 			@Override
 			public void run() {
 				try {
-					FileUtils.deleteDirectory(dir);
+					service.shutdown();
 				}
 				catch (Throwable t) {
-					errorLogger.error("Error deleting directory " + dir + " during JVM shutdown: " + t.getMessage(), t);
+					logger.error("Error during shutdown of blob service via JVM shutdown hook: " + t.getMessage(), t);
 				}
 			}
-		}));
+		});
+
+		// Add JVM shutdown hook to call shutdown of service
+		Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+		return shutdownHook;
 	}
 }


Mime
View raw message