flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7335] [futures] Remove Flink's own Future implementation
Date Thu, 03 Aug 2017 11:54:05 GMT
Repository: flink
Updated Branches:
  refs/heads/master eddafc1ac -> 1fabe0cd0


[FLINK-7335] [futures] Remove Flink's own Future implementation

This closes #4463.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1fabe0cd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1fabe0cd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1fabe0cd

Branch: refs/heads/master
Commit: 1fabe0cd04306fd5bd607729bf9ad9f22cc25ed2
Parents: eddafc1
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Aug 2 14:40:36 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Aug 3 13:50:22 2017 +0200

----------------------------------------------------------------------
 .../runtime/concurrent/AcceptFunction.java      |  34 --
 .../flink/runtime/concurrent/ApplyFunction.java |  36 --
 .../flink/runtime/concurrent/BiFunction.java    |  38 --
 .../runtime/concurrent/CompletableFuture.java   |  47 --
 .../apache/flink/runtime/concurrent/Future.java | 239 --------
 .../flink/runtime/concurrent/FutureUtils.java   |  95 +--
 .../concurrent/impl/FlinkCompletableFuture.java |  90 ---
 .../runtime/concurrent/impl/FlinkFuture.java    | 398 ------------
 .../executiongraph/ExecutionGraphUtils.java     |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   4 +-
 .../runtime/concurrent/FlinkFutureTest.java     | 607 -------------------
 11 files changed, 26 insertions(+), 1564 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1fabe0cd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
