flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/3] flink git commit: [FLINK-7334] [futures] Replace Flink's futures with Java 8's CompletableFuture in RpcEndpoint, RpcGateways and RpcService
Date Thu, 03 Aug 2017 11:49:03 GMT
Repository: flink
Updated Branches:
  refs/heads/master 74e479e51 -> eddafc1ac


http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index b56bf6b..14cf35a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.rpc;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -92,20 +92,19 @@ public class TestingRpcService extends AkkaRpcService {
 	}
 
 	@Override
-	public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
+	public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
 		RpcGateway gateway = registeredConnections.get(address);
 
 		if (gateway != null) {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return FlinkCompletableFuture.completed(typedGateway);
+				return CompletableFuture.completedFuture(typedGateway);
 			} else {
-				return FlinkCompletableFuture.completedExceptionally(
-					new Exception("Gateway registered under " + address + " is not of type " + clazz));
+				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
 		} else {
-			return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
+			return FutureUtils.completedExceptionally(new Exception("No gateway registered under " + address + '.'));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index ac3f40b..37349a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -20,11 +20,9 @@ package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.util.Preconditions;
 
@@ -36,10 +34,12 @@ import java.util.BitSet;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -65,7 +65,7 @@ public class TestingSerialRpcService implements RpcService {
 		executorService = new DirectExecutorService();
 		scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
 		this.registeredConnections = new ConcurrentHashMap<>(16);
-		this.terminationFuture = new FlinkCompletableFuture<>();
+		this.terminationFuture = new CompletableFuture<>();
 
 		this.scheduledExecutorServiceAdapter = new ScheduledExecutorServiceAdapter(scheduledExecutorService);
 	}
@@ -88,13 +88,13 @@ public class TestingSerialRpcService implements RpcService {
 	}
 
 	@Override
-	public <T> Future<T> execute(Callable<T> callable) {
+	public <T> CompletableFuture<T> execute(Callable<T> callable) {
 		try {
 			T result = callable.call();
 
-			return FlinkCompletableFuture.completed(result);
+			return CompletableFuture.completedFuture(result);
 		} catch (Exception e) {
-			return FlinkCompletableFuture.completedExceptionally(e);
+			return FutureUtils.completedExceptionally(e);
 		}
 	}
 
@@ -134,7 +134,7 @@ public class TestingSerialRpcService implements RpcService {
 	}
 
 	@Override
-	public Future<Void> getTerminationFuture() {
+	public CompletableFuture<Void> getTerminationFuture() {
 		return terminationFuture;
 	}
 
@@ -178,20 +178,19 @@ public class TestingSerialRpcService implements RpcService {
 	}
 
 	@Override
-	public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
+	public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
 		RpcGateway gateway = registeredConnections.get(address);
 
 		if (gateway != null) {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return FlinkCompletableFuture.completed(typedGateway);
+				return CompletableFuture.completedFuture(typedGateway);
 			} else {
-				return FlinkCompletableFuture.completedExceptionally(
-					new Exception("Gateway registered under " + address + " is not of type " + clazz));
+				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
 		} else {
-			return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
+			return FutureUtils.completedExceptionally(new Exception("No gateway registered under " + address + '.'));
 		}
 	}
 
@@ -251,12 +250,12 @@ public class TestingSerialRpcService implements RpcService {
 
 				Class<?> returnType = method.getReturnType();
 
-				if (returnType.equals(Future.class)) {
+				if (returnType.equals(CompletableFuture.class)) {
 					try {
 						Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
-						return FlinkCompletableFuture.completed(result);
+						return CompletableFuture.completedFuture(result);
 					} catch (Throwable e) {
-						return FlinkCompletableFuture.completedExceptionally(e);
+						return FutureUtils.completedExceptionally(e);
 					}
 				} else {
 					return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
@@ -290,11 +289,11 @@ public class TestingSerialRpcService implements RpcService {
 		}
 
 		@Override
-		public <V> Future<V> callAsync(Callable<V> callable, Time callTimeout) {
+		public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
 			try {
-				return FlinkCompletableFuture.completed(callable.call());
+				return CompletableFuture.completedFuture(callable.call());
 			} catch (Throwable e) {
-				return FlinkCompletableFuture.completedExceptionally(e);
+				return FutureUtils.completedExceptionally(e);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 0b06267..793d292 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -21,9 +21,6 @@ package org.apache.flink.runtime.rpc.akka;
 import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
@@ -37,7 +34,7 @@ import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.Test;
 
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static org.junit.Assert.assertEquals;
@@ -74,7 +71,7 @@ public class AkkaRpcActorTest extends TestLogger {
 	public void testAddressResolution() throws Exception {
 		DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
 
-		Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
+		CompletableFuture<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
 
 		DummyRpcGateway rpcGateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
 
@@ -86,7 +83,7 @@ public class AkkaRpcActorTest extends TestLogger {
 	 */
 	@Test
 	public void testFailingAddressResolution() throws Exception {
-		Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
+		CompletableFuture<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
 
 		try {
 			futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
@@ -111,7 +108,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		DummyRpcGateway rpcGateway = rpcEndpoint.getSelf();
 
 		// this message should be discarded and completed with an AkkaRpcException
-		Future<Integer> result = rpcGateway.foobar();
+		CompletableFuture<Integer> result = rpcGateway.foobar();
 
 		try {
 			result.get(timeout.getSize(), timeout.getUnit());
@@ -150,14 +147,14 @@ public class AkkaRpcActorTest extends TestLogger {
 
 		rpcEndpoint.start();
 
-		Future<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
+		CompletableFuture<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
 
 		WrongRpcGateway gateway = futureGateway.get(timeout.getSize(), timeout.getUnit());
 
 		// since it is a tell operation we won't receive a RpcConnectionException, it's only logged
 		gateway.tell("foobar");
 
-		Future<Boolean> result = gateway.barfoo();
+		CompletableFuture<Boolean> result = gateway.barfoo();
 
 		try {
 			result.get(timeout.getSize(), timeout.getUnit());
@@ -178,18 +175,13 @@ public class AkkaRpcActorTest extends TestLogger {
 		final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
 		rpcEndpoint.start();
 
-		Future<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
+		CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
 
 		assertFalse(terminationFuture.isDone());
 
-		FlinkFuture.supplyAsync(new Callable<Void>() {
-			@Override
-			public Void call() throws Exception {
-				rpcEndpoint.shutDown();
-
-				return null;
-			}
-		}, actorSystem.dispatcher());
+		CompletableFuture.runAsync(
+			() -> rpcEndpoint.shutDown(),
+			actorSystem.dispatcher());
 
 		// wait until the rpc endpoint has terminated
 		terminationFuture.get();
@@ -201,7 +193,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		rpcEndpoint.start();
 
 		ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
-		Future<Integer> result = rpcGateway.doStuff();
+		CompletableFuture<Integer> result = rpcGateway.doStuff();
 
 		try {
 			result.get(timeout.getSize(), timeout.getUnit());
@@ -220,7 +212,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		rpcEndpoint.start();
 
 		ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
-		Future<Integer> result = rpcGateway.doStuff();
+		CompletableFuture<Integer> result = rpcGateway.doStuff();
 
 		try {
 			result.get(timeout.getSize(), timeout.getUnit());
@@ -244,7 +236,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
 		rpcEndpoint.shutDown();
 
-		Future<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
+		CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
 
 		try {
 			terminationFuture.get();
@@ -263,7 +255,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
 		simpleRpcEndpoint.shutDown();
 
-		Future<Void> terminationFuture = simpleRpcEndpoint.getTerminationFuture();
+		CompletableFuture<Void> terminationFuture = simpleRpcEndpoint.getTerminationFuture();
 
 		// check that we executed the postStop method in the main thread, otherwise an exception
 		// would be thrown here.
@@ -275,11 +267,11 @@ public class AkkaRpcActorTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	private interface DummyRpcGateway extends RpcGateway {
-		Future<Integer> foobar();
+		CompletableFuture<Integer> foobar();
 	}
 
 	private interface WrongRpcGateway extends RpcGateway {
-		Future<Boolean> barfoo();
+		CompletableFuture<Boolean> barfoo();
 		void tell(String message);
 	}
 
@@ -304,7 +296,7 @@ public class AkkaRpcActorTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	private interface ExceptionalGateway extends RpcGateway {
-		Future<Integer> doStuff();
+		CompletableFuture<Integer> doStuff();
 	}
 
 	private static class ExceptionalEndpoint extends RpcEndpoint<ExceptionalGateway> {
@@ -326,8 +318,8 @@ public class AkkaRpcActorTest extends TestLogger {
 		}
 
 		@RpcMethod
-		public Future<Integer> doStuff() {
-			final FlinkCompletableFuture<Integer> future = new FlinkCompletableFuture<>();
+		public CompletableFuture<Integer> doStuff() {
+			final CompletableFuture<Integer> future = new CompletableFuture<>();
 
 			// complete the future slightly in the, well, future...
 			new Thread() {

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 42f63ef..e0d1110 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -22,15 +22,14 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Test;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
@@ -91,26 +90,21 @@ public class AkkaRpcServiceTest extends TestLogger {
 	public void testExecuteRunnable() throws Exception {
 		final OneShotLatch latch = new OneShotLatch();
 
-		akkaRpcService.execute(new Runnable() {
-			@Override
-			public void run() {
-				latch.trigger();
-			}
-		});
+		akkaRpcService.execute(() -> latch.trigger());
 
 		latch.await(30L, TimeUnit.SECONDS);
 	}
 
 	/**
 	 * Tests that the {@link AkkaRpcService} can execute callables and returns their result as
-	 * a {@link Future}.
+	 * a {@link CompletableFuture}.
 	 */
 	@Test
 	public void testExecuteCallable() throws InterruptedException, ExecutionException, TimeoutException {
 		final OneShotLatch latch = new OneShotLatch();
 		final int expected = 42;
 
-		Future<Integer> result = akkaRpcService.execute(new Callable<Integer>() {
+		CompletableFuture<Integer> result = akkaRpcService.execute(new Callable<Integer>() {
 			@Override
 			public Integer call() throws Exception {
 				latch.trigger();
@@ -145,18 +139,11 @@ public class AkkaRpcServiceTest extends TestLogger {
 		final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
 		final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000));
 
-		Future<Void> terminationFuture = rpcService.getTerminationFuture();
+		CompletableFuture<Void> terminationFuture = rpcService.getTerminationFuture();
 
 		assertFalse(terminationFuture.isDone());
 
-		FlinkFuture.supplyAsync(new Callable<Void>() {
-			@Override
-			public Void call() throws Exception {
-				rpcService.stopService();
-
-				return null;
-			}
-		}, actorSystem.dispatcher());
+		CompletableFuture.runAsync(() -> rpcService.stopService(), actorSystem.dispatcher());
 
 		terminationFuture.get();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index d640a97..34cf412 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -23,7 +23,6 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
@@ -35,6 +34,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.junit.Assert.assertThat;
@@ -108,7 +108,7 @@ public class MessageSerializationTest extends TestLogger {
 
 		String address = testEndpoint.getAddress();
 
-		Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
+		CompletableFuture<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
 
 		TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
 
@@ -129,7 +129,7 @@ public class MessageSerializationTest extends TestLogger {
 
 		String address = testEndpoint.getAddress();
 
-		Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
+		CompletableFuture<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
 
 		TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
 
@@ -153,7 +153,7 @@ public class MessageSerializationTest extends TestLogger {
 
 		String address = testEndpoint.getAddress();
 
-		Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
+		CompletableFuture<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
 
 		TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 6a0bd87..53c435e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -26,8 +26,6 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -62,6 +60,7 @@ import org.junit.Test;
 import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
@@ -159,13 +158,13 @@ public class TaskExecutorITCase extends TestLogger {
 		JobMasterGateway jmGateway = mock(JobMasterGateway.class);
 
 		when(jmGateway.registerTaskManager(any(String.class), any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class)))
-			.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
+			.thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
 		when(jmGateway.getHostname()).thenReturn(jmAddress);
 		when(jmGateway.offerSlots(
 			eq(taskManagerResourceId),
 			any(Iterable.class),
 			eq(jmLeaderId),
-			any(Time.class))).thenReturn(mock(Future.class, RETURNS_MOCKS));
+			any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 
 
 		rpcService.registerGateway(rmAddress, resourceManager.getSelf());
@@ -185,7 +184,7 @@ public class TaskExecutorITCase extends TestLogger {
 			// notify the TM about the new RM leader
 			rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId);
 
-			Future<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager(
+			CompletableFuture<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager(
 				rmLeaderId,
 				jmLeaderId,
 				jmResourceId,

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index b596f75..a4f0e03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -28,10 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -65,7 +62,6 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
@@ -95,6 +91,7 @@ import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 
@@ -168,7 +165,7 @@ public class TaskExecutorTest extends TestLogger {
 				eq(taskManagerLocation),
 				eq(jmLeaderId),
 				any(Time.class)
-		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+		)).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
 		when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
 		when(jobMasterGateway.getHostname()).thenReturn("localhost");
 
@@ -230,7 +227,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(rmGateway.registerTaskExecutor(
 			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
 			.thenReturn(
-				FlinkCompletableFuture.<RegistrationResponse>completed(
+				CompletableFuture.completedFuture(
 					new TaskExecutorRegistrationSuccess(
 						new InstanceID(),
 						rmResourceId,
@@ -334,7 +331,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(rmGateway.registerTaskExecutor(
 			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
 			.thenReturn(
-				FlinkCompletableFuture.<RegistrationResponse>completed(
+				CompletableFuture.completedFuture(
 					new TaskExecutorRegistrationSuccess(
 						new InstanceID(),
 						rmResourceId,
@@ -467,7 +464,7 @@ public class TaskExecutorTest extends TestLogger {
 			ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
 			when(rmGateway.registerTaskExecutor(
 					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-				.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(
+				.thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
 					new InstanceID(), resourceManagerResourceId, 10L)));
 
 			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
@@ -539,11 +536,11 @@ public class TaskExecutorTest extends TestLogger {
 
 			when(rmGateway1.registerTaskExecutor(
 					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-					.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
+					.thenReturn(CompletableFuture.completedFuture(
 						new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L)));
 			when(rmGateway2.registerTaskExecutor(
 					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-					.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
+					.thenReturn(CompletableFuture.completedFuture(
 						new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L)));
 
 			rpc.registerGateway(address1, rmGateway1);
@@ -728,7 +725,7 @@ public class TaskExecutorTest extends TestLogger {
 
 			taskManager.submitTask(tdd, jobManagerLeaderId);
 
-			Future<Boolean> completionFuture = TestInvokable.completableFuture;
+			CompletableFuture<Boolean> completionFuture = TestInvokable.completableFuture;
 
 			completionFuture.get();
 
@@ -744,7 +741,7 @@ public class TaskExecutorTest extends TestLogger {
 	 */
 	public static class TestInvokable extends AbstractInvokable {
 
-		static final CompletableFuture<Boolean> completableFuture = new FlinkCompletableFuture<>();
+		static final CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
 
 		@Override
 		public void invoke() throws Exception {
@@ -793,7 +790,7 @@ public class TaskExecutorTest extends TestLogger {
 			any(String.class),
 			eq(resourceId),
 			any(SlotReport.class),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
+			any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
 
 		final String jobManagerAddress = "jm";
 		final UUID jobManagerLeaderId = UUID.randomUUID();
@@ -807,13 +804,13 @@ public class TaskExecutorTest extends TestLogger {
 				eq(taskManagerLocation),
 				eq(jobManagerLeaderId),
 				any(Time.class)
-		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+		)).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
 		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 		when(jobMasterGateway.offerSlots(
 			any(ResourceID.class),
 			any(Iterable.class),
 			any(UUID.class),
-			any(Time.class))).thenReturn(mock(Future.class, RETURNS_MOCKS));
+			any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
 		rpc.registerGateway(jobManagerAddress, jobMasterGateway);
@@ -906,7 +903,7 @@ public class TaskExecutorTest extends TestLogger {
 			any(String.class),
 			eq(resourceId),
 			any(SlotReport.class),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
+			any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
 
 		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
 		final int blobPort = 42;
@@ -923,12 +920,12 @@ public class TaskExecutorTest extends TestLogger {
 				eq(taskManagerLocation),
 				eq(jobManagerLeaderId),
 				any(Time.class)
-		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+		)).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
 		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 
 		when(jobMasterGateway.offerSlots(
 				any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)))
-			.thenReturn(FlinkCompletableFuture.completed((Iterable<SlotOffer>)Collections.singleton(offer1)));
+			.thenReturn(CompletableFuture.completedFuture((Iterable<SlotOffer>)Collections.singleton(offer1)));
 
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
 		rpc.registerGateway(jobManagerAddress, jobMasterGateway);
@@ -1120,7 +1117,7 @@ public class TaskExecutorTest extends TestLogger {
 			eq(resourceId),
 			any(SlotReport.class),
 			any(Time.class))).thenReturn(
-				FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
+				CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
 
 		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
 		final int blobPort = 42;
@@ -1137,7 +1134,7 @@ public class TaskExecutorTest extends TestLogger {
 			eq(taskManagerLocation),
 			eq(jobManagerLeaderId),
 			any(Time.class)
-		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+		)).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
 		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 
 
@@ -1224,7 +1221,7 @@ public class TaskExecutorTest extends TestLogger {
 				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 				Collections.<InputGateDeploymentDescriptor>emptyList());
 
-			CompletableFuture<Iterable<SlotOffer>> offerResultFuture = new FlinkCompletableFuture<>();
+			CompletableFuture<Iterable<SlotOffer>> offerResultFuture = new CompletableFuture<>();
 
 			// submit task first and then return acceptance response
 			when(

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index e790ea8..fdae251 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -35,8 +35,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -110,6 +108,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
@@ -1612,7 +1611,7 @@ public class TaskManagerTest extends TestLogger {
 
 				Await.result(result, timeout);
 
-				org.apache.flink.runtime.concurrent.Future<Boolean> cancelFuture = TestInvokableRecordCancel.gotCanceled();
+				CompletableFuture<Boolean> cancelFuture = TestInvokableRecordCancel.gotCanceled();
 
 				assertEquals(true, cancelFuture.get());
 			} finally {
@@ -2070,7 +2069,7 @@ public class TaskManagerTest extends TestLogger {
 	public static final class TestInvokableRecordCancel extends AbstractInvokable {
 
 		private static final Object lock = new Object();
-		private static CompletableFuture<Boolean> gotCanceledFuture = new FlinkCompletableFuture<>();
+		private static CompletableFuture<Boolean> gotCanceledFuture = new CompletableFuture<>();
 
 		@Override
 		public void invoke() throws Exception {
@@ -2099,11 +2098,11 @@ public class TaskManagerTest extends TestLogger {
 
 		public static void resetGotCanceledFuture() {
 			synchronized (lock) {
-				gotCanceledFuture = new FlinkCompletableFuture<>();
+				gotCanceledFuture = new CompletableFuture<>();
 			}
 		}
 
-		public static org.apache.flink.runtime.concurrent.Future<Boolean> gotCanceled() {
+		public static CompletableFuture<Boolean> gotCanceled() {
 			synchronized (lock) {
 				return gotCanceledFuture;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
index 5832b89..f3b68c4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -34,7 +33,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -92,17 +91,20 @@ public class OrderedStreamElementQueueTest extends TestLogger {
 			queue.put(entry);
 		}
 
-		Future<List<AsyncResult>> pollOperation = FlinkFuture.supplyAsync(new Callable<List<AsyncResult>>() {
-			@Override
-			public List<AsyncResult> call() throws Exception {
+		CompletableFuture<List<AsyncResult>> pollOperation = CompletableFuture.supplyAsync(
+			() -> {
 				List<AsyncResult> result = new ArrayList<>(4);
 				while (!queue.isEmpty()) {
-					result.add(queue.poll());
+					try {
+						result.add(queue.poll());
+					} catch (InterruptedException e) {
+						throw new FlinkFutureException(e);
+					}
 				}
 
 				return result;
-			}
-		}, executor);
+			},
+			executor);
 
 		Thread.sleep(10L);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
index fe9db95..d396756 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -37,7 +36,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -176,14 +175,15 @@ public class StreamElementQueueTest extends TestLogger {
 
 		Assert.assertEquals(1, queue.size());
 
-		Future<Void> putOperation = FlinkFuture.supplyAsync(new Callable<Void>() {
-			@Override
-			public Void call() throws Exception {
-				queue.put(streamRecordQueueEntry2);
-
-				return null;
-			}
-		}, executor);
+		CompletableFuture<Void> putOperation = CompletableFuture.runAsync(
+			() -> {
+				try {
+					queue.put(streamRecordQueueEntry2);
+				} catch (InterruptedException e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 
 		// give the future a chance to complete
 		Thread.sleep(10L);
@@ -215,12 +215,15 @@ public class StreamElementQueueTest extends TestLogger {
 
 		Assert.assertTrue(queue.isEmpty());
 
-		Future<AsyncResult> peekOperation = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
-			@Override
-			public AsyncResult call() throws Exception {
-				return queue.peekBlockingly();
-			}
-		}, executor);
+		CompletableFuture<AsyncResult> peekOperation = CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return queue.peekBlockingly();
+				} catch (InterruptedException e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 
 		Thread.sleep(10L);
 
@@ -236,12 +239,15 @@ public class StreamElementQueueTest extends TestLogger {
 		Assert.assertEquals(watermarkQueueEntry, queue.poll());
 		Assert.assertTrue(queue.isEmpty());
 
-		Future<AsyncResult> pollOperation = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
-			@Override
-			public AsyncResult call() throws Exception {
-				return queue.poll();
-			}
-		}, executor);
+		CompletableFuture<AsyncResult> pollOperation = CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return queue.poll();
+				} catch (InterruptedException e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 
 		Thread.sleep(10L);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
index ba6ce42..cc0bc30 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -35,7 +34,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -100,12 +99,15 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
 
 		Assert.assertTrue(8 == queue.size());
 
-		Future<AsyncResult> firstPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
-			@Override
-			public AsyncResult call() throws Exception {
-				return queue.poll();
-			}
-		}, executor);
+		CompletableFuture<AsyncResult> firstPoll = CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return queue.poll();
+				} catch (InterruptedException e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 
 		// this should not fulfill the poll, because R3 is behind W1
 		record3.collect(Collections.<Integer>emptyList());
@@ -118,12 +120,15 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
 
 		Assert.assertEquals(record2, firstPoll.get());
 
-		Future<AsyncResult> secondPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
-			@Override
-			public AsyncResult call() throws Exception {
-				return queue.poll();
-			}
-		}, executor);
+		CompletableFuture<AsyncResult> secondPoll = CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return queue.poll();
+				} catch (InterruptedException e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 
 		record6.collect(Collections.<Integer>emptyList());
 		record4.collect(Collections.<Integer>emptyList());
@@ -161,12 +166,15 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
 		// only R5 left in the queue
 		Assert.assertTrue(1 == queue.size());
 
-		Future<AsyncResult> thirdPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
-			@Override
-			public AsyncResult call() throws Exception {
-				return queue.poll();
-			}
-		}, executor);
+		CompletableFuture<AsyncResult> thirdPoll = CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return queue.poll();
+				} catch (InterruptedException e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 
 		Thread.sleep(10L);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index f021b38..4f2135d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -28,8 +28,6 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.Environment;
@@ -75,6 +73,7 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.Collections;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.FutureTask;
 
 import static org.junit.Assert.assertEquals;
@@ -160,14 +159,9 @@ public class StreamTaskTerminationTest extends TestLogger {
 			mock(PartitionProducerStateChecker.class),
 			Executors.directExecutor());
 
-		Future<Void> taskRun = FlinkCompletableFuture.supplyAsync(new Callable<Void>() {
-			@Override
-			public Void call() throws Exception {
-				task.run();
-
-				return null;
-			}
-		}, TestingUtils.defaultExecutor());
+		CompletableFuture<Void> taskRun = CompletableFuture.runAsync(
+			() -> task.run(),
+			TestingUtils.defaultExecutor());
 
 		// wait until the stream task started running
 		RUN_LATCH.await();


Mime
View raw message