flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/9] flink git commit: [FLINK-8675] Add non-blocking shut down method to RestServerEndpoint
Date Mon, 19 Feb 2018 17:08:24 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5909b5bb7 -> 3d52f52e9


[FLINK-8675] Add non-blocking shut down method to RestServerEndpoint

Make the shut down method of RestServerEndpoint non-blocking.

This closes #5511.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fac5aff9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fac5aff9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fac5aff9

Branch: refs/heads/master
Commit: fac5aff979976b19b49a65243cfcc788dece8bcf
Parents: 62b6cea
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Fri Feb 16 18:01:58 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Feb 19 16:04:17 2018 +0100

----------------------------------------------------------------------
 .../program/rest/RestClusterClientTest.java     |   5 +-
 .../flink/runtime/concurrent/FutureUtils.java   |  15 +++
 .../runtime/entrypoint/ClusterEntrypoint.java   |   2 +-
 .../flink/runtime/rest/RestServerEndpoint.java  | 133 ++++++++++++-------
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  44 +++---
 .../runtime/rest/RestServerEndpointITCase.java  |   7 +-
 6 files changed, 137 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fac5aff9/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index f54d9d2..5b1a3f8 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -596,8 +596,11 @@ public class RestClusterClientTest extends TestLogger {
 		}
 
 		@Override
