flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [06/26] flink git commit: [hotfix] Introduce ShutdownHookUtil to avoid code duplication
Date Sun, 25 Feb 2018 16:11:40 GMT
[hotfix] Introduce ShutdownHookUtil to avoid code duplication

(Un)registering shutdown hooks for cleanups is a very common concern in Flink.
Many places in the code essentially duplicate all the code for doing this.
This commit introduces a utils class and deduplicates the code.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d19b1c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d19b1c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d19b1c2

Branch: refs/heads/master
Commit: 4d19b1c2c276d85eb78a138da5edb25eaac5c088
Parents: 4e7f03e
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Fri Feb 23 12:11:30 2018 +0100
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Sun Feb 25 15:10:28 2018 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ShutdownHookUtil.java | 100 +++++++++++++++++++
 .../flink/api/java/RemoteEnvironment.java       |  40 +-------
 .../api/streaming/data/PythonStreamer.java      |  21 ++--
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  16 +--
 .../webmonitor/history/HistoryServer.java       |  33 ++----
 .../flink/runtime/blob/AbstractBlobCache.java   |  15 +--
 .../apache/flink/runtime/blob/BlobServer.java   |  18 +---
 .../apache/flink/runtime/blob/BlobUtils.java    |  36 -------
 .../FileArchivedExecutionGraphStore.java        |  19 +---
 .../flink/runtime/filecache/FileCache.java      |  46 ++-------
 .../io/disk/iomanager/IOManagerAsync.java       |  35 +------
 .../runtime/util/JvmShutdownSafeguard.java      |  13 +--
 .../flink/runtime/testutils/TestJvmProcess.java |  15 +--
 .../yarn/AbstractYarnClusterDescriptor.java     |   7 +-
 14 files changed, 152 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-core/src/main/java/org/apache/flink/util/ShutdownHookUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ShutdownHookUtil.java b/flink-core/src/main/java/org/apache/flink/util/ShutdownHookUtil.java
