flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/9] flink git commit: [FLINK-8669] Add completeAll and runAfterwards(Async) to FutureUtils
Date Mon, 19 Feb 2018 17:08:25 GMT
[FLINK-8669] Add completeAll and runAfterwards(Async) to FutureUtils

FutureUtils#completeAll(Collection) takes a collection of futures and returns
a future which is completed after all of the given futures are completed. This
also includes exceptional completions. Potentially occurring exceptions are
recorded and combined into a single exception with which the resulting future
is completed.

FutureUtils#runAfterwards takes a future and runs a given action after the
completion of the given future. This also includes an exceptional completion.
In this case, a potentially occurring exception as the result of the provided
action will be combined with the future's exception.

This closes #5503.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62b6cea6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62b6cea6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62b6cea6

Branch: refs/heads/master
Commit: 62b6cea685fbf83e475d3dd5d05614fc533b410c
Parents: 5909b5b
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Fri Feb 16 10:08:04 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Feb 19 16:04:17 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 178 ++++++++++++++++---
 .../runtime/concurrent/FutureUtilsTest.java     | 154 ++++++++++++++++
 2 files changed, 309 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62b6cea6/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index e9310c0..181bc5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.RunnableWithException;
 
 import akka.dispatch.OnComplete;
 
@@ -241,6 +243,83 @@ public class FutureUtils {
 		}
 	}
 
+	/**
+	 * Times the given future out after the timeout.
+	 *
+	 * @param future to time out
+	 * @param timeout after which the given future is timed out
+	 * @param timeUnit time unit of the timeout
+	 * @param <T> type of the given future
+	 * @return The timeout enriched future
+	 */
+	public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T>
future, long timeout, TimeUnit timeUnit) {
+		if (!future.isDone()) {
+			final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout,
timeUnit);
+
+			future.whenComplete((T value, Throwable throwable) -> {
+				if (!timeoutFuture.isDone()) {
+					timeoutFuture.cancel(false);
+				}
+			});
+		}
+
+		return future;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Future actions
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Run the given action after the completion of the given future. The given future can be
+	 * completed normally or exceptionally. In case of an exceptional completion the, the
+	 * action's exception will be added to the initial exception.
+	 *
+	 * @param future to wait for its completion
+	 * @param runnable action which is triggered after the future's completion
+	 * @return Future which is completed after the action has completed. This future can contain
an exception,
+	 * if an error occurred in the given future or action.
+	 */
+	public static CompletableFuture<Void> runAfterwards(CompletableFuture<?> future,
RunnableWithException runnable) {
+		return runAfterwardsAsync(future, runnable, Executors.directExecutor());
+	}
+
+	/**
+	 * Run the given action after the completion of the given future. The given future can be
+	 * completed normally or exceptionally. In case of an exceptional completion the, the
+	 * action's exception will be added to the initial exception.
+	 *
+	 * @param future to wait for its completion
+	 * @param runnable action which is triggered after the future's completion
+	 * @param executor to run the given action
+	 * @return Future which is completed after the action has completed. This future can contain
an exception,
+	 * if an error occurred in the given future or action.
+	 */
+	public static CompletableFuture<Void> runAfterwardsAsync(
+		CompletableFuture<?> future,
+		RunnableWithException runnable,
+		Executor executor) {
+		final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+
+		future.whenCompleteAsync(
+			(Object ignored, Throwable throwable) -> {
+				try {
+					runnable.run();
+				} catch (Throwable e) {
+					throwable = ExceptionUtils.firstOrSuppressed(e, throwable);
+				}
+
+				if (throwable != null) {
+					resultFuture.completeExceptionally(throwable);
+				} else {
+					resultFuture.complete(null);
+				}
+			},
+			executor);
+
+		return resultFuture;
+	}
+
 	// ------------------------------------------------------------------------
 	//  composing futures
 	// ------------------------------------------------------------------------
@@ -414,6 +493,82 @@ public class FutureUtils {
 		}
 	}
 
