flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [4/9] flink git commit: [FLINK-9027] [web] Clean up web UI resources by installing shut down hook
Date Fri, 23 Mar 2018 11:38:33 GMT
[FLINK-9027] [web] Clean up web UI resources by installing shut down hook

The ClusterEntrypoint creates a temp directory for the RestServerEndpoint. This
directory contains the web UI files and, unless configured otherwise, the web
upload directory. In case of a hard shut down, as happens with bin/stop-cluster.sh,
the ClusterEntrypoint will clean up this directory by installing a shut down hook.

All future directory cleanup tasks should go into the method
ClusterEntrypoint#cleanupDirectories.

This closes #5740.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1805583
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1805583
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1805583

Branch: refs/heads/master
Commit: e1805583b75ab7d31726cd605241e6faee793efb
Parents: 8809185
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Mar 21 21:04:40 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Mar 23 12:32:38 2018 +0100

----------------------------------------------------------------------
 .../program/rest/RestClusterClientTest.java     |  5 --
 .../runtime/entrypoint/ClusterEntrypoint.java   | 55 ++++++++++++++++++--
 .../flink/runtime/minicluster/MiniCluster.java  |  2 +-
 .../flink/runtime/rest/RestServerEndpoint.java  | 14 ++---
 .../rest/RestServerEndpointConfiguration.java   |  3 +-
 .../rest/handler/RestHandlerConfiguration.java  | 20 +++----
 .../runtime/webmonitor/WebMonitorEndpoint.java  | 11 ++--
 .../runtime/rest/RestServerEndpointITCase.java  |  2 +-
 8 files changed, 74 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1805583/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index e108a0b..e98ba43 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -666,11 +666,6 @@ public class RestClusterClientTest extends TestLogger {
 
 		@Override
 		protected void startInternal() throws Exception {}
-
-		@Override
-		public void close() throws Exception {
-			shutDownAsync().get();
-		}
 	}
 
 	@FunctionalInterface

http://git-wip-us.apache.org/repos/asf/flink/blob/e1805583/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 63c8072..8a4db03 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -69,7 +69,9 @@ import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
@@ -78,10 +80,14 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -159,9 +165,13 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	@GuardedBy("lock")
 	private JobManagerMetricGroup jobManagerMetricGroup;
 