new file mode 100644
index 0000000..7526f65
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ShutdownHookUtil.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utils class for dealing with JVM shutdown hooks.
+ */
+public class ShutdownHookUtil {
+
+	/**
+	 * Adds a shutdown hook to the JVM and returns the registered Thread, or null if the hook could not be registered.
+	 */
+	public static Thread addShutdownHook(
+		final AutoCloseable service,
+		final String serviceName,
+		final Logger logger) {
+
+		checkNotNull(service);
+		checkNotNull(logger);
+
+		final Thread shutdownHook = new Thread(() -> {
+			try {
+				service.close();
+			} catch (Throwable t) {
+				logger.error("Error during shutdown of {} via JVM shutdown hook.", serviceName, t);
+			}
+		}, serviceName + " shutdown hook");
+
+		return addShutdownHookThread(shutdownHook, serviceName, logger) ? shutdownHook : null;
+	}
+
+	/**
+	 * Adds the given shutdown hook thread to the JVM and returns true if it was registered, false otherwise.
+	 */
+	public static boolean addShutdownHookThread(
+		final Thread shutdownHook,
+		final String serviceName,
+		final Logger logger) {
+
+		checkNotNull(shutdownHook);
+		checkNotNull(logger);
+
+		try {
+			// Add JVM shutdown hook to call shutdown of service
+			Runtime.getRuntime().addShutdownHook(shutdownHook);
+			return true;
+		} catch (IllegalStateException e) {
+			// JVM is already shutting down. no need to do our work
+		} catch (Throwable t) {
+			logger.error("Cannot register shutdown hook that cleanly terminates {}.", serviceName,
t);
+		}
+		return false;
+	}
+
+	/**
+	 * Removes a shutdown hook from the JVM.
+	 */
+	public static void removeShutdownHook(final Thread shutdownHook, final String serviceName,
final Logger logger) {
+
+		// Do not run if this is invoked by the shutdown hook itself
+		if (shutdownHook == null || shutdownHook == Thread.currentThread()) {
+			return;
+		}
+
+		checkNotNull(logger);
+
+		try {
+			Runtime.getRuntime().removeShutdownHook(shutdownHook);
+		} catch (IllegalStateException e) {
+			// race, JVM is in shutdown already, we can safely ignore this
+			logger.debug("Unable to remove shutdown hook for {}, shutdown already in progress", serviceName,
e);
+		} catch (Throwable t) {
+			logger.warn("Exception while un-registering {}'s shutdown hook.", serviceName, t);
+		}
+	}
+
+	private ShutdownHookUtil() {
+		throw new AssertionError();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index fa223bd..c50f79c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import java.io.File;
 import java.net.MalformedURLException;
@@ -224,19 +225,8 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	// ------------------------------------------------------------------------
 
 	protected void dispose() {
-		// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
-		// shutdown hook itself
-		if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
-			try {
-				Runtime.getRuntime().removeShutdownHook(shutdownHook);
-			}
-			catch (IllegalStateException e) {
-				// race, JVM is in shutdown already, we can safely ignore this
-			}
-			catch (Throwable t) {
-				LOG.warn("Exception while unregistering the cleanup shutdown hook.");
-			}
-		}
+		// Remove shutdown hook to prevent resource leaks
+		ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
 
 		try {
 			PlanExecutor executor = this.executor;
@@ -262,29 +252,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 
 	private void installShutdownHook() {
 		if (shutdownHook == null) {
-			Thread shutdownHook = new Thread(new Runnable() {
-				@Override
-				public void run() {
-					try {
-						dispose();
-					}
-					catch (Throwable t) {
-						LOG.error("Error in cleanup of RemoteEnvironment during JVM shutdown: " + t.getMessage(),
t);
-					}
-				}
-			});
-
-			try {
-				// Add JVM shutdown hook to call shutdown of service
-				Runtime.getRuntime().addShutdownHook(shutdownHook);
-				this.shutdownHook = shutdownHook;
-			}
-			catch (IllegalStateException e) {
-				// JVM is already shutting down. no need or a shutdown hook
-			}
-			catch (Throwable t) {
-				LOG.error("Cannot register shutdown hook that cleanly terminates the BLOB service.");
-			}
+			this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::dispose, getClass().getSimpleName(),
LOG);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 864ea30..e28b8db 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -21,6 +21,7 @@ import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializ
 import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -124,18 +125,10 @@ public class PythonStreamer<S extends PythonSender, OUT> implements
Serializable
 		errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg));
 		errorPrinter.start();
 
-		shutdownThread = new Thread() {
-			@Override
-			public void run() {
-				try {
-					destroyProcess(process);
-				} catch (IOException ioException) {
-					LOG.warn("Could not destroy python process.", ioException);
-				}
-			}
-		};
-
-		Runtime.getRuntime().addShutdownHook(shutdownThread);
+		shutdownThread = ShutdownHookUtil.addShutdownHook(
+			() -> destroyProcess(process),
+			getClass().getSimpleName(),
+			LOG);
 
 		OutputStream processOutput = process.getOutputStream();
 		processOutput.write("operator\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
@@ -207,9 +200,7 @@ public class PythonStreamer<S extends PythonSender, OUT> implements
Serializable
 			throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
 		}
 
-		if (shutdownThread != null) {
-			Runtime.getRuntime().removeShutdownHook(shutdownThread);
-		}
+		ShutdownHookUtil.removeShutdownHook(shutdownThread, getClass().getSimpleName(), LOG);
 
 		ExceptionUtils.tryRethrowIOException(throwable);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 6abe0e7..f27ae00 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -84,6 +84,7 @@ import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
@@ -382,20 +383,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 			webRootDir));
 
 		// add shutdown hook for deleting the directories and remaining temp files on shutdown