+		protected void startInternal() throws Exception {}
+
+		@Override
 		public void close() throws Exception {
-			shutdown(Time.seconds(5));
+			shutDownAsync().get();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fac5aff9/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 181bc5d..4a253b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -291,6 +292,20 @@ public class FutureUtils {
 	 *
 	 * @param future to wait for its completion
 	 * @param runnable action which is triggered after the future's completion
+	 * @return Future which is completed after the action has completed. This future can contain an exception,
+	 * if an error occurred in the given future or action.
+	 */
+	public static CompletableFuture<Void> runAfterwardsAsync(CompletableFuture<?> future, RunnableWithException runnable) {
+		return runAfterwardsAsync(future, runnable, ForkJoinPool.commonPool());
+	}
+
+	/**
+	 * Run the given action after the completion of the given future. The given future can be
+	 * completed normally or exceptionally. In case of an exceptional completion, the
+	 * action's exception will be added to the initial exception.
+	 *
+	 * @param future to wait for its completion
+	 * @param runnable action which is triggered after the future's completion
 	 * @param executor to run the given action
 	 * @return Future which is completed after the action has completed. This future can contain an exception,
 	 * if an error occurred in the given future or action.

http://git-wip-us.apache.org/repos/asf/flink/blob/fac5aff9/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 584fcae..ccb3ae4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -454,7 +454,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			Throwable exception = null;
 
 			if (webMonitorEndpoint != null) {
-				webMonitorEndpoint.shutdown(Time.seconds(10L));
+				webMonitorEndpoint.shutDownAsync().get();
 			}
 
 			if (dispatcherLeaderRetrievalService != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fac5aff9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index dbb25a7..bade160 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.RouterHandler;
@@ -45,6 +46,7 @@ import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFact
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 
 import java.io.IOException;
@@ -52,6 +54,7 @@ import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -73,11 +76,13 @@ public abstract class RestServerEndpoint {
 	private final SSLEngine sslEngine;
 	private final Path uploadDir;
 
+	private final CompletableFuture<Void> terminationFuture;
+
 	private ServerBootstrap bootstrap;
 	private Channel serverChannel;
 	private String restAddress;
 
-	private volatile boolean started;
+	private State state = State.CREATED;
 
 	public RestServerEndpoint(RestServerEndpointConfiguration configuration) throws IOException {
 		Preconditions.checkNotNull(configuration);
@@ -88,9 +93,9 @@ public abstract class RestServerEndpoint {
 		this.uploadDir = configuration.getUploadDir();
 		createUploadDir(uploadDir, log);
 
-		this.restAddress = null;
+		terminationFuture = new CompletableFuture<>();
 
-		this.started = false;
+		this.restAddress = null;
 	}
 
 	/**
@@ -107,12 +112,9 @@ public abstract class RestServerEndpoint {
 	 *
 	 * @throws Exception if we cannot start the RestServerEndpoint
 	 */
-	public void start() throws Exception {
+	public final void start() throws Exception {
 		synchronized (lock) {
-			if (started) {
-				// RestServerEndpoint already started
-				return;
-			}
+			Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");
 
 			log.info("Starting rest endpoint.");
 
@@ -192,28 +194,40 @@ public abstract class RestServerEndpoint {
 
 			restAddressFuture.complete(restAddress);
 
-			started = true;
+			state = State.RUNNING;
+
+			startInternal();
 		}
 	}
 
 	/**
+	 * Hook to start sub class specific services.
+	 *
+	 * @throws Exception if an error occurred
+	 */
+	protected abstract void startInternal() throws Exception;
+
+	/**
 	 * Returns the address on which this endpoint is accepting requests.
 	 *
-	 * @return address on which this endpoint is accepting requests
+	 * @return address on which this endpoint is accepting requests or null if none
 	 */
+	@Nullable
 	public InetSocketAddress getServerAddress() {
-		Preconditions.checkState(started, "The RestServerEndpoint has not been started yet.");
-		Channel server = this.serverChannel;
-
-		if (server != null) {
-			try {
-				return ((InetSocketAddress) server.localAddress());
-			} catch (Exception e) {
-				log.error("Cannot access local server address", e);
+		synchronized (lock) {
+			Preconditions.checkState(state != State.CREATED, "The RestServerEndpoint has not been started yet.");
+			Channel server = this.serverChannel;
+
+			if (server != null) {
+				try {
+					return ((InetSocketAddress) server.localAddress());
+				} catch (Exception e) {
+					log.error("Cannot access local server address", e);
+				}
 			}
-		}
 
-		return null;
+			return null;
+		}
 	}
 
 	/**
@@ -222,26 +236,49 @@ public abstract class RestServerEndpoint {
 	 * @return REST address of this endpoint
 	 */
 	public String getRestAddress() {
-		Preconditions.checkState(started, "The RestServerEndpoint has not been started yet.");
-		return restAddress;
+		synchronized (lock) {
+			Preconditions.checkState(state != State.CREATED, "The RestServerEndpoint has not been started yet.");
+			return restAddress;
+		}
+	}
+
+	public final CompletableFuture<Void> shutDownAsync() {
+		synchronized (lock) {
+			log.info("Shutting down rest endpoint.");
+
+			if (state == State.RUNNING) {
+				final CompletableFuture<Void> shutDownFuture = shutDownInternal();
+
+				shutDownFuture.whenComplete(
+					(Void ignored, Throwable throwable) -> {
+						if (throwable != null) {
+							terminationFuture.completeExceptionally(throwable);
+						} else {
+							terminationFuture.complete(null);
+						}
+					});
+				state = State.SHUTDOWN;
+			} else if (state == State.CREATED) {
+				terminationFuture.complete(null);
+				state = State.SHUTDOWN;
+			}
+
+			return terminationFuture;
+		}
 	}
 
 	/**
 	 * Stops this REST server endpoint.
+	 *
+	 * @return Future which is completed once the shut down has been finished.
 	 */
-	public void shutdown(Time timeout) {
+	protected CompletableFuture<Void> shutDownInternal() {
 
 		synchronized (lock) {
-			if (!started) {
-				// RestServerEndpoint has not been started
-				return;
-			}
-
-			log.info("Shutting down rest endpoint.");
 
 			CompletableFuture<?> channelFuture = new CompletableFuture<>();
-			if (this.serverChannel != null) {
-				this.serverChannel.close().addListener(finished -> {
+			if (serverChannel != null) {
+				serverChannel.close().addListener(finished -> {
 					if (finished.isSuccess()) {
 						channelFuture.complete(null);
 					} else {
@@ -252,11 +289,12 @@ public abstract class RestServerEndpoint {
 			}
 			CompletableFuture<?> groupFuture = new CompletableFuture<>();
 			CompletableFuture<?> childGroupFuture = new CompletableFuture<>();
+			final Time gracePeriod = Time.seconds(10L);
 
 			channelFuture.thenRun(() -> {
 				if (bootstrap != null) {
 					if (bootstrap.group() != null) {
-						bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+						bootstrap.group().shutdownGracefully(0L, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS)
 							.addListener(finished -> {
 								if (finished.isSuccess()) {
 									groupFuture.complete(null);
@@ -266,7 +304,7 @@ public abstract class RestServerEndpoint {
 							});
 					}
 					if (bootstrap.childGroup() != null) {
-						bootstrap.childGroup().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+						bootstrap.childGroup().shutdownGracefully(0L, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS)
 							.addListener(finished -> {
 								if (finished.isSuccess()) {
 									childGroupFuture.complete(null);
@@ -283,22 +321,15 @@ public abstract class RestServerEndpoint {
 				}
 			});
 
-			try {
-				CompletableFuture.allOf(groupFuture, childGroupFuture).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-				log.info("Rest endpoint shutdown complete.");
-			} catch (Exception e) {
-				log.warn("Rest endpoint shutdown failed.", e);
-			}
-
-			restAddress = null;
-			started = false;
+			final CompletableFuture<Void> channelTerminationFuture = FutureUtils.completeAll(
+				Arrays.asList(groupFuture, childGroupFuture));
 
-			try {
-				log.info("Cleaning upload directory {}", uploadDir);
-				FileUtils.cleanDirectory(uploadDir.toFile());
-			} catch (IOException e) {
-				log.warn("Error while cleaning upload directory {}", uploadDir, e);
-			}
+			return FutureUtils.runAfterwards(
+				channelTerminationFuture,
+				() -> {
+					log.info("Cleaning upload directory {}", uploadDir);
+					FileUtils.cleanDirectory(uploadDir.toFile());
+				});
 		}
 	}
 
@@ -433,4 +464,10 @@ public abstract class RestServerEndpoint {
 			}
 		}
 	}
+
+	private enum State {
+		CREATED,
+		RUNNING,
+		SHUTDOWN
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fac5aff9/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 10ad344..427332f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -113,6 +114,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -607,31 +609,39 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 	}
 
 	@Override
-	public void start() throws Exception {
-		super.start();
+	public void startInternal() throws Exception {
 		leaderElectionService.start(this);
 	}
 
 	@Override
-	public void shutdown(Time timeout) {
+	protected CompletableFuture<Void> shutDownInternal() {
 		executionGraphCache.close();
 
-		final File tmpDir = restConfiguration.getTmpDir();
-
-		try {
-			log.info("Removing cache directory {}", tmpDir);
-			FileUtils.deleteDirectory(tmpDir);
-		} catch (Throwable t) {
-			log.warn("Error while deleting cache directory {}", tmpDir, t);
-		}
+		final CompletableFuture<Void> shutdownFuture = super.shutDownInternal();
 
-		try {
-			leaderElectionService.stop();
-		} catch (Exception e) {
-			log.warn("Error while stopping leaderElectionService", e);
-		}
+		final File tmpDir = restConfiguration.getTmpDir();
 
-		super.shutdown(timeout);
+		return FutureUtils.runAfterwardsAsync(
+			shutdownFuture,
+			() -> {
+				Exception exception = null;
+				try {
+					log.info("Removing cache directory {}", tmpDir);
+					FileUtils.deleteDirectory(tmpDir);
+				} catch (Exception e) {
+					exception = e;
+				}
+
+				try {
+					leaderElectionService.stop();
+				} catch (Exception e) {
+					exception = ExceptionUtils.firstOrSuppressed(e, exception);
+				}
+
+				if (exception != null) {
+					throw exception;
+				}
+			});
 	}
 
 	//-------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fac5aff9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 50b26e3..3ad7ee5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -136,14 +136,14 @@ public class RestServerEndpointITCase extends TestLogger {
 	}
 
 	@After
-	public void teardown() {
+	public void teardown() throws Exception {
 		if (restClient != null) {
 			restClient.shutdown(timeout);
 			restClient = null;
 		}
 
 		if (serverEndpoint != null) {
-			serverEndpoint.shutdown(timeout);
+			serverEndpoint.shutDownAsync().get();
 			serverEndpoint = null;
 		}
 	}
@@ -316,6 +316,9 @@ public class RestServerEndpointITCase extends TestLogger {
 				Tuple2.of(new TestHeaders(), testHandler),
 				Tuple2.of(TestUploadHeaders.INSTANCE, testUploadHandler));
 		}
+
+		@Override
+		protected void startInternal() throws Exception {}
 	}
 
 	private static class TestHandler extends AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {


Mime
View raw message