flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl0u <...@git.apache.org>
Subject [GitHub] flink pull request #5062: [FLINK-7880][QS] Wait for proper resource cleanup ...
Date Wed, 06 Dec 2017 10:45:18 GMT
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5062#discussion_r155201806
  
    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
---
    @@ -251,34 +263,81 @@ private boolean attemptToBind(final int port) throws Throwable {
     			throw future.cause();
     		} catch (BindException e) {
     			log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
    -			shutdown();
    +			try {
    +				shutdownServer(Time.seconds(10L)).get();
    +			} catch (Exception r) {
    +
    +				// Here we were seeing this problem:
    +				// https://github.com/netty/netty/issues/4357 if we do a get().
    +				// this is why we now simply wait a bit so that everything is
    +				// shut down and then we check
    +
    +				log.warn("Problem while shutting down {}: {}", serverName, r.getMessage());
    +			}
     		}
     		// any other type of exception we let it bubble up.
     		return false;
     	}
     
     	/**
     	 * Shuts down the server and all related thread pools.
    +	 * @param timeout The time to wait for the shutdown process to complete.
    +	 * @return A {@link CompletableFuture} that will be completed upon termination of the
shutdown process.
     	 */
    -	public void shutdown() {
    -		log.info("Shutting down {} @ {}", serverName, serverAddress);
    -
    -		if (handler != null) {
    -			handler.shutdown();
    -			handler = null;
    -		}
    -
    -		if (queryExecutor != null) {
    -			queryExecutor.shutdown();
    -		}
    +	public CompletableFuture<Void> shutdownServer(Time timeout) throws InterruptedException
{
    +		CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
    +		if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) {
    +			log.info("Shutting down {} @ {}", serverName, serverAddress);
    +
    +			final CompletableFuture<Void> groupShutdownFuture = new CompletableFuture<>();
    +			if (bootstrap != null) {
    +				EventLoopGroup group = bootstrap.group();
    +				if (group != null && !group.isShutdown()) {
    +					group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
    +							.addListener(finished -> {
    +								if (finished.isSuccess()) {
    +									groupShutdownFuture.complete(null);
    +								} else {
    +									groupShutdownFuture.completeExceptionally(finished.cause());
    +								}
    +							});
    +				} else {
    +					groupShutdownFuture.complete(null);
    +				}
    +			} else {
    +				groupShutdownFuture.complete(null);
    +			}
     
    -		if (bootstrap != null) {
    -			EventLoopGroup group = bootstrap.group();
    -			if (group != null) {
    -				group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
    +			final CompletableFuture<Void> handlerShutdownFuture = new CompletableFuture<>();
    +			if (handler == null) {
    +				handlerShutdownFuture.complete(null);
    +			} else {
    +				handler.shutdown().whenComplete((result, throwable) -> {
    +					if (throwable != null) {
    +						handlerShutdownFuture.completeExceptionally(throwable);
    +					} else {
    +						handlerShutdownFuture.complete(null);
    +					}
    +				});
     			}
    +
    +			final CompletableFuture<Void> queryExecShutdownFuture = CompletableFuture.runAsync(()
-> {
    +				if (queryExecutor != null) {
    +					ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS,
queryExecutor);
    +				}
    +			});
    +
    +			CompletableFuture.allOf(
    +					queryExecShutdownFuture, groupShutdownFuture, handlerShutdownFuture
    +			).whenComplete((result, throwable) -> {
    +				if (throwable != null) {
    +					shutdownFuture.completeExceptionally(throwable);
    +				} else {
    +					shutdownFuture.complete(null);
    +				}
    +			});
     		}
    -		serverAddress = null;
    +		return serverShutdownFuture.get();
    --- End diff --
    
    Ok I will add the check in the `start()`.


---

Mime
View raw message