-		try {
-			Runtime.getRuntime().addShutdownHook(new Thread() {
-				@Override
-				public void run() {
-					cleanup();
-				}
-			});
-		} catch (IllegalStateException e) {
-			// race, JVM is in shutdown already, we can safely ignore this
-			LOG.debug("Unable to add shutdown hook, shutdown already in progress", e);
-		} catch (Throwable t) {
-			// these errors usually happen when the shutdown is already in progress
-			LOG.warn("Error while adding shutdown hook", t);
-		}
+		ShutdownHookUtil.addShutdownHook(this::cleanup, getClass().getSimpleName(), LOG);
 
 		this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, serverSSLContext, configuredAddress,
configuredPort, config);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 9c3b51e..d361934 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 
@@ -181,22 +182,10 @@ public class HistoryServer {
 		long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
 		archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir,
numFinishedPolls);
 
-		this.shutdownHook = new Thread() {
-			@Override
-			public void run() {
-				HistoryServer.this.stop();
-			}
-		};
-		// add shutdown hook for deleting the directories and remaining temp files on shutdown
-		try {
-			Runtime.getRuntime().addShutdownHook(shutdownHook);
-		} catch (IllegalStateException e) {
-			// race, JVM is in shutdown already, we can safely ignore this
-			LOG.debug("Unable to add shutdown hook, shutdown already in progress", e);
-		} catch (Throwable t) {
-			// these errors usually happen when the shutdown is already in progress
-			LOG.warn("Error while adding shutdown hook", t);
-		}
+		this.shutdownHook = ShutdownHookUtil.addShutdownHook(
+			HistoryServer.this::stop,
+			HistoryServer.class.getSimpleName(),
+			LOG);
 	}
 
 	@VisibleForTesting
@@ -263,16 +252,8 @@ public class HistoryServer {
 
 				LOG.info("Stopped history server.");
 
-				// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown
hook itself
-				if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
-					try {
-						Runtime.getRuntime().removeShutdownHook(shutdownHook);
-					} catch (IllegalStateException ignored) {
-						// race, JVM is in shutdown already, we can safely ignore this
-					} catch (Throwable t) {
-						LOG.warn("Exception while unregistering HistoryServer cleanup shutdown hook.");
-					}
-				}
+				// Remove shutdown hook to prevent resource leaks
+				ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
index ebcb42e..ce12898 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 
@@ -116,7 +117,7 @@ public abstract class AbstractBlobCache implements Closeable {
 		}
 
 		// Add shutdown hook to delete storage directory
-		shutdownHook = BlobUtils.addShutdownHook(this, log);
+		shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), log);
 
 		this.serverAddress = serverAddress;
 	}
@@ -249,16 +250,8 @@ public abstract class AbstractBlobCache implements Closeable {
 			try {
 				FileUtils.deleteDirectory(storageDir);
 			} finally {
-				// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown
hook itself
-				if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
-					try {
-						Runtime.getRuntime().removeShutdownHook(shutdownHook);
-					} catch (IllegalStateException e) {
-						// race, JVM is in shutdown already, we can safely ignore this
-					} catch (Throwable t) {
-						log.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.");
-					}
-				}
+				// Remove shutdown hook to prevent resource leaks
+				ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), log);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 1213a31..92d1135 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,7 +170,7 @@ public class BlobServer extends Thread implements BlobService, BlobWriter,
Perma
 			.schedule(new TransientBlobCleanupTask(blobExpiryTimes, readWriteLock.writeLock(),
 				storageDir, LOG), cleanupInterval, cleanupInterval);
 
