flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-1492] Fix exceptions on blob store shutdown
Date Tue, 10 Feb 2015 15:46:32 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.8 e01712111 -> 5b420d847


[FLINK-1492] Fix exceptions on blob store shutdown

Conflicts:
	flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
	flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
	flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b420d84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b420d84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b420d84

Branch: refs/heads/release-0.8
Commit: 5b420d8477b2ed1dd72d3bbcade5fd0f72a6110b
Parents: e017121
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Feb 9 11:44:47 2015 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Tue Feb 10 14:40:35 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobCache.java    | 16 ++++-
 .../apache/flink/runtime/blob/BlobServer.java   | 66 ++++++++++++--------
 .../apache/flink/runtime/blob/BlobUtils.java    | 30 ++++++++-
 3 files changed, 84 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b420d84/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 0f57ea3..d0d9a45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -29,6 +29,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * The BLOB cache implements a local cache for content-addressable BLOBs. When requesting
BLOBs through the
@@ -48,6 +49,10 @@ public final class BlobCache implements BlobService {
 
 	private final File storageDir;
 
+	private AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+	/** Shutdown hook thread to ensure deletion of the storage directory. */
+	private final Thread shutdownHook;
 
 	public BlobCache(InetSocketAddress serverAddress) {
 		this.serverAddress = serverAddress;
@@ -55,6 +60,9 @@ public final class BlobCache implements BlobService {
 		this.storageDir = BlobUtils.initStorageDirectory();
 
 		LOG.info("Created BLOB cache storage directory " + storageDir);
+
+		// Add shutdown hook to delete storage directory
+		shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 	}
 
 	/**
@@ -152,7 +160,11 @@ public final class BlobCache implements BlobService {
 	}
 
 	@Override
-	public void shutdown() throws IOException{
-		FileUtils.deleteDirectory(storageDir);
+	public void shutdown() throws IOException {
+		if (shutdownRequested.compareAndSet(false, true)) {
+			FileUtils.deleteDirectory(storageDir);
+
+			Runtime.getRuntime().removeShutdownHook(shutdownHook);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b420d84/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 60e1716..068a859 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.ServerSocket;
 import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
@@ -86,7 +87,10 @@ public final class BlobServer extends Thread implements BlobService{
 	/**
 	 * Indicates whether a shutdown of server component has been requested.
 	 */
-	private volatile boolean shutdownRequested = false;
+	private AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+	/** Shutdown hook thread to ensure deletion of the storage directory. */
+	private final Thread shutdownHook;
 
 	/**
 	 * Is the root directory for file storage
@@ -100,18 +104,25 @@ public final class BlobServer extends Thread implements BlobService{
 	 *         thrown if the BLOB server cannot bind to a free network port
 	 */
 	public BlobServer() throws IOException {
+		try {
+			this.serverSocket = new ServerSocket(0);
 
-		this.serverSocket = new ServerSocket(0);
-		start();
+			start();
 
-		if (LOG.isInfoEnabled()) {
-			LOG.info(String.format("Started BLOB server on port %d",
-				this.serverSocket.getLocalPort()));
-		}
+			if (LOG.isInfoEnabled()) {
+				LOG.info(String.format("Started BLOB server on port %d",
+						this.serverSocket.getLocalPort()));
+			}
+
+			this.storageDir = BlobUtils.initStorageDirectory();
 
-		this.storageDir = BlobUtils.initStorageDirectory();
+			LOG.info("Created BLOB server storage directory " + storageDir);
 
-		LOG.info("Created BLOB server storage directory " + storageDir);
+			shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+		}
+		catch (IOException e) {
+			throw new IOException("Could not create BlobServer with random port.", e);
+		}
 	}
 
 	/**
@@ -172,12 +183,12 @@ public final class BlobServer extends Thread implements BlobService{
 
 		try {
 
-			while (!this.shutdownRequested) {
+			while (!this.shutdownRequested.get()) {
 				new BlobConnection(this.serverSocket.accept(), this).start();
 			}
 
 		} catch (IOException ioe) {
-			if (!this.shutdownRequested && LOG.isErrorEnabled()) {
+			if (!this.shutdownRequested.get() && LOG.isErrorEnabled()) {
 				LOG.error("Blob server stopped working.", ioe);
 			}
 		}
@@ -188,23 +199,28 @@ public final class BlobServer extends Thread implements BlobService{
 	 */
 	@Override
 	public void shutdown() throws IOException {
-
-		this.shutdownRequested = true;
-		try {
-			this.serverSocket.close();
-		} catch (IOException ioe) {
+		if (shutdownRequested.compareAndSet(false, true)) {
+			try {
+				this.serverSocket.close();
+			}
+			catch (IOException ioe) {
 				LOG.debug("Error while closing the server socket.", ioe);
-		}
-		try {
-			join();
-		} catch (InterruptedException ie) {
-			LOG.debug("Error while waiting for this thread to die.", ie);
-		}
+			}
+			try {
+				join();
+			}
+			catch (InterruptedException ie) {
+				LOG.debug("Error while waiting for this thread to die.", ie);
+			}
 
-		// Clean up the storage directory
-		FileUtils.deleteDirectory(storageDir);
+			// Clean up the storage directory
+			FileUtils.deleteDirectory(storageDir);
 
-		// TODO: Find/implement strategy to handle content-addressable BLOBs
+			// Remove shutdown hook to prevent resource leaks
+			Runtime.getRuntime().removeShutdownHook(shutdownHook);
+
+			// TODO: Find/implement strategy to handle content-addressable BLOBs
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5b420d84/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index dec574d..476f481 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -23,6 +23,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
@@ -31,7 +32,10 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.UUID;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 public class BlobUtils {
+
 	/**
 	 * Algorithm to be used for calculating the BLOB keys.
 	 */
@@ -52,7 +56,6 @@ public class BlobUtils {
 	 */
 	static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
 
-
 	/**
 	 * Creates a storage directory for a blob service.
 	 *
@@ -192,4 +195,29 @@ public class BlobUtils {
 			throw new RuntimeException(e);
 		}
 	}
+
+	/**
+	 * Adds a shutdown hook to the JVM and returns the Thread, which has been registered.
+	 */
+	static Thread addShutdownHook(final BlobService service, final Logger logger) {
+		checkNotNull(service);
+		checkNotNull(logger);
+
+		final Thread shutdownHook = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					service.shutdown();
+				}
+				catch (Throwable t) {
+					logger.error("Error during shutdown of blob service via JVM shutdown hook: " + t.getMessage(),
t);
+				}
+			}
+		});
+
+		// Add JVM shutdown hook to call shutdown of service
+		Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+		return shutdownHook;
+	}
 }


Mime
View raw message