deleted file mode 100644
index a300647..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.concurrent;
-
-/**
- * Function which is called with a single argument and does not return a value.
- *
- * @param <T> type of the argument
- */
-public interface AcceptFunction<T> {
-
-	/**
-	 * Method which handles the function call.
-	 *
-	 * @param value is the function's argument
-	 */
-	void accept(T value);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1fabe0cd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
deleted file mode 100644
index 64def98..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.concurrent;
-
-/**
- * Function which is called with a single argument.
- *
- * @param <V> type of the argument
- * @param <R> type of the return value
- */
-public interface ApplyFunction<V, R> {
-
-	/**
-	 * Method which handles the function call.
-	 *
-	 * @param value is the single argument
-	 * @return the function value
-	 */
-	R apply(V value);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1fabe0cd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java
deleted file mode 100644
index 2b09de8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/BiFunction.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.concurrent;
-
-/**
- * Function which is called with two arguments and returns a value.
- *
- * @param <T> type of the first argument
- * @param <U> type of the second argument
- * @param <R> type of the return value
- */
-public interface BiFunction<T, U, R> {
-
-	/**
-	 * Method which handles the function call.
-	 *
-	 * @param t first argument
-	 * @param u second argument
-	 * @return the function value
-	 */
-	R apply(T t, U u);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1fabe0cd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java
deleted file mode 100644
index 5288bf2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/CompletableFuture.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.concurrent;
-
-/**
- * Flink's completable future abstraction. A completable future can be completed with a regular
- * value or an exception.
- *
- * @param <T> type of the future's value
- */
-public interface CompletableFuture<T> extends Future<T> {
-
-	/**
-	 * Completes the future with the given value. The complete operation only succeeds if the future
-	 * has not been completed before. Whether it is successful or not is returned by the method.
-	 *
-	 * @param value to complete the future with
-	 * @return true if the completion was successful; otherwise false
-	 */
-	boolean complete(T value);
-
-	/**
-	 * Completes the future with the given exception. The complete operation only succeeds if the
-	 * future has not been completed before. Whether it is successful or not is returned by the
-	 * method.
-	 *
-	 * @param t the exception to complete the future with
-	 * @return true if the completion was successful; otherwise false
-	 */
-	boolean completeExceptionally(Throwable t);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1fabe0cd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
deleted file mode 100644
index a6d5a48..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.concurrent;
-
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Flink's basic future abstraction. A future represents an asynchronous operation whose result
- * will be contained in this instance upon completion.
- *
- * @param <T> type of the future's result
- */
-public interface Future<T> {
-
-	/**
-	 * Checks if the future has been completed. A future is completed, if the result has been
-	 * delivered.
-	 *
-	 * @return true if the future is completed; otherwise false
-	 */
-	boolean isDone();
-
-	/**
-	 * Tries to cancel the future's operation. Note that not all future operations can be canceled.
-	 * The result of the cancelling will be returned.
-	 *
-	 * @param mayInterruptIfRunning true iff the future operation may be interrupted
-	 * @return true if the cancelling was successful; otherwise false
-	 */
-	boolean cancel(boolean mayInterruptIfRunning);
-
-	/**
-	 * Gets the result value of the future. If the future has not been completed, then this
-	 * operation will block indefinitely until the result has been delivered.
-	 *
-	 * @return the result value
-	 * @throws CancellationException if the future has been cancelled
-	 * @throws InterruptedException if the current thread was interrupted while waiting for the result
-	 * @throws ExecutionException if the future has been completed with an exception
-	 */
-	T get() throws InterruptedException, ExecutionException;
-
-	/**
-	 * Gets the result value of the future. If the future has not been done, then this operation
-	 * will block the given timeout value. If the result has not been delivered within the timeout,
-	 * then the method throws an {@link TimeoutException}.
-	 *
-	 * @param timeout the time to wait for the future to be done
-	 * @param unit time unit for the timeout argument
-	 * @return the result value
-	 * @throws CancellationException if the future has been cancelled
-	 * @throws InterruptedException if the current thread was interrupted while waiting for the result
-	 * @throws ExecutionException if the future has been completed with an exception
-	 * @throws TimeoutException if the future has not been completed within the given timeout
-	 */
-	T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
-
-	/**
-	 * Gets the value of the future. If the future has not been completed when calling this
-	 * function, the given value is returned.
-	 *
-	 * @param valueIfAbsent value which is returned if the future has not been completed
-	 * @return value of the future or the given value if the future has not been completed
-	 * @throws ExecutionException if the future has been completed with an exception
-	 */
-	T getNow(T valueIfAbsent) throws ExecutionException;
-
-	/**
-	 * Applies the given function to the value of the future. The result of the apply function is
-	 * the value of the newly returned future.
-	 * <p>
-	 * The apply function is executed asynchronously by the given executor.
-	 *
-	 * @param applyFunction function to apply to the future's value
-	 * @param executor used to execute the given apply function asynchronously
-	 * @param <R> type of the apply function's return value
-	 * @return future representing the return value of the given apply function
-	 */
-	<R> Future<R> thenApplyAsync(ApplyFunction<? super T, ? extends R> applyFunction, Executor executor);
-
-	/**
-	 * Applies the given function to the value of the future. The result of the apply function is
-	 * the value of the newly returned future.
-	 *
-	 * @param applyFunction function to apply to the future's value
-	 * @param <R> type of the apply function's return value
-	 * @return future representing the return value of the given apply function
-	 */
-	<R> Future<R> thenApply(ApplyFunction<? super T, ? extends R> applyFunction);
-
-	/**
-	 * Applies the accept function to the value of the future. Unlike the {@link ApplyFunction}, the
-	 * {@link AcceptFunction} does not return a value. The returned future, thus, represents only
-	 * the completion of the accept callback.
-	 * <p>
-	 * The accept function is executed asynchronously by the given executor.
-	 *
-	 * @param acceptFunction function to apply to the future's value
-	 * @param executor used to execute the given apply function asynchronously
-	 * @return future representing the completion of the accept callback
-	 */
-	Future<Void> thenAcceptAsync(AcceptFunction<? super T> acceptFunction, Executor executor);
-
-	/**
-	 * Applies the accept function to the value of the future. Unlike the {@link ApplyFunction}, the
-	 * {@link AcceptFunction} does not return a value. The returned future, thus, represents only
-	 * the completion of the accept callback.
-	 *
-	 * @param acceptFunction function to apply to the future's value
-	 * @return future representing the completion of the accept callback
-	 */
-	Future<Void> thenAccept(AcceptFunction<? super T> acceptFunction);
-
-	/**
-	 * Applies the given function to the value of the future if the future has been completed
-	 * exceptionally. The completing exception is given to the apply function which can return a new
-	 * value which is the value of the returned future.
-	 * <p>
-	 * The apply function is executed asynchronously by the given executor.
-	 *
-	 * @param exceptionallyFunction to apply to the future's value if it is an exception
-	 * @param executor used to execute the given apply function asynchronously
-	 * @param <R> type of the apply function's return value
-	 * @return future representing the return value of the given apply function
-	 */
-	<R> Future<R> exceptionallyAsync(ApplyFunction<Throwable, ? extends R> exceptionallyFunction, Executor executor);
-
-	/**
-	 * Applies the given function to the value of the future if the future has been completed
-	 * exceptionally. The completing exception is given to the apply function which can return a new
-	 * value which is the value of the returned future.
-	 *
-	 * @param exceptionallyFunction to apply to the future's value if it is an exception
-	 * @param <R> type of the apply function's return value
-	 * @return future representing the return value of the given apply function
-	 */
-	<R> Future<R> exceptionally(ApplyFunction<Throwable, ? extends R> exceptionallyFunction);
-
-	/**
-	 * Applies the given function to the value of the future. The apply function returns a future
-	 * result, which is flattened. This means that the resulting future of this method represents
-	 * the future's value of the apply function.
-	 * <p>
-	 * The apply function is executed asynchronously by the given executor.
-	 *
-	 * @param composeFunction to apply to the future's value. The function returns a future which is
-	 *                        flattened
-	 * @param executor used to execute the given apply function asynchronously
-	 * @param <R> type of the returned future's value
-	 * @return future representing the flattened return value of the apply function
-	 */
-	<R> Future<R> thenComposeAsync(ApplyFunction<? super T, ? extends Future<R>> composeFunction, Executor executor);
-
-	/**
-	 * Applies the given function to the value of the future. The apply function returns a future
-	 * result, which is flattened. This means that the resulting future of this method represents
-	 * the future's value of the apply function.
-	 *
-	 * @param composeFunction to apply to the future's value. The function returns a future which is
-	 *                        flattened
-	 * @param <R> type of the returned future's value
-	 * @return future representing the flattened return value of the apply function
-	 */
-	<R> Future<R> thenCompose(ApplyFunction<? super T, ? extends Future<R>> composeFunction);
-
-	/**
-	 * Applies the given handle function to the result of the future. The result can either be the
-	 * future's value or the exception with which the future has been completed. The two cases are
-	 * mutually exclusive. This means that either the left or right argument of the handle function
-	 * are non null. The result of the handle function is the returned future's value.
-	 * <p>
-	 * The handle function is executed asynchronously by the given executor.
-	 *
-	 * @param biFunction applied to the result (normal and exceptional) of the future
-	 * @param executor used to execute the handle function asynchronously
-	 * @param <R> type of the handle function's return value
-	 * @return future representing the handle function's return value
-	 */
-	<R> Future<R> handleAsync(BiFunction<? super T, Throwable, ? extends R> biFunction, Executor executor);
-
-	/**
-	 * Applies the given handle function to the result of the future. The result can either be the
-	 * future's value or the exception with which the future has been completed. The two cases are
-	 * mutually exclusive. This means that either the left or right argument of the handle function
-	 * are non null. The result of the handle function is the returned future's value.
-	 *
-	 * @param biFunction applied to the result (normal and exceptional) of the future
-	 * @param <R> type of the handle function's return value
-	 * @return future representing the handle function's return value
-	 */
-	<R> Future<R> handle(BiFunction<? super T, Throwable, ? extends R> biFunction);
-
-	/**
-	 * Applies the given function to the result of this and the other future after both futures
-	 * have completed. The result of the bi-function is the result of the returned future.
-	 * <p>
-	 * The bi-function is executed asynchronously by the given executor.
-	 *
-	 * @param other future whose result is the right input to the bi-function
-	 * @param biFunction applied to the result of this and that future
-	 * @param executor used to execute the bi-function asynchronously
-	 * @param <U> type of that future's return value
-	 * @param <R> type of the bi-function's return value
-	 * @return future representing the bi-function's return value
-	 */
-	<U, R> Future<R> thenCombineAsync(Future<U> other, BiFunction<? super T, ? super U, ? extends R> biFunction, Executor executor);
-
-	/**
-	 * Applies the given function to the result of this and the other future after both futures
-	 * have completed. The result of the bi-function is the result of the returned future.
-	 *
-	 * @param other future whose result is the right input to the bi-function
-	 * @param biFunction applied to the result of this and that future
-	 * @param <U> type of that future's return value
-	 * @param <R> type of the bi-function's return value
-	 * @return future representing the bi-function's return value
-	 */
-	<U, R> Future<R> thenCombine(Future<U> other, BiFunction<? super T, ? super U, ? extends R> biFunction);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1fabe0cd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index cf218c0..e0a9a0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.concurrent;
 
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.Preconditions;
 
 import akka.dispatch.OnComplete;
@@ -27,13 +26,18 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+
+import scala.concurrent.Future;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A collection of utilities that expand the usage of {@link Future} and {@link CompletableFuture}.
+ * A collection of utilities that expand the usage of {@link CompletableFuture}.
  */
 public class FutureUtils {
 
@@ -50,12 +54,12 @@ public class FutureUtils {
 	 * @param <T> type of the result
 	 * @return Future containing either the result of the operation or a {@link RetryException}
 	 */
-	public static <T> java.util.concurrent.CompletableFuture<T> retry(
-		final Callable<java.util.concurrent.CompletableFuture<T>> operation,
+	public static <T> CompletableFuture<T> retry(
+		final Callable<CompletableFuture<T>> operation,
 		final int retries,
 		final Executor executor) {
 
-		java.util.concurrent.CompletableFuture<T> operationResultFuture;
+		CompletableFuture<T> operationResultFuture;
 
 		try {
 			operationResultFuture = operation.call();
@@ -73,7 +77,7 @@ public class FutureUtils {
 							"has been exhausted.", throwable));
 					}
 				} else {
-					return java.util.concurrent.CompletableFuture.completedFuture(t);
+					return CompletableFuture.completedFuture(t);
 				}
 			},
 			executor)
@@ -113,7 +117,7 @@ public class FutureUtils {
 	 * @param futures The futures that make up the conjunction. No null entries are allowed.
 	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
 	 */
-	public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends java.util.concurrent.CompletableFuture<? extends T>> futures) {
+	public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends CompletableFuture<? extends T>> futures) {
 		checkNotNull(futures, "futures");
 
 		final ResultConjunctFuture<T> conjunct = new ResultConjunctFuture<>(futures.size());
@@ -122,7 +126,7 @@ public class FutureUtils {
 			conjunct.complete(Collections.emptyList());
 		}
 		else {
-			for (java.util.concurrent.CompletableFuture<? extends T> future : futures) {
+			for (CompletableFuture<? extends T> future : futures) {
 				future.whenComplete(conjunct::handleCompletedFuture);
 			}
 		}
@@ -141,7 +145,7 @@ public class FutureUtils {
 	 * @param futures The futures to wait on. No null entries are allowed.
 	 * @return The WaitingFuture that completes once all given futures are complete (or one fails).
 	 */
-	public static ConjunctFuture<Void> waitForAll(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
+	public static ConjunctFuture<Void> waitForAll(Collection<? extends CompletableFuture<?>> futures) {
 		checkNotNull(futures, "futures");
 
 		return new WaitingConjunctFuture(futures);
@@ -153,10 +157,10 @@ public class FutureUtils {
 	 * one of the Futures in the conjunction fails.
 	 * 
 	 * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
-	 * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
-	 * many of the Futures are already complete.
+	 * {@link CompletableFuture#thenCombine(CompletionStage, BiFunction)} )}) is that ConjunctFuture
+	 * also tracks how many of the Futures are already complete.
 	 */
-	public abstract static class ConjunctFuture<T> extends java.util.concurrent.CompletableFuture<T> {
+	public abstract static class ConjunctFuture<T> extends CompletableFuture<T> {
 
 		/**
 		 * Gets the total number of Futures in the conjunction.
@@ -245,7 +249,7 @@ public class FutureUtils {
 			}
 		}
 
-		private WaitingConjunctFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
+		private WaitingConjunctFuture(Collection<? extends CompletableFuture<?>> futures) {
 			Preconditions.checkNotNull(futures, "Futures must not be null.");
 
 			this.numTotal = futures.size();
@@ -275,14 +279,14 @@ public class FutureUtils {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Returns an exceptionally completed {@link java.util.concurrent.CompletableFuture}.
+	 * Returns an exceptionally completed {@link CompletableFuture}.
 	 *
 	 * @param cause to complete the future with
 	 * @param <T> type of the future
 	 * @return An exceptionally completed CompletableFuture
 	 */
-	public static <T>java.util.concurrent.CompletableFuture<T> completedExceptionally(Throwable cause) {
-		java.util.concurrent.CompletableFuture<T> result = new java.util.concurrent.CompletableFuture<>();
+	public static <T>CompletableFuture<T> completedExceptionally(Throwable cause) {
+		CompletableFuture<T> result = new CompletableFuture<>();
 		result.completeExceptionally(cause);
 
 		return result;
@@ -293,14 +297,14 @@ public class FutureUtils {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Converts a Scala {@link scala.concurrent.Future} to a {@link java.util.concurrent.CompletableFuture}.
+	 * Converts a Scala {@link Future} to a {@link CompletableFuture}.
 	 *
 	 * @param scalaFuture to convert to a Java 8 CompletableFuture
 	 * @param <T> type of the future value
 	 * @return Java 8 CompletableFuture
 	 */
-	public static <T> java.util.concurrent.CompletableFuture<T> toJava(scala.concurrent.Future<T> scalaFuture) {
-		final java.util.concurrent.CompletableFuture<T> result = new java.util.concurrent.CompletableFuture<>();
+	public static <T> CompletableFuture<T> toJava(Future<T> scalaFuture) {
+		final CompletableFuture<T> result = new CompletableFuture<>();
 
 		scalaFuture.onComplete(new OnComplete<T>() {
 			@Override
@@ -315,57 +319,4 @@ public class FutureUtils {
 
 		return result;
 	}
-
-	/**
-	 * Converts a Flink {@link Future} into a {@link CompletableFuture}.
-	 *
-	 * @param flinkFuture to convert to a Java 8 CompletableFuture
-	 * @param <T> type of the future value
-	 * @return Java 8 CompletableFuture
-	 *
-	 * @deprecated Will be removed once we completely remove Flink's futures
-	 */
-	@Deprecated
-	public static <T> java.util.concurrent.CompletableFuture<T> toJava(Future<T> flinkFuture) {
-		final java.util.concurrent.CompletableFuture<T> result = new java.util.concurrent.CompletableFuture<>();
-
-		flinkFuture.handle(
-			(t, throwable) -> {
-				if (throwable != null) {
-					result.completeExceptionally(throwable);
-				} else {
-					result.complete(t);
-				}
-
-				return null;
-			}
-		);
-
-		return result;
-	}
-
-	/**
-	 * Converts a Java 8 {@link java.util.concurrent.CompletableFuture} into a Flink {@link Future}.
-	 *
-	 * @param javaFuture to convert to a Flink future
-	 * @param <T> type of the future value
-	 * @return Flink future
-	 *
-	 * @deprecated Will be removed once we completely remove Flink's futures
-	 */
-	@Deprecated
-	public static <T> Future<T> toFlinkFuture(java.util.concurrent.CompletableFuture<T> javaFuture) {
-		FlinkCompletableFuture<T> result = new FlinkCompletableFuture<>();
-
-		javaFuture.whenComplete(
-			(value, throwable) -> {
-				if (throwable == null) {
-					result.complete(value);
-				} else {
-					result.completeExceptionally(throwable);
-				}
-			});
-
-		return result;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1fabe0cd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
deleted file mode 100644
index 14686d7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.concurrent.impl;
-
-import akka.dispatch.Futures;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import scala.concurrent.Promise;
-import scala.concurrent.Promise$;
-
-import java.util.concurrent.CancellationException;
-
-/**
- * Implementation of {@link CompletableFuture} which is backed by {@link Promise}.
- *
- * @param <T> type of the future's value
- */
-public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
-
-	private final Promise<T> promise;
-
-	public FlinkCompletableFuture() {
-		promise = Futures.promise();
-		scalaFuture = promise.future();
-	}
-
-	private FlinkCompletableFuture(T value) {
-		promise = Promise$.MODULE$.successful(value);
-		scalaFuture = promise.future();
-	}
-
-	private FlinkCompletableFuture(Throwable t) {
-		promise = Promise$.MODULE$.failed(t);
-		scalaFuture = promise.future();
-	}
-
-	@Override
-	public boolean complete(T value) {
-		try {
-			promise.success(value);
-
-			return true;
-		} catch (IllegalStateException e) {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean completeExceptionally(Throwable t) {
-		try {
-			if (t == null) {
-				promise.failure(new NullPointerException("Throwable was null."));
-			} else {
-				promise.failure(t);
-			}
-
-			return true;
-		} catch (IllegalStateException e) {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean cancel(boolean mayInterruptIfRunning) {
-		return completeExceptionally(new CancellationException("Future has been canceled."));
-	}
-
-	public static <T> FlinkCompletableFuture<T> completed(T value) {
-		return new FlinkCompletableFuture<>(value);
-	}
-
-	public static <T> FlinkCompletableFuture<T> completedExceptionally(Throwable t) {
-		return new FlinkCompletableFuture<>(t);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1fabe0cd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
deleted file mode 100644
index ab23fc5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.concurrent.impl;
-
-import akka.dispatch.ExecutionContexts$;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
-import akka.dispatch.Recover;
-import akka.japi.Procedure;
-
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-import scala.util.Failure;
-import scala.util.Success;
-import scala.util.Try;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Implementation of {@link Future} which is backed by {@link scala.concurrent.Future}.
- *
- * @param <T> type of the future's value
- */
-public class FlinkFuture<T> implements Future<T> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkFuture.class);
-
-	private static final Executor DIRECT_EXECUTOR = Executors.directExecutor();
-
-	private static final ExecutionContext DIRECT_EXECUTION_CONTEXT = executionContextFromExecutor(DIRECT_EXECUTOR);
-
-	// ------------------------------------------------------------------------
-
-	protected scala.concurrent.Future<T> scalaFuture;
-
-	FlinkFuture() {
-		scalaFuture = null;
-	}
-
-	public FlinkFuture(scala.concurrent.Future<T> scalaFuture) {
-		this.scalaFuture = Preconditions.checkNotNull(scalaFuture);
-	}
-
-	public scala.concurrent.Future<T> getScalaFuture() {
-		return scalaFuture;
-	}
-
-	//-----------------------------------------------------------------------------------
-	// Future's methods
-	//-----------------------------------------------------------------------------------
-
-	@Override
-	public boolean isDone() {
-		return scalaFuture.isCompleted();
-	}
-
-	@Override
-	public boolean cancel(boolean mayInterruptIfRunning) {
-		return false;
-	}
-
-	@Override
-	public T get() throws InterruptedException, ExecutionException {
-		Preconditions.checkNotNull(scalaFuture);
-
-		try {
-			return Await.result(scalaFuture, Duration.Inf());
-		} catch (InterruptedException e) {
-			throw e;
-		} catch (FlinkFuture.ThrowableWrapperException e) {
-			throw new ExecutionException(e.getCause());
-		} catch (Exception e) {
-			throw new ExecutionException(e);
-		}
-	}
-
-	@Override
-	public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-		Preconditions.checkNotNull(scalaFuture);
-		Preconditions.checkArgument(timeout >= 0L, "The timeout value has to be larger or " +
-			"equal than 0.");
-
-		try {
-			return Await.result(scalaFuture, new FiniteDuration(timeout, unit));
-		} catch (InterruptedException | TimeoutException e) {
-			throw e;
-		} catch (Exception e) {
-			throw new ExecutionException(e);
-		}
-	}
-
-	@Override
-	public T getNow(T valueIfAbsent) throws ExecutionException {
-		Preconditions.checkNotNull(scalaFuture);
-
-		Option<Try<T>> value = scalaFuture.value();
-
-		if (value.isDefined()) {
-			Try<T> tri = value.get();
-
-			if (tri instanceof Success) {
-				return ((Success<T>)tri).value();
-			} else {
-				throw new ExecutionException(((Failure<T>)tri).exception());
-			}
-		} else {
-			return valueIfAbsent;
-		}
-	}
-
-	@Override
-	public <R> Future<R> thenApplyAsync(final ApplyFunction<? super T, ? extends R> applyFunction, Executor executor) {
-		Preconditions.checkNotNull(scalaFuture);
-		Preconditions.checkNotNull(applyFunction);
-		Preconditions.checkNotNull(executor);
-
-		scala.concurrent.Future<R> mappedFuture = scalaFuture.map(new Mapper<T, R>() {
-			@Override
-			public R apply(T value) {
-				return applyFunction.apply(value);
-			}
-		}, createExecutionContext(executor));
-
-		return new FlinkFuture<>(mappedFuture);
-	}
-
-	@Override
-	public <R> Future<R> thenApply(final ApplyFunction<? super T, ? extends R> applyFunction) {
-		return thenApplyAsync(applyFunction, Executors.directExecutor());
-	}
-
-	@Override
-	public Future<Void> thenAcceptAsync(final AcceptFunction<? super T> acceptFunction, Executor executor) {
-		Preconditions.checkNotNull(scalaFuture);
-		Preconditions.checkNotNull(acceptFunction);
-		Preconditions.checkNotNull(executor);
-
-		scala.concurrent.Future<Void> acceptedFuture = scalaFuture.map(new Mapper<T, Void>() {
-			@Override
-			public Void apply(T value) {
-				acceptFunction.accept(value);
-
-				return null;
-			}
-		}, createExecutionContext(executor));
-
-		return new FlinkFuture<>(acceptedFuture);
-	}
-
-	@Override
-	public Future<Void> thenAccept(AcceptFunction<? super T> acceptFunction) {
-		return thenAcceptAsync(acceptFunction, Executors.directExecutor());
-	}
-
-	@Override
-	public <R> Future<R> exceptionallyAsync(final ApplyFunction<Throwable, ? extends R> exceptionallyFunction, Executor executor) {
-		Preconditions.checkNotNull(scalaFuture);
-		Preconditions.checkNotNull(exceptionallyFunction);
-		Preconditions.checkNotNull(executor);
-
-		scala.concurrent.Future<R> recoveredFuture = scalaFuture.recover(new Recover<R>() {
-			@Override
-			public R recover(Throwable failure) throws Throwable {
-				return exceptionallyFunction.apply(failure);
-			}
-		}, createExecutionContext(executor));
-
-		return new FlinkFuture<>(recoveredFuture);
-	}
-
-	@Override
-	public <R> Future<R> exceptionally(ApplyFunction<Throwable, ? extends R> exceptionallyFunction) {
-		return exceptionallyAsync(exceptionallyFunction, Executors.directExecutor());
-	}
-
-	@Override
-	public <R> Future<R> thenComposeAsync(final ApplyFunction<? super T, ? extends Future<R>> applyFunction, Executor executor) {
-		Preconditions.checkNotNull(scalaFuture);
-		Preconditions.checkNotNull(applyFunction);
-		Preconditions.checkNotNull(executor);
-
-		final ExecutionContext executionContext = createExecutionContext(executor);
-
-		scala.concurrent.Future<R> flatMappedFuture = scalaFuture.flatMap(new Mapper<T, scala.concurrent.Future<R>>() {
-			@Override
-			public scala.concurrent.Future<R> apply(T value) {
-				final Future<? extends R> future = applyFunction.apply(value);
-
-				if (future instanceof FlinkFuture) {
-					@SuppressWarnings("unchecked")
-					FlinkFuture<R> flinkFuture = (FlinkFuture<R>) future;
-
-					return flinkFuture.scalaFuture;
-				} else {
-					return Futures.future(new Callable<R>() {
-						@Override
-						public R call() throws Exception {
-							try {
-								return future.get();
-							} catch (ExecutionException e) {
-								// unwrap the execution exception if it's not a throwable
-								if (e.getCause() instanceof Exception) {
-									throw (Exception) e.getCause();
-								} else {
-									throw new FlinkFuture.ThrowableWrapperException(e.getCause());
-								}
-							}
-						}
-					}, executionContext);
-				}
-			}
-		}, executionContext);
-
-		return new FlinkFuture<>(flatMappedFuture);
-	}
-
-	@Override
-	public <R> Future<R> thenCompose(ApplyFunction<? super T, ? extends Future<R>> composeFunction) {
-		return thenComposeAsync(composeFunction, Executors.directExecutor());
-	}
-
-	@Override
-	public <R> Future<R> handleAsync(final BiFunction<? super T, Throwable, ? extends R> biFunction, Executor executor) {
-		Preconditions.checkNotNull(scalaFuture);
-		Preconditions.checkNotNull(biFunction);
-		Preconditions.checkNotNull(executor);
-
-		final ExecutionContext executionContext = createExecutionContext(executor);
-
-		final CompletableFuture<R> resultFuture = new FlinkCompletableFuture<>();
-
-		scalaFuture.onComplete(new OnComplete<T>() {
-			@Override
-			public void onComplete(Throwable failure, T success) throws Throwable {
-				final R result = biFunction.apply(success, failure);
-
-				resultFuture.complete(result);
-			}
-		}, executionContext);
-
-		return resultFuture;
-	}
-
-	@Override
-	public <R> Future<R> handle(BiFunction<? super T, Throwable, ? extends R> biFunction) {
-		return handleAsync(biFunction, Executors.directExecutor());
-	}
-
-	@Override
-	public <U, R> Future<R> thenCombineAsync(final Future<U> other, final BiFunction<? super T, ? super U, ? extends R> biFunction, final Executor executor) {
-		Preconditions.checkNotNull(other);
-		Preconditions.checkNotNull(biFunction);
-		Preconditions.checkNotNull(executor);
-
-		final ExecutionContext executionContext = createExecutionContext(executor);
-
-		final scala.concurrent.Future<U> thatScalaFuture;
-
-		if (other instanceof FlinkFuture) {
-			thatScalaFuture = ((FlinkFuture<U>) other).scalaFuture;
-		} else {
-			thatScalaFuture = Futures.future(new Callable<U>() {
-				@Override
-				public U call() throws Exception {
-					try {
-						return other.get();
-					} catch (ExecutionException e) {
-						// unwrap the execution exception if the cause is an Exception
-						if (e.getCause() instanceof Exception) {
-							throw (Exception) e.getCause();
-						} else {
-							// it's an error or a throwable which we have to wrap for the moment
-							throw new FlinkFuture.ThrowableWrapperException(e.getCause());
-						}
-					}
-				}
-			}, executionContext);
-		}
-
-		scala.concurrent.Future<R>  result = scalaFuture.zip(thatScalaFuture).map(new Mapper<Tuple2<T, U>, R>() {
-			@Override
-			public R apply(Tuple2<T, U> tuple2) {
-				return biFunction.apply(tuple2._1, tuple2._2);
-			}
-		}, executionContext);
-
-		return new FlinkFuture<>(result);
-	}
-
-	@Override
-	public <U, R> Future<R> thenCombine(Future<U> other, BiFunction<? super T, ? super U, ? extends R> biFunction) {
-		return thenCombineAsync(other, biFunction, Executors.directExecutor());
-	}
-
-	//-----------------------------------------------------------------------------------
-	// Static factory methods
-	//-----------------------------------------------------------------------------------
-
-	/**
-	 * Creates a future whose value is determined by the asynchronously executed callable.
-	 *
-	 * @param callable whose value is delivered by the future
-	 * @param executor to be used to execute the callable
-	 * @param <T> type of the future's value
-	 * @return future which represents the value of the callable
-	 */
-	public static <T> Future<T> supplyAsync(Callable<T> callable, Executor executor) {
-		Preconditions.checkNotNull(callable);
-		Preconditions.checkNotNull(executor);
-
-		scala.concurrent.Future<T> scalaFuture = Futures.future(callable, createExecutionContext(executor));
-
-		return new FlinkFuture<>(scalaFuture);
-	}
-
-	//-----------------------------------------------------------------------------------
-	// Helper functions and types
-	//-----------------------------------------------------------------------------------
-
-	private static ExecutionContext createExecutionContext(final Executor executor) {
-		if (executor == DIRECT_EXECUTOR) {
-			return DIRECT_EXECUTION_CONTEXT;
-		} else {
-			return executionContextFromExecutor(executor);
-		}
-	}
-
-	private static ExecutionContext executionContextFromExecutor(final Executor executor) {
-		return ExecutionContexts$.MODULE$.fromExecutor(executor, new Procedure<Throwable>() {
-			@Override
-			public void apply(Throwable throwable) throws Exception {
-				if (executor instanceof ExecutorService) {
-					ExecutorService executorService = (ExecutorService) executor;
-					// only log the exception if the executor service is still running
-					if (!executorService.isShutdown()) {
-						logThrowable(throwable);
-					}
-				} else {
-					logThrowable(throwable);
-				}
-			}
-
-			private void logThrowable(Throwable throwable) {
-				LOG.warn("Uncaught exception in execution context.", throwable);
-			}
-		});
-	}
-
-	/**
-	 * Wrapper for {@link Throwable} which is used to emit the proper exception when calling
-	 * {@link Future#get}.
-	 */
-	private static class ThrowableWrapperException extends Exception {
-
-		private static final long serialVersionUID = 3855668690181179801L;
-
-		ThrowableWrapperException(Throwable throwable) {
-			super(Preconditions.checkNotNull(throwable));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1fabe0cd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
index 8558533..f1d793d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.util.ExceptionUtils;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
 
 /**
  * Utilities for dealing with the execution graphs and scheduling.

http://git-wip-us.apache.org/repos/asf/flink/blob/1fabe0cd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ef4fa86..80fa506 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import java.io.IOException
 import java.net._
 import java.util.UUID
-import java.util.concurrent.{Future => JavaFuture, _}
+import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _}
 import java.util.function.{BiFunction, Consumer}
 
 import akka.actor.Status.{Failure, Success}
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/1fabe0cd/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
deleted file mode 100644
index 6808a5c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
+++ /dev/null
@@ -1,607 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.concurrent;
-
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for Flink's future implementation.
- */
-public class FlinkFutureTest extends TestLogger {
-
-	private static ExecutorService executor;
-
-	@BeforeClass
-	public static void setup() {
-		executor = Executors.newSingleThreadExecutor();
-	}
-
-	@AfterClass
-	public static void teardown() {
-		executor.shutdown();
-	}
-
-	@Test(timeout = 10000L)
-	public void testFutureApplyAsync() throws Exception {
-		int expectedValue = 42;
-
-		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
-
-		Future<String> appliedFuture = initialFuture.thenApplyAsync(new ApplyFunction<Integer, String>() {
-			@Override
-			public String apply(Integer value) {
-				return String.valueOf(value);
-			}
-		}, executor);
-
-		initialFuture.complete(expectedValue);
-
-		assertEquals(String.valueOf(expectedValue), appliedFuture.get());
-	}
-
-	@Test(expected = TimeoutException.class)
-	public void testFutureGetTimeout() throws InterruptedException, ExecutionException, TimeoutException {
-		CompletableFuture<Integer> future = new FlinkCompletableFuture<>();
-
-		future.get(10, TimeUnit.MILLISECONDS);
-
-		fail("Get should have thrown a timeout exception.");
-	}
-
-	@Test(expected = TestException.class)
-	public void testExceptionalCompletion() throws Throwable {
-		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
-
-		initialFuture.completeExceptionally(new TestException("Test exception"));
-
-		try {
-			initialFuture.get();
-
-			fail("Get should have thrown an exception.");
-		} catch (ExecutionException e) {
-			throw e.getCause();
-		}
-	}
-
-	/**
-	 * Tests that an exception is propagated through an apply function.
-	 */
-	@Test(expected = TestException.class)
-	public void testExceptionPropagation() throws Throwable {
-		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
-
-		Future<String> mappedFuture = initialFuture.thenApplyAsync(new ApplyFunction<Integer, String>() {
-			@Override
-			public String apply(Integer value) {
-				throw new TestException("Test exception");
-			}
-		}, executor);
-
-		Future<String> mapped2Future = mappedFuture.thenApplyAsync(new ApplyFunction<String, String>() {
-			@Override
-			public String apply(String value) {
-				return "foobar";
-			}
-		}, executor);
-
-		initialFuture.complete(42);
-
-		try {
-			mapped2Future.get();
-
-			fail("Get should have thrown an exception.");
-		} catch (ExecutionException e) {
-			throw e.getCause();
-		}
-	}
-
-	@Test(timeout = 10000L)
-	public void testExceptionallyAsync() throws ExecutionException, InterruptedException {
-		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
-		String exceptionMessage = "Foobar";
-
-		Future<String> recovered = initialFuture.exceptionallyAsync(new ApplyFunction<Throwable, String>() {
-			@Override
-			public String apply(Throwable value) {
-				return value.getMessage();
-			}
-		}, executor);
-
-		initialFuture.completeExceptionally(new TestException(exceptionMessage));
-
-		String actualMessage = recovered.get();
-
-		assertEquals(exceptionMessage, actualMessage);
-	}
-
-	@Test(timeout = 10000L)
-	public void testComposeAsync() throws ExecutionException, InterruptedException {
-		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
-
-		final int expectedValue = 42;
-
-		Future<Integer> composedFuture = initialFuture.thenComposeAsync(new ApplyFunction<Integer, Future<Integer>>() {
-			@Override
-			public Future<Integer> apply(Integer value) {
-				return FlinkFuture.supplyAsync(new Callable<Integer>() {
-					@Override
-					public Integer call() throws Exception {
-						return expectedValue;
-					}
-				}, executor);
-			}
-		}, executor);
-
-		initialFuture.complete(42);
-
-		int actualValue = composedFuture.get();
-
-		assertEquals(expectedValue, actualValue);
-	}
-
-	@Test(timeout = 10000L)
-	public void testCombineAsync() throws ExecutionException, InterruptedException {
-		CompletableFuture<Integer> leftFuture = new FlinkCompletableFuture<>();
-		CompletableFuture<String> rightFuture = new FlinkCompletableFuture<>();
-
-		final int expectedLeftValue = 42;
-		final String expectedRightValue = "foobar";
-
-
-		Future<String> resultFuture = leftFuture.thenCombineAsync(rightFuture, new BiFunction<Integer, String, String>() {
-			@Override
-			public String apply(Integer integer, String s) {
-				return s + integer;
-			}
-		}, executor);
-
-		leftFuture.complete(expectedLeftValue);
-		rightFuture.complete(expectedRightValue);
-
-		String result = resultFuture.get();
-
-		assertEquals(expectedRightValue + expectedLeftValue, result);
-	}
-
-	@Test(timeout = 10000L)
-	public void testCombineAsyncLeftFailure() throws InterruptedException {
-		CompletableFuture<Integer> leftFuture = new FlinkCompletableFuture<>();
-		CompletableFuture<String> rightFuture = new FlinkCompletableFuture<>();
-
-		final String expectedRightValue = "foobar";
-		final TestException testException = new TestException("barfoo");
-
-
-		Future<String> resultFuture = leftFuture.thenCombineAsync(rightFuture, new BiFunction<Integer, String, String>() {
-			@Override
-			public String apply(Integer integer, String s) {
-				return s + integer;
-			}
-		}, executor);
-
-		leftFuture.completeExceptionally(testException);
-		rightFuture.complete(expectedRightValue);
-
-		try {
-			resultFuture.get();
-			fail("We should have caught an ExecutionException.");
-		} catch (ExecutionException e) {
-			assertEquals(testException, e.getCause());
-		}
-	}
-
-	@Test(timeout = 10000L)
-	public void testCombineAsyncRightFailure() throws ExecutionException, InterruptedException {
-		CompletableFuture<Integer> leftFuture = new FlinkCompletableFuture<>();
-		CompletableFuture<String> rightFuture = new FlinkCompletableFuture<>();
-
-		final int expectedLeftValue = 42;
-		final TestException testException = new TestException("barfoo");
-
-
-		Future<String> resultFuture = leftFuture.thenCombineAsync(rightFuture, new BiFunction<Integer, String, String>() {
-			@Override
-			public String apply(Integer integer, String s) {
-				return s + integer;
-			}
-		}, executor);
-
-		leftFuture.complete(expectedLeftValue);
-		rightFuture.completeExceptionally(testException);
-
-		try {
-			resultFuture.get();
-			fail("We should have caught an ExecutionException.");
-		} catch (ExecutionException e) {
-			assertEquals(testException, e.getCause());
-		}
-	}
-
-	@Test
-	public void testGetNow() throws ExecutionException {
-		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
-
-		final int absentValue = 41;
-
-		assertEquals(new Integer(absentValue), initialFuture.getNow(absentValue));
-	}
-
-	@Test(timeout = 10000L)
-	public void testAcceptAsync() throws ExecutionException, InterruptedException {
-		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
-		final AtomicInteger atomicInteger = new AtomicInteger(0);
-		int expectedValue = 42;
-
-		Future<Void> result = initialFuture.thenAcceptAsync(new AcceptFunction<Integer>() {
-			@Override
-			public void accept(Integer value) {
-				atomicInteger.set(value);
-			}
-		}, executor);
-
-		initialFuture.complete(expectedValue);
-
-		result.get();
-
-		assertEquals(expectedValue, atomicInteger.get());
-	}
-
-	@Test(timeout = 10000L)
-	public void testHandleAsync() throws ExecutionException, InterruptedException {
-		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
-		int expectedValue = 43;
-
-		Future<String> result = initialFuture.handleAsync(new BiFunction<Integer, Throwable, String>() {
-			@Override
-			public String apply(Integer integer, Throwable throwable) {
-				if (integer != null) {
-					return String.valueOf(integer);
-				} else {
-					return throwable.getMessage();
-				}
-			}
-		}, executor);
-
-		initialFuture.complete(expectedValue);
-
-		assertEquals(String.valueOf(expectedValue), result.get());
-	}
-
-	@Test(timeout = 10000L)
-	public void testHandleAsyncException() throws ExecutionException, InterruptedException {
-		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
-		String exceptionMessage = "foobar";
-
-		Future<String> result = initialFuture.handleAsync(new BiFunction<Integer, Throwable, String>() {
-			@Override
-			public String apply(Integer integer, Throwable throwable) {
-				if (integer != null) {
-					return String.valueOf(integer);
-				} else {
-					return throwable.getMessage();
-				}
-			}
-		}, executor);
-
-		initialFuture.completeExceptionally(new TestException(exceptionMessage));
-
-		assertEquals(exceptionMessage, result.get());
-	}
-
-	@Test(timeout = 10000L)
-	public void testMultipleCompleteOperations() throws ExecutionException, InterruptedException {
-		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
-		int expectedValue = 42;
-
-		assertTrue(initialFuture.complete(expectedValue));
-
-		assertFalse(initialFuture.complete(1337));
-
-		assertFalse(initialFuture.completeExceptionally(new TestException("foobar")));
-
-		assertEquals(new Integer(expectedValue), initialFuture.get());
-	}
-
-	@Test
-	public void testApply() throws ExecutionException, InterruptedException {
-		int expectedValue = 42;
-
-		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
-
-		Future<String> appliedFuture = initialFuture.thenApply(new ApplyFunction<Integer, String>() {
-			@Override
-			public String apply(Integer value) {
-				return String.valueOf(value);
-			}
-		});
-
-		initialFuture.complete(expectedValue);
-
-		assertEquals(String.valueOf(expectedValue), appliedFuture.get());
-	}
-
-	@Test
-	public void testAccept() throws ExecutionException, InterruptedException {
-		int expectedValue = 42;
-		Future<Integer> initialFuture = FlinkCompletableFuture.completed(expectedValue);
-		final AtomicInteger atomicInteger = new AtomicInteger(0);
-
-		Future<Void> result = initialFuture.thenAccept(new AcceptFunction<Integer>() {
-			@Override
-			public void accept(Integer value) {
-				atomicInteger.set(value);
-			}
-		});
-
-		result.get();
-
-		assertEquals(expectedValue, atomicInteger.get());
-	}
-
-	@Test
-	public void testExceptionally() throws ExecutionException, InterruptedException {
-		String exceptionMessage = "Foobar";
-		Future<Integer> initialFuture = FlinkCompletableFuture
-			.completedExceptionally(new TestException(exceptionMessage));
-
-
-		Future<String> recovered = initialFuture.exceptionally(new ApplyFunction<Throwable, String>() {
-			@Override
-			public String apply(Throwable value) {
-				return value.getMessage();
-			}
-		});
-
-		String actualMessage = recovered.get();
-
-		assertEquals(exceptionMessage, actualMessage);
-	}
-
-	@Test
-	public void testHandle() throws ExecutionException, InterruptedException {
-		int expectedValue = 43;
-		Future<Integer> initialFuture = FlinkCompletableFuture.completed(expectedValue);
-
-		Future<String> result = initialFuture.handle(new BiFunction<Integer, Throwable, String>() {
-			@Override
-			public String apply(Integer integer, Throwable throwable) {
-				if (integer != null) {
-					return String.valueOf(integer);
-				} else {
-					return throwable.getMessage();
-				}
-			}
-		});
-
-		assertEquals(String.valueOf(expectedValue), result.get());
-	}
-
-	@Test
-	public void testCompose() throws ExecutionException, InterruptedException {
-		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
-		final int expectedValue = 42;
-
-		Future<Integer> composedFuture = initialFuture.thenCompose(new ApplyFunction<Integer, Future<Integer>>() {
-			@Override
-			public Future<Integer> apply(Integer value) {
-				return FlinkFuture.supplyAsync(new Callable<Integer>() {
-					@Override
-					public Integer call() throws Exception {
-						return expectedValue;
-					}
-				}, executor);
-			}
-		});
-
-		initialFuture.complete(42);
-
-		int actualValue = composedFuture.get();
-
-		assertEquals(expectedValue, actualValue);
-	}
-
-	@Test
-	public void testCombine() throws ExecutionException, InterruptedException {
-		int expectedLeftValue = 1;
-		int expectedRightValue = 2;
-
-		Future<Integer> left = FlinkCompletableFuture.completed(expectedLeftValue);
-		Future<Integer> right = FlinkCompletableFuture.completed(expectedRightValue);
-
-		Future<Integer> sum = left.thenCombine(right, new BiFunction<Integer, Integer, Integer>() {
-			@Override
-			public Integer apply(Integer left, Integer right) {
-				return left + right;
-			}
-		});
-
-		int result = sum.get();
-
-		assertEquals(expectedLeftValue + expectedRightValue, result);
-	}
-
-	/**
-	 * Tests that multiple functions can be called on complete futures.
-	 */
-	@Test(timeout = 10000L)
-	public void testMultipleFunctionsOnCompleteFuture() throws Exception {
-		final FlinkCompletableFuture<String> future = FlinkCompletableFuture.completed("test");
-
-		Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
-
-			@Override
-			public String apply(String s, Throwable throwable) {
-				return s != null ? s : throwable.getMessage();
-			}
-		}, executor);
-
-		Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
-			@Override
-			public void accept(String value) {}
-		}, executor);
-
-		assertEquals("test", result1.get());
-		assertNull(result2.get());
-	}
-
-	/**
-	 * Tests that multiple functions can be called on incomplete futures.
-	 */
-	@Test(timeout = 10000L)
-	public void testMultipleFunctionsOnIncompleteFuture() throws Exception {
-		final FlinkCompletableFuture<String> future = new FlinkCompletableFuture<>();
-
-		Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
-			@Override
-			public String apply(String s, Throwable throwable) {
-				return s != null ? s : throwable.getMessage();
-			}
-		}, executor);
-
-		Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
-			@Override
-			public void accept(String value) {}
-		}, executor);
-
-		future.complete("value");
-
-		assertEquals("value", result1.get());
-		assertNull(result2.get());
-	}
-
-	/**
-	 * Tests that multiple functions can be called on complete futures.
-	 */
-	@Test(timeout = 10000)
-	public void testMultipleFunctionsExceptional() throws Exception {
-		final FlinkCompletableFuture<String> future = new FlinkCompletableFuture<>();
-
-		Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
-			@Override
-			public String apply(String s, Throwable throwable) {
-				return s != null ? s : throwable.getMessage();
-			}
-		}, executor);
-
-		Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
-			@Override
-			public void accept(String value) {}
-		}, executor);
-
-		future.completeExceptionally(new TestException("test"));
-
-		assertEquals("test", result1.get());
-
-		try {
-			result2.get();
-			fail("We should have caught an ExecutionException.");
-		} catch (ExecutionException e) {
-			assertTrue(e.getCause() instanceof TestException);
-		}
-	}
-
-	/**
-	 * Tests that a chain of dependent futures will be completed exceptionally if the initial future
-	 * is completed exceptionally.
-	 */
-	@Test(timeout = 10000)
-	public void testChainedFutureExceptionalCompletion() throws ExecutionException, InterruptedException {
-		final FlinkCompletableFuture<String> future = new FlinkCompletableFuture<>();
-
-		Future<String> apply = future.thenApplyAsync(new ApplyFunction<String, String>() {
-			@Override
-			public String apply(String value) {
-				return value;
-			}
-		}, executor);
-
-		Future<Throwable> applyException = apply.exceptionallyAsync(new ApplyFunction<Throwable, Throwable>() {
-			@Override
-			public Throwable apply(Throwable value) {
-				return value;
-			}
-		}, executor);
-
-		Future<Void> accept1 = future.thenAcceptAsync(new AcceptFunction<String>() {
-			@Override
-			public void accept(String value) {
-				// noop
-			}
-		}, executor);
-
-		Future<Throwable> accept1Exception = accept1.exceptionallyAsync(new ApplyFunction<Throwable, Throwable>() {
-			@Override
-			public Throwable apply(Throwable value) {
-				return value;
-			}
-		}, executor);
-
-		Future<Void> accept2 = future.thenAcceptAsync(new AcceptFunction<String>() {
-			@Override
-			public void accept(String value) {
-				// noop
-			}
-		}, executor);
-
-		Future<Throwable> accept2Exception = accept2.exceptionallyAsync(new ApplyFunction<Throwable, Throwable>() {
-			@Override
-			public Throwable apply(Throwable value) {
-				return value;
-			}
-		}, executor);
-
-		TestException testException = new TestException("test");
-
-		// fail the initial future
-		future.completeExceptionally(testException);
-
-		assertEquals(testException, applyException.get());
-		assertEquals(testException, accept1Exception.get());
-		assertEquals(testException, accept2Exception.get());
-	}
-
-	private static class TestException extends RuntimeException {
-
-		private static final long serialVersionUID = -1274022962838535130L;
-
-		public TestException(String message) {
-			super(message);
-		}
-	}
-}


Mime
View raw message