+	private final Thread shutDownHook;
+
 	protected ClusterEntrypoint(Configuration configuration) {
-		this.configuration = Preconditions.checkNotNull(configuration);
+		this.configuration = generateClusterConfiguration(configuration);
 		this.terminationFuture = new CompletableFuture<>();
+
+		shutDownHook = ShutdownHookUtil.addShutdownHook(this::cleanupDirectories, getClass().getSimpleName(), LOG);
 	}
 
 	public CompletableFuture<Void> getTerminationFuture() {
@@ -479,7 +489,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			}
 
 			if (webMonitorEndpoint != null) {
-				terminationFutures.add(webMonitorEndpoint.shutDownAsync());
+				terminationFutures.add(webMonitorEndpoint.closeAsync());
 			}
 
 			if (dispatcher != null) {
@@ -523,6 +533,17 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	// Internal methods
 	// --------------------------------------------------
 
+	private Configuration generateClusterConfiguration(Configuration configuration) {
+		final Configuration resultConfiguration = new Configuration(Preconditions.checkNotNull(configuration));
+
+		final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);
+		final Path uniqueWebTmpDir = Paths.get(webTmpDir, "flink-web-" + UUID.randomUUID());
+
+		resultConfiguration.setString(WebOptions.TMP_DIR, uniqueWebTmpDir.toAbsolutePath().toString());
+
+		return resultConfiguration;
+	}
+
 	private CompletableFuture<Void> shutDownAsync(boolean cleanupHaData) {
 		if (isShutDown.compareAndSet(false, true)) {
 			LOG.info("Stopping {}.", getClass().getSimpleName());
@@ -535,11 +556,22 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 
 					serviceShutdownFuture.whenComplete(
 						(Void ignored2, Throwable serviceThrowable) -> {
+							Throwable finalException = null;
+
 							if (serviceThrowable != null) {
-								terminationFuture.completeExceptionally(
-									ExceptionUtils.firstOrSuppressed(serviceThrowable, componentThrowable));
+								finalException = ExceptionUtils.firstOrSuppressed(serviceThrowable, componentThrowable);
 							} else if (componentThrowable != null) {
-								terminationFuture.completeExceptionally(componentThrowable);
+								finalException = componentThrowable;
+							}
+
+							try {
+								cleanupDirectories();
+							} catch (IOException e) {
+								finalException = ExceptionUtils.firstOrSuppressed(e, finalException);
+							}
+
+							if (finalException != null) {
+								terminationFuture.completeExceptionally(finalException);
 							} else {
 								terminationFuture.complete(null);
 							}
@@ -576,6 +608,19 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		}
 	}
 
+	/**
+	 * Clean up of temporary directories created by the {@link ClusterEntrypoint}.
+	 *
+	 * @throws IOException if the temporary directories could not be cleaned up
+	 */
+	private void cleanupDirectories() throws IOException {
+		ShutdownHookUtil.removeShutdownHook(shutDownHook, getClass().getSimpleName(), LOG);
+
+		final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);
+
+		FileUtils.deleteDirectory(new File(webTmpDir));
+	}
+
 	// --------------------------------------------------
 	// Abstract methods
 	// --------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e1805583/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index bc75a54..0da6f33 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -758,7 +758,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		}
 
 		if (dispatcherRestEndpoint != null) {
-			terminationFutures.add(dispatcherRestEndpoint.shutDownAsync());
+			terminationFutures.add(dispatcherRestEndpoint.closeAsync());
 
 			dispatcherRestEndpoint = null;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1805583/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 80d9140..15fbbb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.RouterHandler;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
@@ -67,7 +67,7 @@ import java.util.concurrent.TimeoutException;
 /**
  * An abstract class for netty-based REST server endpoints.
  */
-public abstract class RestServerEndpoint {
+public abstract class RestServerEndpoint implements AutoCloseableAsync {
 
 	protected final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -256,7 +256,8 @@ public abstract class RestServerEndpoint {
 		}
 	}
 
-	public final CompletableFuture<Void> shutDownAsync() {
+	@Override
+	public CompletableFuture<Void> closeAsync() {
 		synchronized (lock) {
 			log.info("Shutting down rest endpoint.");
 
@@ -370,12 +371,7 @@ public abstract class RestServerEndpoint {
 						});
 			});
 
-			return FutureUtils.runAfterwards(
-				channelTerminationFuture,
-				() -> {
-					log.info("Cleaning upload directory {}", uploadDir);
-					FileUtils.cleanDirectory(uploadDir.toFile());
-				});
+			return channelTerminationFuture;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e1805583/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index 1fac08e..8af76f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -36,7 +36,6 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.Map;
-import java.util.UUID;
 
 import static java.util.Objects.requireNonNull;
 
@@ -172,7 +171,7 @@ public final class RestServerEndpointConfiguration {
 
 		final Path uploadDir = Paths.get(
 			config.getString(WebOptions.UPLOAD_DIR,	config.getString(WebOptions.TMP_DIR)),
-			"flink-web-upload-" + UUID.randomUUID());
+			"flink-web-upload");
 
 		final int maxContentLength = config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e1805583/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index f92946b..3f6516a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.util.Preconditions;
 
-import java.io.File;
-import java.util.UUID;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 
 /**
  * Configuration object containing values for the rest handler configuration.
@@ -37,20 +37,20 @@ public class RestHandlerConfiguration {
 
 	private final Time timeout;
 
-	private final File tmpDir;
+	private final Path webUiDir;
 
 	public RestHandlerConfiguration(
 			long refreshInterval,
 			int maxCheckpointStatisticCacheEntries,
 			Time timeout,
-			File tmpDir) {
+			Path webUiDir) {
 		Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0.");
 		this.refreshInterval = refreshInterval;
 
 		this.maxCheckpointStatisticCacheEntries = maxCheckpointStatisticCacheEntries;
 
 		this.timeout = Preconditions.checkNotNull(timeout);
-		this.tmpDir = Preconditions.checkNotNull(tmpDir);
+		this.webUiDir = Preconditions.checkNotNull(webUiDir);
 	}
 
 	public long getRefreshInterval() {
@@ -65,8 +65,8 @@ public class RestHandlerConfiguration {
 		return timeout;
 	}
 
-	public File getTmpDir() {
-		return tmpDir;
+	public Path getWebUiDir() {
+		return webUiDir;
 	}
 
 	public static RestHandlerConfiguration fromConfiguration(Configuration configuration) {
@@ -76,13 +76,13 @@ public class RestHandlerConfiguration {
 
 		final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
 
-		final String rootDir = "flink-web-" + UUID.randomUUID();
-		final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR), rootDir);
+		final String rootDir = "flink-web-ui";
+		final Path webUiDir = Paths.get(configuration.getString(WebOptions.TMP_DIR), rootDir);
 
 		return new RestHandlerConfiguration(
 			refreshInterval,
 			maxCheckpointStatisticCacheEntries,
 			timeout,
-			tmpDir);
+			webUiDir);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e1805583/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 50ad7eb..d4aa94e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -127,6 +127,7 @@ import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -498,7 +499,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			executor,
 			metricFetcher);
 
-		final File tmpDir = restConfiguration.getTmpDir();
+		final Path webUiDir = restConfiguration.getWebUiDir();
 
 		Optional<StaticFileServerHandler<T>> optWebContent;
 
@@ -507,7 +508,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 				leaderRetriever,
 				restAddressFuture,
 				timeout,
-				tmpDir);
+				webUiDir.toFile());
 		} catch (IOException e) {
 			log.warn("Could not load web content handler.", e);
 			optWebContent = Optional.empty();
@@ -635,15 +636,15 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 
 		final CompletableFuture<Void> shutdownFuture = super.shutDownInternal();
 
-		final File tmpDir = restConfiguration.getTmpDir();
+		final Path webUiDir = restConfiguration.getWebUiDir();
 
 		return FutureUtils.runAfterwardsAsync(
 			shutdownFuture,
 			() -> {
 				Exception exception = null;
 				try {
-					log.info("Removing cache directory {}", tmpDir);
-					FileUtils.deleteDirectory(tmpDir);
+					log.info("Removing cache directory {}", webUiDir);
+					FileUtils.deleteDirectory(webUiDir.toFile());
 				} catch (Exception e) {
 					exception = e;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1805583/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 784c141..e510798 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -154,7 +154,7 @@ public class RestServerEndpointITCase extends TestLogger {
 		}
 
 		if (serverEndpoint != null) {
-			serverEndpoint.shutDownAsync().get();
+			serverEndpoint.close();
 			serverEndpoint = null;
 		}
 	}


Mime
View raw message