flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/5] flink git commit: [FLINK-6555] [futures] Generalize ConjunctFuture to return results
Date Wed, 17 May 2017 06:17:59 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 4104409cc -> 60873b0c5


[FLINK-6555] [futures] Generalize ConjunctFuture to return results

The ConjunctFuture now returns the set of future values once it is completed.

Introduce WaitingConjunctFuture; Fix thread safety issue with ResultConjunctFuture

The WaitingConjunctFuture waits for the completion of its futures. The future values
are discarded making it more efficient than the ResultConjunctFuture which returns
the futures' values. The WaitingConjunctFuture is instantiated via
FutureUtils.waitForAll(Collection<Future>).

This closes #3873.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c6c9654
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c6c9654
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c6c9654

Branch: refs/heads/release-1.3
Commit: 9c6c9654e11dd31fa2323977fc02961811d6e518
Parents: 4104409
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu May 11 17:36:17 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed May 17 08:16:50 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 131 +++++++++++++++----
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +-
 .../executiongraph/ExecutionJobVertex.java      |   4 +-
 .../executiongraph/failover/FailoverRegion.java |   2 +-
 .../runtime/concurrent/FutureUtilsTest.java     |  83 ++++++++++--
 5 files changed, 184 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9c6c9654/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 4948147..a27af56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -106,8 +109,9 @@ public class FutureUtils {
 
 	/**
 	 * Creates a future that is complete once multiple other futures completed. 
-	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
-	 * conjunction fails.
+	 * The future fails (completes exceptionally) once one of the futures in the
+	 * conjunction fails. Upon successful completion, the future returns the
+	 * collection of the futures' results.
 	 *
 	 * <p>The ConjunctFuture gives access to how many Futures in the conjunction have
already
 	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. 
@@ -115,16 +119,16 @@ public class FutureUtils {
 	 * @param futures The futures that make up the conjunction. No null entries are allowed.
 	 * @return The ConjunctFuture that completes once all given futures are complete (or one
fails).
 	 */
-	public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures)
{
+	public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<?
extends Future<? extends T>> futures) {
 		checkNotNull(futures, "futures");
 
-		final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
+		final ResultConjunctFuture<T> conjunct = new ResultConjunctFuture<>(futures.size());
 
 		if (futures.isEmpty()) {
-			conjunct.complete(null);
+			conjunct.complete(Collections.<T>emptyList());
 		}
 		else {
-			for (Future<?> future : futures) {
+			for (Future<? extends T> future : futures) {
 				future.handle(conjunct.completionHandler);
 			}
 		}
@@ -133,16 +137,32 @@ public class FutureUtils {
 	}
 
 	/**
+	 * Creates a future that is complete once all of the given futures have completed.
+	 * The future fails (completes exceptionally) once one of the given futures
+	 * fails.
+	 *
+	 * <p>The ConjunctFuture gives access to how many Futures have already
+	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
+	 *
+	 * @param futures The futures to wait on. No null entries are allowed.
+	 * @return The WaitingFuture that completes once all given futures are complete (or one
fails).
+	 */
+	public static ConjunctFuture<Void> waitForAll(Collection<? extends Future<?>>
futures) {
+		checkNotNull(futures, "futures");
+
+		return new WaitingConjunctFuture(futures);
+	}
+
+	/**
 	 * A future that is complete once multiple other futures completed. The futures are not
-	 * necessarily of the same type, which is why the type of this Future is {@code Void}.
-	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
-	 * conjunction fails.
+	 * necessarily of the same type. The ConjunctFuture fails (completes exceptionally) once
+	 * one of the Futures in the conjunction fails.
 	 * 
 	 * <p>The advantage of using the ConjunctFuture over chaining all the futures (such
as via
 	 * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
 	 * many of the Futures are already complete.
 	 */
-	public interface ConjunctFuture extends CompletableFuture<Void> {
+	public interface ConjunctFuture<T> extends CompletableFuture<T> {
 
 		/**
 		 * Gets the total number of Futures in the conjunction.
@@ -158,39 +178,102 @@ public class FutureUtils {
 	}
 
 	/**
-	 * The implementation of the {@link ConjunctFuture}.
-	 * 
-	 * <p>Implementation notice: The member fields all have package-private access, because
they are
-	 * either accessed by an inner subclass or by the enclosing class.
+	 * The implementation of the {@link ConjunctFuture} which returns its Futures' result as
a collection.
 	 */
-	private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements
ConjunctFuture {
+	private static class ResultConjunctFuture<T> extends FlinkCompletableFuture<Collection<T>>
implements ConjunctFuture<Collection<T>> {
 
 		/** The total number of futures in the conjunction */
-		final int numTotal;
+		private final int numTotal;
+
+		/** The next free index in the results arrays */
+		private final AtomicInteger nextIndex = new AtomicInteger(0);
 
 		/** The number of futures in the conjunction that are already complete */
-		final AtomicInteger numCompleted = new AtomicInteger();
+		private final AtomicInteger numCompleted = new AtomicInteger(0);
+
+		/** The set of collected results so far */
+		private volatile T[] results;
 
 		/** The function that is attached to all futures in the conjunction. Once a future
-		 * is complete, this function tracks the completion or fails the conjunct.  
+		 * is complete, this function tracks the completion or fails the conjunct.
 		 */
-		final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object,
Throwable, Void>() {
+		final BiFunction<T, Throwable, Void> completionHandler = new BiFunction<T, Throwable,
Void>() {
 
 			@Override
-			public Void apply(Object o, Throwable throwable) {
+			public Void apply(T o, Throwable throwable) {
 				if (throwable != null) {
 					completeExceptionally(throwable);
-				}
-				else if (numTotal == numCompleted.incrementAndGet()) {
-					complete(null);
+				} else {
+					int index = nextIndex.getAndIncrement();
+
+					results[index] = o;
+
+					if (numCompleted.incrementAndGet() == numTotal) {
+						complete(Arrays.asList(results));
+					}
 				}
 
 				return null;
 			}
 		};
 
-		ConjunctFutureImpl(int numTotal) {
+		@SuppressWarnings("unchecked")
+		ResultConjunctFuture(int numTotal) {
 			this.numTotal = numTotal;
+			results = (T[])new Object[numTotal];
+		}
+
+		@Override
+		public int getNumFuturesTotal() {
+			return numTotal;
+		}
+
+		@Override
+		public int getNumFuturesCompleted() {
+			return numCompleted.get();
+		}
+	}
+
+	/**
+	 * Implementation of the {@link ConjunctFuture} interface which waits only for the completion
+	 * of its futures and does not return their values.
+	 */
+	private static final class WaitingConjunctFuture extends FlinkCompletableFuture<Void>
implements ConjunctFuture<Void> {
+
+		/** Number of completed futures */
+		private final AtomicInteger numCompleted = new AtomicInteger(0);
+
+		/** Total number of futures to wait on */
+		private final int numTotal;
+
+		/** Handler which increments the atomic completion counter and completes or fails the WaitingFutureImpl
*/
+		private final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object,
Throwable, Void>() {
+			@Override
+			public Void apply(Object o, Throwable throwable) {
+				if (throwable == null) {
+					if (numTotal == numCompleted.incrementAndGet()) {
+						complete(null);
+					}
+				} else {
+					completeExceptionally(throwable);
+				}
+
+				return null;
+			}
+		};
+
+		private WaitingConjunctFuture(Collection<? extends Future<?>> futures) {
+			Preconditions.checkNotNull(futures, "Futures must not be null.");
+
+			this.numTotal = futures.size();
+
+			if (futures.isEmpty()) {
+				complete(null);
+			} else {
+				for (Future<?> future : futures) {
+					future.handle(completionHandler);
+				}
+			}
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9c6c9654/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 5eaa637..7c13936 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -871,7 +871,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 			// this future is complete once all slot futures are complete.
 			// the future fails once one slot future fails.
-			final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
+			final ConjunctFuture<Void> allAllocationsComplete = FutureUtils.waitForAll(slotFutures);
 
 			// make sure that we fail if the allocation timeout was exceeded
 			final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable()
{
@@ -892,7 +892,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			allAllocationsComplete.handleAsync(new BiFunction<Void, Throwable, Void>() {
 
 				@Override
-				public Void apply(Void ignored, Throwable throwable) {
+				public Void apply(Void slots, Throwable throwable) {
 					try {
 						// we do not need the cancellation timeout any more
 						timeoutCancelHandle.cancel(false);
@@ -973,7 +973,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					}
 
 					// we build a future that is complete once all vertices have reached a terminal state
-					final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+					final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
 					allTerminal.thenAccept(new AcceptFunction<Void>() {
 						@Override
 						public void accept(Void value) {
@@ -1102,7 +1102,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					futures.add(ejv.cancelWithFuture());
 				}
 
-				final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+				final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
 				allTerminal.thenAccept(new AcceptFunction<Void>() {
 					@Override
 					public void accept(Void value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9c6c9654/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 3a98e0a..f5a592a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -509,7 +509,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 */
 	public Future<Void> cancelWithFuture() {
 		// we collect all futures from the task cancellations
-		ArrayList<Future<?>> futures = new ArrayList<>(parallelism);
+		ArrayList<Future<ExecutionState>> futures = new ArrayList<>(parallelism);
 
 		// cancel each vertex
 		for (ExecutionVertex ev : getTaskVertices()) {
@@ -517,7 +517,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		}
 
 		// return a conjunct future, which is complete once all individual tasks are canceled
-		return FutureUtils.combineAll(futures);
+		return FutureUtils.waitForAll(futures);
 	}
 
 	public void fail(Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9c6c9654/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index b36cfcf..6066c77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -150,7 +150,7 @@ public class FailoverRegion {
 						futures.add(vertex.cancel());
 					}
 
-					final FutureUtils.ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+					final FutureUtils.ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
 					allTerminal.thenAcceptAsync(new AcceptFunction<Void>() {
 						@Override
 						public void accept(Void value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9c6c9654/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index 43710cb..e262459 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -21,10 +21,15 @@ package org.apache.flink.runtime.concurrent;
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 
+import org.apache.flink.util.TestLogger;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 
@@ -33,17 +38,26 @@ import static org.junit.Assert.*;
 /**
  * Tests for the utility methods in {@link FutureUtils}
  */
-public class FutureUtilsTest {
+@RunWith(Parameterized.class)
+public class FutureUtilsTest extends TestLogger{
+
+	@Parameterized.Parameters
+	public static Collection<FutureFactory> parameters (){
+		return Arrays.asList(new ConjunctFutureFactory(), new WaitingFutureFactory());
+	}
+
+	@Parameterized.Parameter
+	public FutureFactory futureFactory;
 
 	@Test
 	public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
 		try {
-			FutureUtils.combineAll(null);
+			futureFactory.createFuture(null);
 			fail();
 		} catch (NullPointerException ignored) {}
 
 		try {
-			FutureUtils.combineAll(Arrays.asList(
+			futureFactory.createFuture(Arrays.asList(
 					new FlinkCompletableFuture<Object>(),
 					null,
 					new FlinkCompletableFuture<Object>()));
@@ -63,11 +77,11 @@ public class FutureUtilsTest {
 		future2.complete(new Object());
 
 		// build the conjunct future
-		ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3,
future4));
+		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2,
future3, future4));
 
-		Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
+		Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() {
 			@Override
-			public void accept(Void value) {}
+			public void accept(Object value) {}
 		});
 
 		assertEquals(4, result.getNumFuturesTotal());
@@ -108,11 +122,11 @@ public class FutureUtilsTest {
 		CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
 
 		// build the conjunct future
-		ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3,
future4));
+		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2,
future3, future4));
 
-		Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
+		Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() {
 			@Override
-			public void accept(Void value) {}
+			public void accept(Object value) {}
 		});
 
 		assertEquals(4, result.getNumFuturesTotal());
@@ -150,12 +164,12 @@ public class FutureUtilsTest {
 		CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
 
 		// build the conjunct future
-		ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3,
future4));
+		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2,
future3, future4));
 		assertEquals(4, result.getNumFuturesTotal());
 
-		Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
+		Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() {
 			@Override
-			public void accept(Void value) {}
+			public void accept(Object value) {}
 		});
 
 		future1.complete(new Object());
@@ -183,12 +197,55 @@ public class FutureUtilsTest {
 		}
 	}
 
+	/**
+	 * Tests that the conjunct future returns upon completion the collection of all future values
+	 */
+	@Test
+	public void testConjunctFutureValue() throws ExecutionException, InterruptedException {
+		CompletableFuture<Integer> future1 = FlinkCompletableFuture.completed(1);
+		CompletableFuture<Long> future2 = FlinkCompletableFuture.completed(2L);
+		CompletableFuture<Double> future3 = new FlinkCompletableFuture<>();
+
+		ConjunctFuture<Collection<Number>> result = FutureUtils.<Number>combineAll(Arrays.asList(future1,
future2, future3));
+
+		assertFalse(result.isDone());
+
+		future3.complete(.1);
+
+		assertTrue(result.isDone());
+
+		assertThat(result.get(), IsIterableContainingInAnyOrder.<Number>containsInAnyOrder(1,
2L, .1));
+	}
+
 	@Test
 	public void testConjunctOfNone() throws Exception {
-		final ConjunctFuture result = FutureUtils.combineAll(Collections.<Future<Object>>emptyList());
+		final ConjunctFuture<?> result = futureFactory.createFuture(Collections.<Future<Object>>emptyList());
 
 		assertEquals(0, result.getNumFuturesTotal());
 		assertEquals(0, result.getNumFuturesCompleted());
 		assertTrue(result.isDone());
 	}
+
+	/**
+	 * Factory to create {@link ConjunctFuture} for testing.
+	 */
+	private interface FutureFactory {
+		ConjunctFuture<?> createFuture(Collection<? extends Future<?>> futures);
+	}
+
+	private static class ConjunctFutureFactory implements FutureFactory {
+
+		@Override
+		public ConjunctFuture<?> createFuture(Collection<? extends Future<?>>
futures) {
+			return FutureUtils.combineAll(futures);
+		}
+	}
+
+	private static class WaitingFutureFactory implements FutureFactory {
+
+		@Override
+		public ConjunctFuture<?> createFuture(Collection<? extends Future<?>>
futures) {
+			return FutureUtils.waitForAll(futures);
+		}
+	}
 }


Mime
View raw message