flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Date Mon, 30 Oct 2017 10:04:35 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4918#discussion_r147658822
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
     		return result;
     	}
     
    -	// ------------------------------------------------------------------------
    -	//  Future Completed with an exception.
    -	// ------------------------------------------------------------------------
    +	/**
    +	 * Times the given future out after the timeout.
    +	 *
    +	 * @param future to time out
    +	 * @param timeout after which the given future is timed out
    +	 * @param timeUnit time unit of the timeout
    +	 * @param <T> type of the given future
    +	 * @return The timeout enriched future
    +	 */
    +	public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
    +		final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
    +
    +		future.whenComplete((T value, Throwable throwable) -> {
    +			if (!timeoutFuture.isDone()) {
    +				timeoutFuture.cancel(false);
    +			}
    +		});
    +
    +		return future;
    +	}
     
     	/**
    -	 * Returns a {@link CompletableFuture} that has failed with the exception
    -	 * provided as argument.
    -	 * @param throwable the exception to fail the future with.
    -	 * @return The failed future.
    +	 * Runnable to complete the given future with a {@link TimeoutException}.
     	 */
    -	public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
    -		CompletableFuture<T> failedAttempt = new CompletableFuture<>();
    -		failedAttempt.completeExceptionally(throwable);
    -		return failedAttempt;
    +	static final class Timeout implements Runnable {
    +
    +		private final CompletableFuture<?> future;
    +
    +		Timeout(CompletableFuture<?> future) {
    +			this.future = Preconditions.checkNotNull(future);
    +		}
    +
    +		@Override
    +		public void run() {
    +			future.completeExceptionally(new TimeoutException());
    +		}
    +	}
    +
    +	/**
    +	 * Delay scheduler used to timeout futures.
    +	 *
    +	 * <p>This class creates a singleton scheduler used to run the provided actions.
    +	 */
    +	private static final class Delayer {
    +		static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
    +			1,
    +			new ExecutorThreadFactory("CompletableFutureDelayScheduler"));
    --- End diff --
    
    Good point. Will add it.


---

Mime
View raw message