+	/**
+	 * Creates a {@link ConjunctFuture} which is only completed after all given futures have
completed.
+	 * Unlike {@link FutureUtils#waitForAll(Collection)}, the resulting future won't be completed
directly
+	 * if one of the given futures is completed exceptionally. Instead, all occurring exception
will be
+	 * collected and combined to a single exception. If at least on exception occurs, then the
resulting
+	 * future will be completed exceptionally.
+	 *
+	 * @param futuresToComplete futures to complete
+	 * @return Future which is completed after all given futures have been completed.
+	 */
+	public static ConjunctFuture<Void> completeAll(Collection<? extends CompletableFuture<?>>
futuresToComplete) {
+		return new CompletionConjunctFuture(futuresToComplete);
+	}
+
+	/**
+	 * {@link ConjunctFuture} implementation which is completed after all the given futures
have been
+	 * completed. Exceptional completions of the input futures will be recorded but it won't
trigger the
+	 * early completion of this future.
+	 */
+	private static final class CompletionConjunctFuture extends ConjunctFuture<Void> {
+
+		private final Object lock = new Object();
+
+		private final int numFuturesTotal;
+
+		private int futuresCompleted;
+
+		private Throwable globalThrowable;
+
+		private CompletionConjunctFuture(Collection<? extends CompletableFuture<?>>
futuresToComplete) {
+			numFuturesTotal = futuresToComplete.size();
+
+			futuresCompleted = 0;
+
+			globalThrowable = null;
+
+			if (futuresToComplete.isEmpty()) {
+				complete(null);
+			} else {
+				for (CompletableFuture<?> completableFuture : futuresToComplete) {
+					completableFuture.whenComplete(this::completeFuture);
+				}
+			}
+		}
+
+		private void completeFuture(Object ignored, Throwable throwable) {
+			synchronized (lock) {
+				futuresCompleted++;
+
+				if (throwable != null) {
+					globalThrowable = ExceptionUtils.firstOrSuppressed(throwable, globalThrowable);
+				}
+
+				if (futuresCompleted == numFuturesTotal) {
+					if (globalThrowable != null) {
+						completeExceptionally(globalThrowable);
+					} else {
+						complete(null);
+					}
+				}
+			}
+		}
+
+		@Override
+		public int getNumFuturesTotal() {
+			return numFuturesTotal;
+		}
+
+		@Override
+		public int getNumFuturesCompleted() {
+			synchronized (lock) {
+				return futuresCompleted;
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Helper methods
 	// ------------------------------------------------------------------------
@@ -481,29 +636,6 @@ public class FutureUtils {
 	}
 
 	/**
-	 * Times the given future out after the timeout.
-	 *
-	 * @param future to time out
-	 * @param timeout after which the given future is timed out
-	 * @param timeUnit time unit of the timeout
-	 * @param <T> type of the given future
-	 * @return The timeout enriched future
-	 */
-	public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T>
future, long timeout, TimeUnit timeUnit) {
-		if (!future.isDone()) {
-			final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout,
timeUnit);
-
-			future.whenComplete((T value, Throwable throwable) -> {
-				if (!timeoutFuture.isDone()) {
-					timeoutFuture.cancel(false);
-				}
-			});
-		}
-
-		return future;
-	}
-
-	/**
 	 * Runnable to complete the given future with a {@link TimeoutException}.
 	 */
 	private static final class Timeout implements Runnable {

http://git-wip-us.apache.org/repos/asf/flink/blob/62b6cea6/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index cbc8a9a..7ad1638 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -27,7 +27,9 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -41,10 +43,15 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.emptyArray;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the utility methods in {@link FutureUtils}.
@@ -270,4 +277,151 @@ public class FutureUtilsTest extends TestLogger {
 			retryExecutor.shutdownNow();
 		}
 	}
+
+	@Test
+	public void testRunAfterwards() throws Exception {
+		final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
+		final OneShotLatch runnableLatch = new OneShotLatch();
+
+		final CompletableFuture<Void> runFuture = FutureUtils.runAfterwards(
+			inputFuture,
+			runnableLatch::trigger);
+
+		assertThat(runnableLatch.isTriggered(), is(false));
+		assertThat(runFuture.isDone(), is(false));
+
+		inputFuture.complete(null);
+
+		assertThat(runnableLatch.isTriggered(), is(true));
+		assertThat(runFuture.isDone(), is(true));
+
+		// check that this future is not exceptionally completed
+		runFuture.get();
+	}
+
+	@Test
+	public void testRunAfterwardsExceptional() throws Exception {
+		final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
+		final OneShotLatch runnableLatch = new OneShotLatch();
+		final FlinkException testException = new FlinkException("Test exception");
+
+		final CompletableFuture<Void> runFuture = FutureUtils.runAfterwards(
+			inputFuture,
+			runnableLatch::trigger);
+
+		assertThat(runnableLatch.isTriggered(), is(false));
+		assertThat(runFuture.isDone(), is(false));
+
+		inputFuture.completeExceptionally(testException);
+
+		assertThat(runnableLatch.isTriggered(), is(true));
+		assertThat(runFuture.isDone(), is(true));
+
+		try {
+			runFuture.get();
+			fail("Expected an exceptional completion");
+		} catch (ExecutionException ee) {
+			assertThat(ExceptionUtils.stripExecutionException(ee), is(testException));
+		}
+	}
+
+	@Test
+	public void testCompleteAll() throws Exception {
+		final CompletableFuture<String> inputFuture1 = new CompletableFuture<>();
+		final CompletableFuture<Integer> inputFuture2 = new CompletableFuture<>();
+
+		final List<CompletableFuture<?>> futuresToComplete = Arrays.asList(inputFuture1,
inputFuture2);
+		final FutureUtils.ConjunctFuture<Void> completeFuture = FutureUtils.completeAll(futuresToComplete);
+
+		assertThat(completeFuture.isDone(), is(false));
+		assertThat(completeFuture.getNumFuturesCompleted(), is(0));
+		assertThat(completeFuture.getNumFuturesTotal(), is(futuresToComplete.size()));
+
+		inputFuture2.complete(42);
+
+		assertThat(completeFuture.isDone(), is(false));
+		assertThat(completeFuture.getNumFuturesCompleted(), is(1));
+
+		inputFuture1.complete("foobar");
+
+		assertThat(completeFuture.isDone(), is(true));
+		assertThat(completeFuture.getNumFuturesCompleted(), is(2));
+
+		completeFuture.get();
+	}
+
+	@Test
+	public void testCompleteAllPartialExceptional() throws Exception {
+		final CompletableFuture<String> inputFuture1 = new CompletableFuture<>();
+		final CompletableFuture<Integer> inputFuture2 = new CompletableFuture<>();
+
+		final List<CompletableFuture<?>> futuresToComplete = Arrays.asList(inputFuture1,
inputFuture2);
+		final FutureUtils.ConjunctFuture<Void> completeFuture = FutureUtils.completeAll(futuresToComplete);
+
+		assertThat(completeFuture.isDone(), is(false));
+		assertThat(completeFuture.getNumFuturesCompleted(), is(0));
+		assertThat(completeFuture.getNumFuturesTotal(), is(futuresToComplete.size()));
+
+		final FlinkException testException1 = new FlinkException("Test exception 1");
+		inputFuture2.completeExceptionally(testException1);
+
+		assertThat(completeFuture.isDone(), is(false));
+		assertThat(completeFuture.getNumFuturesCompleted(), is(1));
+
+		inputFuture1.complete("foobar");
+
+		assertThat(completeFuture.isDone(), is(true));
+		assertThat(completeFuture.getNumFuturesCompleted(), is(2));
+
+		try {
+			completeFuture.get();
+			fail("Expected an exceptional completion");
+		} catch (ExecutionException ee) {
+			assertThat(ExceptionUtils.stripExecutionException(ee), is(testException1));
+		}
+	}
+
+	@Test
+	public void testCompleteAllExceptional() throws Exception {
+		final CompletableFuture<String> inputFuture1 = new CompletableFuture<>();
+		final CompletableFuture<Integer> inputFuture2 = new CompletableFuture<>();
+
+		final List<CompletableFuture<?>> futuresToComplete = Arrays.asList(inputFuture1,
inputFuture2);
+		final FutureUtils.ConjunctFuture<Void> completeFuture = FutureUtils.completeAll(futuresToComplete);
+
+		assertThat(completeFuture.isDone(), is(false));
+		assertThat(completeFuture.getNumFuturesCompleted(), is(0));
+		assertThat(completeFuture.getNumFuturesTotal(), is(futuresToComplete.size()));
+
+		final FlinkException testException1 = new FlinkException("Test exception 1");
+		inputFuture1.completeExceptionally(testException1);
+
+		assertThat(completeFuture.isDone(), is(false));
+		assertThat(completeFuture.getNumFuturesCompleted(), is(1));
+
+		final FlinkException testException2 = new FlinkException("Test exception 2");
+		inputFuture2.completeExceptionally(testException2);
+
+		assertThat(completeFuture.isDone(), is(true));
+		assertThat(completeFuture.getNumFuturesCompleted(), is(2));
+
+		try {
+			completeFuture.get();
+			fail("Expected an exceptional completion");
+		} catch (ExecutionException ee) {
+			final Throwable actual = ExceptionUtils.stripExecutionException(ee);
+
+			final Throwable[] suppressed = actual.getSuppressed();
+			final FlinkException suppressedException;
+
+			if (actual.equals(testException1)) {
+				 suppressedException = testException2;
+			} else {
+				suppressedException = testException1;
+			}
+
+			assertThat(suppressed, is(not(emptyArray())));
+			assertThat(suppressed, arrayContaining(suppressedException));
+		}
+	}
 }


Mime
View raw message