-		this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+		this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(),
LOG);
 
 		if (config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
 			try {
@@ -345,19 +346,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter,
Perma
 				exception = ExceptionUtils.firstOrSuppressed(e, exception);
 			}
 
-			// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
-			// shutdown hook itself
-			if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
-				try {
-					Runtime.getRuntime().removeShutdownHook(shutdownHook);
-				}
-				catch (IllegalStateException e) {
-					// race, JVM is in shutdown already, we can safely ignore this
-				}
-				catch (Throwable t) {
-					LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.", t);
-				}
-			}
+			// Remove shutdown hook to prevent resource leaks
+			ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
 
 			if (LOG.isInfoEnabled()) {
 				LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(),
getPort());

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 3273e1c..a21c7d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
@@ -47,7 +46,6 @@ import java.security.NoSuchAlgorithmException;
 import java.util.Random;
 import java.util.UUID;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 
 /**
@@ -304,40 +302,6 @@ public class BlobUtils {
 	}
 
 	/**
-	 * Adds a shutdown hook to the JVM and returns the Thread, which has been registered.
-	 */
-	public static Thread addShutdownHook(final Closeable service, final Logger logger) {
-		checkNotNull(service);
-		checkNotNull(logger);
-
-		final Thread shutdownHook = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				try {
-					service.close();
-				}
-				catch (Throwable t) {
-					logger.error("Error during shutdown of blob service via JVM shutdown hook.", t);
-				}
-			}
-		});
-
-		try {
-			// Add JVM shutdown hook to call shutdown of service
-			Runtime.getRuntime().addShutdownHook(shutdownHook);
-			return shutdownHook;
-		}
-		catch (IllegalStateException e) {
-			// JVM is already shutting down. no need to do our work
-			return null;
-		}
-		catch (Throwable t) {
-			logger.error("Cannot register shutdown hook that cleanly terminates the BLOB service.");
-			return null;
-		}
-	}
-
-	/**
 	 * Auxiliary method to write the length of an upcoming data chunk to an
 	 * output stream.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
index d2dbeb5..6526072 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
 import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
@@ -122,7 +122,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
 			expirationTime.toMilliseconds(),
 			TimeUnit.MILLISECONDS);
 
-		this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+		this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(),
LOG);
 
 		this.numFinishedJobs = 0;
 		this.numFailedJobs = 0;
@@ -206,19 +206,8 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt
 		// clean up the storage directory
 		FileUtils.deleteFileOrDirectory(storageDir);
 
-		// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
-		// shutdown hook itself
-		if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
-			try {
-				Runtime.getRuntime().removeShutdownHook(shutdownHook);
-			}
-			catch (IllegalStateException e) {
-				// race, JVM is in shutdown already, we can safely ignore this
-			}
-			catch (Throwable t) {
-				LOG.warn("Exception while unregistering FileArchivedExecutionGraphStore's cleanup shutdown
hook.", t);
-			}
-		}
+		// Remove shutdown hook to prevent resource leaks
+		ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
 	}
 
 	// --------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index d78801d..d9d021a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -132,19 +133,8 @@ public class FileCache {
 				}
 			}
 
-			// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
-			// shutdown hook itself
-			if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
-				try {
-					Runtime.getRuntime().removeShutdownHook(shutdownHook);
-				}
-				catch (IllegalStateException e) {
-					// race, JVM is in shutdown already, we can safely ignore this
-				}
-				catch (Throwable t) {
-					LOG.warn("Exception while unregistering file cache's cleanup shutdown hook.");
-				}
-			}
+			// Remove shutdown hook to prevent resource leaks
+			ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
 		}
 	}
 
@@ -265,31 +255,11 @@ public class FileCache {
 
 	private static Thread createShutdownHook(final FileCache cache, final Logger logger) {
 
-		Thread shutdownHook = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				try {
-					cache.shutdown();
-				}
-				catch (Throwable t) {
-					logger.error("Error during shutdown of file cache via JVM shutdown hook: " + t.getMessage(),
t);
-				}
-			}
-		});
-
-		try {
-			// Add JVM shutdown hook to call shutdown of service
-			Runtime.getRuntime().addShutdownHook(shutdownHook);
-			return shutdownHook;
-		}
-		catch (IllegalStateException e) {
-			// JVM is already shutting down. no need to do our work
-			return null;
-		}
-		catch (Throwable t) {
-			logger.error("Cannot register shutdown hook that cleanly terminates the file cache service.");
-			return null;
-		}
+		return ShutdownHookUtil.addShutdownHook(
+			cache::shutdown,
+			FileCache.class.getSimpleName(),
+			logger
+		);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index e2a3a6f..2b8e7f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.disk.iomanager;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
@@ -99,22 +100,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 		}
 
 		// install a shutdown hook that makes sure the temp directories get deleted
-		this.shutdownHook = new Thread("I/O manager shutdown hook") {
-			@Override
-			public void run() {
-				shutdown();
-			}
-		};
-		try {
-			Runtime.getRuntime().addShutdownHook(this.shutdownHook);
-		}
-		catch (IllegalStateException e) {
-			// race, JVM is in shutdown already, we can safely ignore this
-			LOG.debug("Unable to add shutdown hook, shutdown already in progress", e);
-		}
-		catch (Throwable t) {
-			LOG.warn("Error while adding shutdown hook for IOManager", t);
-		}
+		this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(),
LOG);
 	}
 
 	/**
@@ -129,20 +115,9 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 			return;
 		}
 
-		// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown
hook itself
-		if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
-			try {
-				Runtime.getRuntime().removeShutdownHook(shutdownHook);
-			}
-			catch (IllegalStateException e) {
-				// race, JVM is in shutdown already, we can safely ignore this
-				LOG.debug("Unable to remove shutdown hook, shutdown already in progress", e);
-			}
-			catch (Throwable t) {
-				LOG.warn("Exception while unregistering IOManager's shutdown hook.", t);
-			}
-		}
-		
+		// Remove shutdown hook to prevent resource leaks
+		ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
+
 		try {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Shutting down I/O manager.");

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
index e8e378e..579cdf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.util;
 
+import org.apache.flink.util.ShutdownHookUtil;
+
 import org.slf4j.Logger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -112,15 +114,6 @@ public class JvmShutdownSafeguard extends Thread {
 
 		// install the blocking shutdown hook
 		Thread shutdownHook = new JvmShutdownSafeguard(delayMillis);
-		try {
-			// Add JVM shutdown hook to call shutdown of service
-			Runtime.getRuntime().addShutdownHook(shutdownHook);
-		}
-		catch (IllegalStateException ignored) {
-			// JVM is already shutting down. No need to do this.
-		}
-		catch (Throwable t) {
-			logger.error("Cannot install JVM Shutdown Safeguard against blocked shutdown hooks");
-		}
+		ShutdownHookUtil.addShutdownHookThread(shutdownHook, JvmShutdownSafeguard.class.getSimpleName(),
logger);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
index 4578edf..885ea8b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.testutils;
 
+import org.apache.flink.util.ShutdownHookUtil;
+
 import org.apache.commons.lang3.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -229,18 +231,7 @@ public abstract class TestJvmProcess {
 			}
 			finally {
 				destroyed = true;
-
-				if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
-					try {
-						Runtime.getRuntime().removeShutdownHook(shutdownHook);
-					}
-					catch (IllegalStateException ignored) {
-						// JVM is in shutdown already, we can safely ignore this.
-					}
-					catch (Throwable t) {
-						LOG.warn("Exception while unregistering process cleanup shutdown hook.");
-					}
-				}
+				ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 2d6926a..e6c36f6 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -1036,11 +1037,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 					"temporary files of the YARN session in the home directoy will not be removed.");
 		}
 		// since deployment was successful, remove the hook
-		try {
-			Runtime.getRuntime().removeShutdownHook(deploymentFailureHook);
-		} catch (IllegalStateException e) {
-			// we're already in the shut down hook.
-		}
+		ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(),
LOG);
 		return report;
 	}
 


Mime
View raw message