flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/2] flink git commit: [FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction
Date Wed, 21 Sep 2016 16:20:47 GMT
[FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction

This closes #2530.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31a091b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31a091b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31a091b9

Branch: refs/heads/flip-6
Commit: 31a091b930178bf2aec2881ee273fe0e5e17464d
Parents: 04fbdb3
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Sep 21 17:26:21 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Sep 21 18:20:17 2016 +0200

----------------------------------------------------------------------
 .../concurrent/impl/FlinkCompletableFuture.java | 22 +++++-
 .../runtime/concurrent/impl/FlinkFuture.java    |  4 ++
 .../flink/runtime/jobmaster/JobMaster.java      |  2 +-
 .../runtime/jobmaster/JobMasterGateway.java     |  2 +-
 .../registration/RetryingRegistration.java      | 65 ++++++++---------
 .../resourcemanager/ResourceManager.java        | 13 ++--
 .../resourcemanager/ResourceManagerGateway.java |  9 ++-
 .../slotmanager/SlotManager.java                |  9 ++-
 .../flink/runtime/rpc/MainThreadExecutable.java | 64 +++++++++++++++++
 .../flink/runtime/rpc/MainThreadExecutor.java   | 64 -----------------
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 60 ++++++----------
 .../apache/flink/runtime/rpc/RpcService.java    | 17 +++--
 .../runtime/rpc/akka/AkkaInvocationHandler.java | 42 +++++------
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 21 +++++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 28 ++++----
 .../runtime/taskexecutor/TaskExecutor.java      | 12 ++--
 .../taskexecutor/TaskExecutorGateway.java       |  6 +-
 ...TaskExecutorToResourceManagerConnection.java | 34 +++++----
 .../registration/RetryingRegistrationTest.java  | 75 ++++++++++----------
 .../registration/TestRegistrationGateway.java   |  6 +-
 .../resourcemanager/ResourceManagerHATest.java  |  4 +-
 .../slotmanager/SlotProtocolTest.java           | 14 ++--
 .../flink/runtime/rpc/AsyncCallsTest.java       | 13 ++--
 .../flink/runtime/rpc/RpcCompletenessTest.java  |  9 +--
 .../flink/runtime/rpc/TestingGatewayBase.java   | 18 ++---
 .../flink/runtime/rpc/TestingRpcService.java    | 20 +++---
 .../runtime/rpc/TestingSerialRpcService.java    | 54 +++++++-------
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 19 ++---
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  4 +-
 .../rpc/akka/MainThreadValidationTest.java      |  7 +-
 .../rpc/akka/MessageSerializationTest.java      | 19 +++--
 .../runtime/taskexecutor/TaskExecutorTest.java  |  9 ++-
 32 files changed, 376 insertions(+), 369 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
index 5566880..e648a71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.concurrent.impl;
 
+import akka.dispatch.Futures;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.Promise;
+import scala.concurrent.Promise$;
 
 import java.util.concurrent.CancellationException;
 
@@ -34,7 +36,17 @@ public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements Complet
 	private final Promise<T> promise;
 
 	public FlinkCompletableFuture() {
-		promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+		promise = Futures.promise();
+		scalaFuture = promise.future();
+	}
+
+	private FlinkCompletableFuture(T value) {
+		promise = Promise$.MODULE$.successful(value);
+		scalaFuture = promise.future();
+	}
+
+	private FlinkCompletableFuture(Throwable t) {
+		promise = Promise$.MODULE$.failed(t);
 		scalaFuture = promise.future();
 	}
 
@@ -68,4 +80,12 @@ public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements Complet
 	public boolean cancel(boolean mayInterruptIfRunning) {
 		return completeExceptionally(new CancellationException("Future has been canceled."));
 	}
+
+	public static <T> FlinkCompletableFuture<T> completed(T value) {
+		return new FlinkCompletableFuture<>(value);
+	}
+
+	public static <T> FlinkCompletableFuture<T> completedExceptionally(Throwable t) {
+		return new FlinkCompletableFuture<>(t);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index 361cd3d..b28a1bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -59,6 +59,10 @@ public class FlinkFuture<T> implements Future<T> {
 		this.scalaFuture = Preconditions.checkNotNull(scalaFuture);
 	}
 
+	public scala.concurrent.Future<T> getScalaFuture() {
+		return scalaFuture;
+	}
+
 	//-----------------------------------------------------------------------------------
 	// Future's methods
 	//-----------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 0a6a7ef..1537396 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -36,7 +36,7 @@ import java.util.UUID;
 
 /**
  * JobMaster implementation. The job master is responsible for the execution of a single
- * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
+ * {@link JobGraph}.
  * <p>
  * It offers the following methods as part of its rpc interface to interact with the JobMaster
  * remotely:

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index a53e383..86bf17c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import scala.concurrent.Future;
 
 /**
  * {@link JobMaster} rpc gateway interface

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index ea49e42..32dd978 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -18,19 +18,17 @@
 
 package org.apache.flink.runtime.registration;
 
-import akka.dispatch.OnFailure;
-import akka.dispatch.OnSuccess;
-
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import org.slf4j.Logger;
 
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.impl.Promise.DefaultPromise;
-
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -86,7 +84,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 
 	private final UUID leaderId;
 
-	private final Promise<Tuple2<Gateway, Success>> completionPromise;
+	private final CompletableFuture<Tuple2<Gateway, Success>> completionFuture;
 
 	private final long initialRegistrationTimeout;
 
@@ -140,7 +138,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 		this.delayOnError = delayOnError;
 		this.delayOnRefusedRegistration = delayOnRefusedRegistration;
 
-		this.completionPromise = new DefaultPromise<>();
+		this.completionFuture = new FlinkCompletableFuture<>();
 	}
 
 	// ------------------------------------------------------------------------
@@ -148,7 +146,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 	// ------------------------------------------------------------------------
 
 	public Future<Tuple2<Gateway, Success>> getFuture() {
-		return completionPromise.future();
+		return completionFuture;
 	}
 
 	/**
@@ -184,28 +182,30 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 			Future<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType);
 	
 			// upon success, start the registration attempts
-			resourceManagerFuture.onSuccess(new OnSuccess<Gateway>() {
+			resourceManagerFuture.thenAcceptAsync(new AcceptFunction<Gateway>() {
 				@Override
-				public void onSuccess(Gateway result) {
+				public void accept(Gateway result) {
 					log.info("Resolved {} address, beginning registration", targetName);
 					register(result, 1, initialRegistrationTimeout);
 				}
-			}, rpcService.getExecutionContext());
-	
+			}, rpcService.getExecutor());
+
 			// upon failure, retry, unless this is cancelled
-			resourceManagerFuture.onFailure(new OnFailure() {
+			resourceManagerFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
 				@Override
-				public void onFailure(Throwable failure) {
+				public Void apply(Throwable failure) {
 					if (!isCanceled()) {
 						log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress, failure);
 						startRegistration();
 					}
+
+					return null;
 				}
-			}, rpcService.getExecutionContext());
+			}, rpcService.getExecutor());
 		}
 		catch (Throwable t) {
 			cancel();
-			completionPromise.tryFailure(t);
+			completionFuture.completeExceptionally(t);
 		}
 	}
 
@@ -225,15 +225,14 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 			Future<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis);
 	
 			// if the registration was successful, let the TaskExecutor know
-			registrationFuture.onSuccess(new OnSuccess<RegistrationResponse>() {
-				
+			registrationFuture.thenAcceptAsync(new AcceptFunction<RegistrationResponse>() {
 				@Override
-				public void onSuccess(RegistrationResponse result) throws Throwable {
+				public void accept(RegistrationResponse result) {
 					if (!isCanceled()) {
 						if (result instanceof RegistrationResponse.Success) {
 							// registration successful!
 							Success success = (Success) result;
-							completionPromise.success(new Tuple2<>(gateway, success));
+							completionFuture.complete(Tuple2.of(gateway, success));
 						}
 						else {
 							// registration refused or unknown
@@ -241,7 +240,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 								RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result;
 								log.info("Registration at {} was declined: {}", targetName, decline.getReason());
 							} else {
-								log.error("Received unknown response to registration attempt: " + result);
+								log.error("Received unknown response to registration attempt: {}", result);
 							}
 
 							log.info("Pausing and re-attempting registration in {} ms", delayOnRefusedRegistration);
@@ -249,12 +248,12 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 						}
 					}
 				}
-			}, rpcService.getExecutionContext());
+			}, rpcService.getExecutor());
 	
 			// upon failure, retry
-			registrationFuture.onFailure(new OnFailure() {
+			registrationFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
 				@Override
-				public void onFailure(Throwable failure) {
+				public Void apply(Throwable failure) {
 					if (!isCanceled()) {
 						if (failure instanceof TimeoutException) {
 							// we simply have not received a response in time. maybe the timeout was
@@ -262,26 +261,28 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 							// currently down.
 							if (log.isDebugEnabled()) {
 								log.debug("Registration at {} ({}) attempt {} timed out after {} ms",
-										targetName, targetAddress, attempt, timeoutMillis);
+									targetName, targetAddress, attempt, timeoutMillis);
 							}
-	
+
 							long newTimeoutMillis = Math.min(2 * timeoutMillis, maxRegistrationTimeout);
 							register(gateway, attempt + 1, newTimeoutMillis);
 						}
 						else {
 							// a serious failure occurred. we still should not give up, but keep trying
-							log.error("Registration at " + targetName + " failed due to an error", failure);
+							log.error("Registration at {} failed due to an error", targetName, failure);
 							log.info("Pausing and re-attempting registration in {} ms", delayOnError);
-	
+
 							registerLater(gateway, 1, initialRegistrationTimeout, delayOnError);
 						}
 					}
+
+					return null;
 				}
-			}, rpcService.getExecutionContext());
+			}, rpcService.getExecutor());
 		}
 		catch (Throwable t) {
 			cancel();
-			completionPromise.tryFailure(t);
+			completionFuture.completeExceptionally(t);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index d9a7134..5370710 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -38,7 +37,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -126,10 +124,9 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 			getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
 		final JobID jobID = jobMasterRegistration.getJobID();
 
-		return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
+		return jobMasterFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
 			@Override
-			public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
-
+			public RegistrationResponse apply(JobMasterGateway jobMasterGateway) {
 				final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
 				if (existingGateway != null) {
 					LOG.info("Replacing existing gateway {} for JobID {} with  {}.",
@@ -137,7 +134,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 				}
 				return new RegistrationResponse(true);
 			}
-		}, getMainThreadExecutionContext());
+		}, getMainThreadExecutor());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index c8e3488..5c8786c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.UUID;
 
 /**
@@ -42,7 +41,7 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 */
 	Future<RegistrationResponse> registerJobMaster(
 		JobMasterRegistration jobMasterRegistration,
-		@RpcTimeout FiniteDuration timeout);
+		@RpcTimeout Time timeout);
 
 	/**
 	 * Register a {@link JobMaster} at the resource manager.
@@ -73,5 +72,5 @@ public interface ResourceManagerGateway extends RpcGateway {
 			UUID resourceManagerLeaderId,
 			String taskExecutorAddress,
 			ResourceID resourceID,
-			@RpcTimeout FiniteDuration timeout);
+			@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 96fde7d..97176b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
@@ -33,14 +35,11 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -79,7 +78,7 @@ public abstract class SlotManager implements LeaderRetrievalListener {
 	/** All allocations, we can lookup allocations either by SlotID or AllocationID */
 	private final AllocationMap allocationMap;
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
 	/** The current leader id set by the ResourceManager */
 	private UUID leaderID;
@@ -90,7 +89,7 @@ public abstract class SlotManager implements LeaderRetrievalListener {
 		this.freeSlots = new HashMap<>(16);
 		this.allocationMap = new AllocationMap();
 		this.taskManagerGateways = new HashMap<>();
-		this.timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+		this.timeout = Time.seconds(10);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
new file mode 100644
index 0000000..ec1c984
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Future;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Interface to execute {@link Runnable} and {@link Callable} in the main thread of the underlying
+ * RPC endpoint.
+ *
+ * <p>This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint}
+ * implementation which allows to dispatch local procedures to the main thread of the underlying
+ * RPC endpoint.
+ */
+public interface MainThreadExecutable {
+
+	/**
+	 * Execute the runnable in the main thread of the underlying RPC endpoint.
+	 *
+	 * @param runnable Runnable to be executed
+	 */
+	void runAsync(Runnable runnable);
+
+	/**
+	 * Execute the callable in the main thread of the underlying RPC endpoint and return a future for
+	 * the callable result. If the future is not completed within the given timeout, the returned
+	 * future will throw a {@link TimeoutException}.
+	 *
+	 * @param callable Callable to be executed
+	 * @param callTimeout Timeout for the future to complete
+	 * @param <V> Return value of the callable
+	 * @return Future of the callable result
+	 */
+	<V> Future<V> callAsync(Callable<V> callable, Time callTimeout);
+
+	/**
+	 * Execute the runnable in the main thread of the underlying RPC endpoint, with
+	 * a delay of the given number of milliseconds.
+	 *
+	 * @param runnable Runnable to be executed
+	 * @param delay    The delay, in milliseconds, after which the runnable will be executed
+	 */
+	void scheduleRunAsync(Runnable runnable, long delay);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
deleted file mode 100644
index 5e4fead..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc;
-
-import akka.util.Timeout;
-import scala.concurrent.Future;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Interface to execute {@link Runnable} and {@link Callable} in the main thread of the underlying
- * RPC endpoint.
- *
- * <p>This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint}
- * implementation which allows to dispatch local procedures to the main thread of the underlying
- * RPC endpoint.
- */
-public interface MainThreadExecutor {
-
-	/**
-	 * Execute the runnable in the main thread of the underlying RPC endpoint.
-	 *
-	 * @param runnable Runnable to be executed
-	 */
-	void runAsync(Runnable runnable);
-
-	/**
-	 * Execute the callable in the main thread of the underlying RPC endpoint and return a future for
-	 * the callable result. If the future is not completed within the given timeout, the returned
-	 * future will throw a {@link TimeoutException}.
-	 *
-	 * @param callable Callable to be executed
-	 * @param callTimeout Timeout for the future to complete
-	 * @param <V> Return value of the callable
-	 * @return Future of the callable result
-	 */
-	<V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout);
-
-	/**
-	 * Execute the runnable in the main thread of the underlying RPC endpoint, with
-	 * a delay of the given number of milliseconds.
-	 *
-	 * @param runnable Runnable to be executed
-	 * @param delay    The delay, in milliseconds, after which the runnable will be executed
-	 */
-	void scheduleRunAsync(Runnable runnable, long delay);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index e9e2b2c..4e5e49a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -18,16 +18,15 @@
 
 package org.apache.flink.runtime.rpc;
 
-import akka.util.Timeout;
-
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ReflectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -49,8 +48,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * thread, we don't have to reason about concurrent accesses, in the same way in the Actor Model
  * of Erlang or Akka.
  *
- * <p>The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)}
-  * and the {@link #getMainThreadExecutionContext()} to execute code in the RPC endoint's main thread.
+ * <p>The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Time)}
+  * and the {@link #getMainThreadExecutor()} to execute code in the RPC endoint's main thread.
  *
  * @param <C> The RPC gateway counterpart for the implementing RPC endpoint
  */
@@ -69,9 +68,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	/** Self gateway which can be used to schedule asynchronous calls on yourself */
 	private final C self;
 
-	/** The main thread execution context to be used to execute future callbacks in the main thread
+	/** The main thread executor to be used to execute future callbacks in the main thread
 	 * of the executing rpc server. */
-	private final ExecutionContext mainThreadExecutionContext;
+	private final Executor mainThreadExecutor;
 
 	/** A reference to the endpoint's main thread, if the current method is called by the main thread */
 	final AtomicReference<Thread> currentMainThread = new AtomicReference<>(null); 
@@ -89,7 +88,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 		this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass());
 		this.self = rpcService.startServer(this);
 		
-		this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
+		this.mainThreadExecutor = new MainThreadExecutor((MainThreadExecutable) self);
 	}
 
 	/**
@@ -120,7 +119,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * Shuts down the underlying RPC endpoint via the RPC service.
 	 * After this method was called, the RPC endpoint will no longer be reachable, neither remotely,
 	 * not via its {@link #getSelf() self gateway}. It will also not accepts executions in main thread
-	 * any more (via {@link #callAsync(Callable, Timeout)} and {@link #runAsync(Runnable)}).
+	 * any more (via {@link #callAsync(Callable, Time)} and {@link #runAsync(Runnable)}).
 	 * 
 	 * <p>This method can be overridden to add RPC endpoint specific shut down code.
 	 * The overridden method should always call the parent shut down method.
@@ -161,8 +160,8 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 *
 	 * @return Main thread execution context
 	 */
-	protected ExecutionContext getMainThreadExecutionContext() {
-		return mainThreadExecutionContext;
+	protected Executor getMainThreadExecutor() {
+		return mainThreadExecutor;
 	}
 
 	/**
@@ -185,7 +184,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint
 	 */
 	protected void runAsync(Runnable runnable) {
-		((MainThreadExecutor) self).runAsync(runnable);
+		((MainThreadExecutable) self).runAsync(runnable);
 	}
 
 	/**
@@ -196,7 +195,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * @param delay    The delay after which the runnable will be executed
 	 */
 	protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
-		((MainThreadExecutor) self).scheduleRunAsync(runnable, unit.toMillis(delay));
+		((MainThreadExecutable) self).scheduleRunAsync(runnable, unit.toMillis(delay));
 	}
 
 	/**
@@ -209,8 +208,8 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * @param <V> Return type of the callable
 	 * @return Future for the result of the callable.
 	 */
-	protected <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
-		return ((MainThreadExecutor) self).callAsync(callable, timeout);
+	protected <V> Future<V> callAsync(Callable<V> callable, Time timeout) {
+		return ((MainThreadExecutable) self).callAsync(callable, timeout);
 	}
 
 	// ------------------------------------------------------------------------
@@ -241,36 +240,19 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	// ------------------------------------------------------------------------
 	
 	/**
-	 * Execution context which executes runnables in the main thread context. A reported failure
-	 * will cause the underlying rpc server to shut down.
+	 * Executor which executes runnables in the main thread context.
 	 */
-	private class MainThreadExecutionContext implements ExecutionContext {
+	private class MainThreadExecutor implements Executor {
 
-		private final MainThreadExecutor gateway;
+		private final MainThreadExecutable gateway;
 
-		MainThreadExecutionContext(MainThreadExecutor gateway) {
-			this.gateway = gateway;
+		MainThreadExecutor(MainThreadExecutable gateway) {
+			this.gateway = Preconditions.checkNotNull(gateway);
 		}
 
 		@Override
 		public void execute(Runnable runnable) {
 			gateway.runAsync(runnable);
 		}
-
-		@Override
-		public void reportFailure(final Throwable t) {
-			gateway.runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.error("Encountered failure in the main thread execution context.", t);
-					shutDown();
-				}
-			});
-		}
-
-		@Override
-		public ExecutionContext prepare() {
-			return this;
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 78c1cec..a367ff2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -68,23 +68,22 @@ public interface RpcService {
 	void stopService();
 
 	/**
-	 * Gets the execution context, provided by this RPC service. This execution
-	 * context can be used for example for the {@code onComplete(...)} or {@code onSuccess(...)}
-	 * methods of Futures.
+	 * Gets the executor, provided by this RPC service. This executor can be used for example for
+	 * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures.
 	 * 
-	 * <p><b>IMPORTANT:</b> This execution context does not isolate the method invocations against
+	 * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against
 	 * any concurrent invocations and is therefore not suitable to run completion methods of futures
 	 * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
-	 * {@link RpcEndpoint#getMainThreadExecutionContext() MainThreadExecutionContext} of that
+	 * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
 	 * {@code RpcEndpoint}.
 	 * 
 	 * @return The execution context provided by the RPC service
 	 */
-	ExecutionContext getExecutionContext();
+	Executor getExecutor();
 
 	/**
 	 * Execute the runnable in the execution context of this RPC Service, as returned by
-	 * {@link #getExecutionContext()}, after a scheduled delay.
+	 * {@link #getExecutor()}, after a scheduled delay.
 	 *
 	 * @param runnable Runnable to be executed
 	 * @param delay    The delay after which the runnable will be executed

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index bfa04f6..8f4deff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -20,9 +20,11 @@ package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
-import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.StartStoppable;
@@ -34,9 +36,6 @@ import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 import org.apache.flink.util.Preconditions;
 import org.apache.log4j.Logger;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 import java.lang.annotation.Annotation;
@@ -53,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
  * executed.
  */
-class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor, StartStoppable {
+class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable {
 	private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class);
 
 	private final String address;
@@ -64,11 +63,11 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 	private final boolean isLocal;
 
 	// default timeout for asks
-	private final Timeout timeout;
+	private final Time timeout;
 
 	private final long maximumFramesize;
 
-	AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Timeout timeout, long maximumFramesize) {
+	AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Time timeout, long maximumFramesize) {
 		this.address = Preconditions.checkNotNull(address);
 		this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
 		this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
@@ -82,7 +81,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 
 		Object result;
 
-		if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) ||
+		if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutable.class) ||
 			declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
 			declaringClass.equals(RpcGateway.class)) {
 			result = method.invoke(this, args);
@@ -90,7 +89,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 			String methodName = method.getName();
 			Class<?>[] parameterTypes = method.getParameterTypes();
 			Annotation[][] parameterAnnotations = method.getParameterAnnotations();
-			Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
+			Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
 
 			Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments(
 				parameterTypes,
@@ -130,13 +129,14 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 				result = null;
 			} else if (returnType.equals(Future.class)) {
 				// execute an asynchronous call
-				result = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout);
+				result = new FlinkFuture<>(Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds()));
 			} else {
 				// execute a synchronous call
-				Future<?> futureResult = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout);
-				FiniteDuration duration = timeout.duration();
+				scala.concurrent.Future<?> scalaFuture = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds());
 
-				result = Await.result(futureResult, duration);
+				Future<?> futureResult = new FlinkFuture<>(scalaFuture);
+
+				return futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
 			}
 		}
 
@@ -167,12 +167,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 	}
 
 	@Override
-	public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) {
+	public <V> Future<V> callAsync(Callable<V> callable, Time callTimeout) {
 		if(isLocal) {
 			@SuppressWarnings("unchecked")
-			Future<V> result = (Future<V>) Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout);
+			scala.concurrent.Future<V> result = (scala.concurrent.Future<V>) Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout.toMilliseconds());
 
-			return result;
+			return new FlinkFuture<>(result);
 		} else {
 			throw new RuntimeException("Trying to send a Callable to a remote actor at " +
 				rpcEndpoint.path() + ". This is not supported.");
@@ -204,17 +204,17 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 	 *                       has been found
 	 * @return Timeout extracted from the array of arguments or the default timeout
 	 */
-	private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Timeout defaultTimeout) {
+	private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Time defaultTimeout) {
 		if (args != null) {
 			Preconditions.checkArgument(parameterAnnotations.length == args.length);
 
 			for (int i = 0; i < parameterAnnotations.length; i++) {
 				if (isRpcTimeout(parameterAnnotations[i])) {
-					if (args[i] instanceof FiniteDuration) {
-						return new Timeout((FiniteDuration) args[i]);
+					if (args[i] instanceof Time) {
+						return (Time) args[i];
 					} else {
 						throw new RuntimeException("The rpc timeout parameter must be of type " +
-							FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() +
+							Time.class.getName() + ". The type " + args[i].getClass().getName() +
 							" is not supported.");
 					}
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 2373be9..59daa46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -21,8 +21,11 @@ package org.apache.flink.runtime.rpc.akka;
 import akka.actor.ActorRef;
 import akka.actor.Status;
 import akka.actor.UntypedActorWithStash;
+import akka.dispatch.Futures;
 import akka.japi.Procedure;
 import akka.pattern.Patterns;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -35,7 +38,6 @@ import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -146,8 +148,23 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 					Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
 
 					if (result instanceof Future) {
+						final Future<?> future = (Future<?>) result;
+
 						// pipe result to sender
-						Patterns.pipe((Future<?>) result, getContext().dispatcher()).to(getSender());
+						if (future instanceof FlinkFuture) {
+							// FlinkFutures are currently backed by Scala's futures
+							FlinkFuture<?> flinkFuture = (FlinkFuture<?>) future;
+
+							Patterns.pipe(flinkFuture.getScalaFuture(), getContext().dispatcher()).to(getSender());
+						} else {
+							// We have to unpack the Flink future and pack it into a Scala future
+							Patterns.pipe(Futures.future(new Callable<Object>() {
+								@Override
+								public Object call() throws Exception {
+									return future.get();
+								}
+							}, getContext().dispatcher()), getContext().dispatcher());
+						}
 					} else {
 						// tell the sender the result of the computation
 						getSender().tell(new Status.Success(result), getSelf());

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 060a1ef..36f1115 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -26,11 +26,13 @@ import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.dispatch.Mapper;
-import akka.pattern.AskableActorSelection;
-import akka.util.Timeout;
 
+import akka.pattern.Patterns;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -39,8 +41,6 @@ import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import javax.annotation.concurrent.ThreadSafe;
@@ -48,6 +48,7 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -68,13 +69,13 @@ public class AkkaRpcService implements RpcService {
 	private final Object lock = new Object();
 
 	private final ActorSystem actorSystem;
-	private final Timeout timeout;
+	private final Time timeout;
 	private final Set<ActorRef> actors = new HashSet<>(4);
 	private final long maximumFramesize;
 
 	private volatile boolean stopped;
 
-	public AkkaRpcService(final ActorSystem actorSystem, final Timeout timeout) {
+	public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
 		this.actorSystem = checkNotNull(actorSystem, "actor system");
 		this.timeout = checkNotNull(timeout, "timeout");
 
@@ -95,10 +96,9 @@ public class AkkaRpcService implements RpcService {
 				address, clazz.getName());
 
 		final ActorSelection actorSel = actorSystem.actorSelection(address);
-		final AskableActorSelection asker = new AskableActorSelection(actorSel);
 
-		final Future<Object> identify = asker.ask(new Identify(42), timeout);
-		return identify.map(new Mapper<Object, C>(){
+		final scala.concurrent.Future<Object> identify = Patterns.ask(actorSel, new Identify(42), timeout.toMilliseconds());
+		final scala.concurrent.Future<C> resultFuture = identify.map(new Mapper<Object, C>(){
 			@Override
 			public C checkedApply(Object obj) throws Exception {
 
@@ -128,6 +128,8 @@ public class AkkaRpcService implements RpcService {
 				}
 			}
 		}, actorSystem.dispatcher());
+
+		return new FlinkFuture<>(resultFuture);
 	}
 
 	@Override
@@ -159,7 +161,7 @@ public class AkkaRpcService implements RpcService {
 			classLoader,
 			new Class<?>[]{
 				rpcEndpoint.getSelfGatewayType(),
-				MainThreadExecutor.class,
+				MainThreadExecutable.class,
 				StartStoppable.class,
 				AkkaGateway.class},
 			akkaInvocationHandler);
@@ -209,7 +211,7 @@ public class AkkaRpcService implements RpcService {
 	}
 
 	@Override
-	public ExecutionContext getExecutionContext() {
+	public Executor getExecutor() {
 		return actorSystem.dispatcher();
 	}
 
@@ -219,6 +221,6 @@ public class AkkaRpcService implements RpcService {
 		checkNotNull(unit, "unit");
 		checkArgument(delay >= 0, "delay must be zero or larger");
 
-		actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, getExecutionContext());
+		actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index fadae5f..d84a6a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import akka.actor.ActorSystem;
-import akka.util.Timeout;
 import com.typesafe.config.Config;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
+import org.jboss.netty.channel.ChannelException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,6 +79,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.UUID;
@@ -198,7 +200,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 					this,
 					newLeaderAddress,
 					newLeaderId,
-					getMainThreadExecutionContext());
+					getMainThreadExecutor());
 			resourceManagerConnection.start();
 		}
 	}
@@ -302,9 +304,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			LOG.debug("Using akka configuration\n " + akkaConfig);
 			taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig);
 		} catch (Throwable t) {
-			if (t instanceof org.jboss.netty.channel.ChannelException) {
+			if (t instanceof ChannelException) {
 				Throwable cause = t.getCause();
-				if (cause != null && t.getCause() instanceof java.net.BindException) {
+				if (cause != null && t.getCause() instanceof BindException) {
 					String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
 					throw new IOException("Unable to bind TaskManager actor system to address " +
 						address + " - " + cause.getMessage(), t);
@@ -314,7 +316,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 
 		// start akka rpc service based on actor system
-		final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS);
+		final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
 		final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout);
 
 		// start high availability service to implement getResourceManagerLeaderRetriever method only

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 65323a8..0962802 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.UUID;
 
@@ -48,5 +48,5 @@ public interface TaskExecutorGateway extends RpcGateway {
 	Future<SlotRequestReply> requestSlot(
 		AllocationID allocationID,
 		UUID resourceManagerLeaderID,
-		@RpcTimeout FiniteDuration timeout);
+		@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 28062b6..647359d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import akka.dispatch.OnFailure;
-import akka.dispatch.OnSuccess;
-
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -31,12 +32,8 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 
 import org.slf4j.Logger;
 
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -57,7 +54,7 @@ public class TaskExecutorToResourceManagerConnection {
 	private final String resourceManagerAddress;
 
 	/** Execution context to be used to execute the on complete action of the ResourceManagerRegistration */
-	private final ExecutionContext executionContext;
+	private final Executor executor;
 
 	private TaskExecutorToResourceManagerConnection.ResourceManagerRegistration pendingRegistration;
 
@@ -74,13 +71,13 @@ public class TaskExecutorToResourceManagerConnection {
 		TaskExecutor taskExecutor,
 		String resourceManagerAddress,
 		UUID resourceManagerLeaderId,
-		ExecutionContext executionContext) {
+		Executor executor) {
 
 		this.log = checkNotNull(log);
 		this.taskExecutor = checkNotNull(taskExecutor);
 		this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
 		this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
-		this.executionContext = checkNotNull(executionContext);
+		this.executor = checkNotNull(executor);
 	}
 
 	// ------------------------------------------------------------------------
@@ -100,21 +97,22 @@ public class TaskExecutorToResourceManagerConnection {
 
 		Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture();
 
-		future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() {
+		future.thenAcceptAsync(new AcceptFunction<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() {
 			@Override
-			public void onSuccess(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) {
+			public void accept(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) {
 				registrationId = result.f1.getRegistrationId();
 				registeredResourceManager = result.f0;
 			}
-		}, executionContext);
+		}, executor);
 		
 		// this future should only ever fail if there is a bug, not if the registration is declined
-		future.onFailure(new OnFailure() {
+		future.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
 			@Override
-			public void onFailure(Throwable failure) {
+			public Void apply(Throwable failure) {
 				taskExecutor.onFatalErrorAsync(failure);
+				return null;
 			}
-		}, executionContext);
+		}, executor);
 	}
 
 	public void close() {
@@ -197,7 +195,7 @@ public class TaskExecutorToResourceManagerConnection {
 		protected Future<RegistrationResponse> invokeRegistration(
 				ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
 
-			FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS);
+			Time timeout = Time.milliseconds(timeoutMillis);
 			return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, timeout);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 80fa19c..e56a9ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.registration;
 
-import akka.dispatch.Futures;
-
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.util.TestLogger;
@@ -29,18 +29,13 @@ import org.junit.Test;
 
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
@@ -71,8 +66,8 @@ public class RetryingRegistrationTest extends TestLogger {
 			// multiple accesses return the same future
 			assertEquals(future, registration.getFuture());
 
-			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = 
-					Await.result(future, new FiniteDuration(10, SECONDS));
+			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+					future.get(10L, TimeUnit.SECONDS);
 
 			// validate correct invocation and result
 			assertEquals(testId, success.f1.getCorrelationId());
@@ -83,7 +78,7 @@ public class RetryingRegistrationTest extends TestLogger {
 			rpc.stopService();
 		}
 	}
-	
+
 	@Test
 	public void testPropagateFailures() throws Exception {
 		final String testExceptionMessage = "testExceptionMessage";
@@ -96,9 +91,15 @@ public class RetryingRegistrationTest extends TestLogger {
 		registration.startRegistration();
 
 		Future<?> future = registration.getFuture();
-		assertTrue(future.failed().isCompleted());
+		assertTrue(future.isDone());
 
-		assertEquals(testExceptionMessage, future.failed().value().get().get().getMessage());
+		try {
+			future.get();
+
+			fail("We expected an ExecutionException.");
+		} catch (ExecutionException e) {
+			assertEquals(testExceptionMessage, e.getCause().getMessage());
+		}
 	}
 
 	@Test
@@ -113,16 +114,16 @@ public class RetryingRegistrationTest extends TestLogger {
 			// RPC service that fails upon the first connection, but succeeds on the second
 			RpcService rpc = mock(RpcService.class);
 			when(rpc.connect(anyString(), any(Class.class))).thenReturn(
-					Futures.failed(new Exception("test connect failure")),  // first connection attempt fails
-					Futures.successful(testGateway)                         // second connection attempt succeeds
+					FlinkCompletableFuture.completedExceptionally(new Exception("test connect failure")),  // first connection attempt fails
+					FlinkCompletableFuture.completed(testGateway)                         // second connection attempt succeeds
 			);
-			when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor));
+			when(rpc.getExecutor()).thenReturn(executor);
 
 			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId);
 			registration.startRegistration();
 
 			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
-					Await.result(registration.getFuture(), new FiniteDuration(10, SECONDS));
+				registration.getFuture().get(10L, TimeUnit.SECONDS);
 
 			// validate correct invocation and result
 			assertEquals(testId, success.f1.getCorrelationId());
@@ -151,23 +152,23 @@ public class RetryingRegistrationTest extends TestLogger {
 
 		try {
 			rpc.registerGateway(testEndpointAddress, testGateway);
-	
+
 			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-	
+
 			long started = System.nanoTime();
 			registration.startRegistration();
-	
+
 			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
 			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
-					Await.result(future, new FiniteDuration(10, SECONDS));
-	
+					future.get(10L, TimeUnit.SECONDS);
+
 			long finished = System.nanoTime();
 			long elapsedMillis = (finished - started) / 1000000;
-	
+
 			// validate correct invocation and result
 			assertEquals(testId, success.f1.getCorrelationId());
 			assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
-	
+
 			// validate that some retry-delay / back-off behavior happened
 			assertTrue("retries did not properly back off", elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT);
 		}
@@ -199,10 +200,10 @@ public class RetryingRegistrationTest extends TestLogger {
 
 			long started = System.nanoTime();
 			registration.startRegistration();
-	
+
 			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
 			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
-					Await.result(future, new FiniteDuration(10, SECONDS));
+					future.get(10L, TimeUnit.SECONDS);
 
 			long finished = System.nanoTime();
 			long elapsedMillis = (finished - started) / 1000000;
@@ -212,7 +213,7 @@ public class RetryingRegistrationTest extends TestLogger {
 			assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
 
 			// validate that some retry-delay / back-off behavior happened
-			assertTrue("retries did not properly back off", elapsedMillis >= 
+			assertTrue("retries did not properly back off", elapsedMillis >=
 					2 * TestRetryingRegistration.INITIAL_TIMEOUT + TestRetryingRegistration.DELAY_ON_DECLINE);
 		}
 		finally {
@@ -220,7 +221,7 @@ public class RetryingRegistrationTest extends TestLogger {
 			rpc.stopService();
 		}
 	}
-	
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testRetryOnError() throws Exception {
@@ -235,9 +236,9 @@ public class RetryingRegistrationTest extends TestLogger {
 			TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
 
 			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(
-					Futures.<RegistrationResponse>failed(new Exception("test exception")),
-					Futures.<RegistrationResponse>successful(new TestRegistrationSuccess(testId)));
-			
+					FlinkCompletableFuture.<RegistrationResponse>completedExceptionally(new Exception("test exception")),
+					FlinkCompletableFuture.<RegistrationResponse>completed(new TestRegistrationSuccess(testId)));
+
 			rpc.registerGateway(testEndpointAddress, testGateway);
 
 			TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
@@ -247,11 +248,11 @@ public class RetryingRegistrationTest extends TestLogger {
 
 			Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
 			Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
-					Await.result(future, new FiniteDuration(10, SECONDS));
+					future.get(10, TimeUnit.SECONDS);
 
 			long finished = System.nanoTime();
 			long elapsedMillis = (finished - started) / 1000000;
-			
+
 			assertEquals(testId, success.f1.getCorrelationId());
 
 			// validate that some retry-delay / back-off behavior happened
@@ -271,10 +272,10 @@ public class RetryingRegistrationTest extends TestLogger {
 		TestingRpcService rpc = new TestingRpcService();
 
 		try {
-			Promise<RegistrationResponse> result = Futures.promise();
+			FlinkCompletableFuture<RegistrationResponse> result = new FlinkCompletableFuture<>();
 
 			TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
-			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result.future());
+			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result);
 
 			rpc.registerGateway(testEndpointAddress, testGateway);
 
@@ -283,7 +284,7 @@ public class RetryingRegistrationTest extends TestLogger {
 
 			// cancel and fail the current registration attempt
 			registration.cancel();
-			result.failure(new TimeoutException());
+			result.completeExceptionally(new TimeoutException());
 
 			// there should not be a second registration attempt
 			verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong());

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
index 431fbe8..2843aeb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.registration;
 
 import akka.dispatch.Futures;
 
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.TestingGatewayBase;
 import org.apache.flink.util.Preconditions;
 
-import scala.concurrent.Future;
-
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -56,7 +56,7 @@ public class TestRegistrationGateway extends TestingGatewayBase {
 		}
 
 		// return a completed future (for a proper value), or one that never completes and will time out (for null)
-		return response != null ? Futures.successful(response) : this.<RegistrationResponse>futureWithTimeout(timeout);
+		return response != null ? FlinkCompletableFuture.completed(response) : this.<RegistrationResponse>futureWithTimeout(timeout);
 	}
 
 	public BlockingQueue<RegistrationCall> getInvocations() {

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 8183c0a..64a1191 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -68,7 +68,7 @@ public class ResourceManagerHATest {
 		Assert.assertNull(resourceManager.getLeaderSessionID());
 	}
 
-	private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway {
+	private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutable, StartStoppable, RpcGateway {
 		@Override
 		public void runAsync(Runnable runnable) {
 			runnable.run();

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 85d2880..1f9e7e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -18,10 +18,12 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.NonHaServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
@@ -40,10 +42,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.Collections;
 import java.util.UUID;
@@ -99,7 +97,7 @@ public class SlotProtocolTest extends TestLogger {
 		Future<RegistrationResponse> registrationFuture =
 			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
 		try {
-			Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
+			registrationFuture.get(5, TimeUnit.SECONDS);
 		} catch (Exception e) {
 			Assert.fail("JobManager registration Future didn't become ready.");
 		}
@@ -141,7 +139,7 @@ public class SlotProtocolTest extends TestLogger {
 		slotManager.updateSlotStatus(slotReport);
 
 		// 4) Slot becomes available and TaskExecutor gets a SlotRequest
-		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(FiniteDuration.class));
+		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class));
 	}
 
 	/**
@@ -171,7 +169,7 @@ public class SlotProtocolTest extends TestLogger {
 		Future<RegistrationResponse> registrationFuture =
 			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
 		try {
-			Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
+			registrationFuture.get(5, TimeUnit.SECONDS);
 		} catch (Exception e) {
 			Assert.fail("JobManager registration Future didn't become ready.");
 		}
@@ -207,7 +205,7 @@ public class SlotProtocolTest extends TestLogger {
 
 
 		// 4) a SlotRequest is routed to the TaskExecutor
-		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(FiniteDuration.class));
+		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class));
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 1791056..7c6b0ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -21,18 +21,16 @@ package org.apache.flink.runtime.rpc;
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Test;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,7 +47,7 @@ public class AsyncCallsTest extends TestLogger {
 	private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
 
 	private static AkkaRpcService akkaRpcService =
-			new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS));
+			new AkkaRpcService(actorSystem, Time.milliseconds(10000L));
 
 	@AfterClass
 	public static void shutdown() {
@@ -104,8 +102,9 @@ public class AsyncCallsTest extends TestLogger {
 				}
 				return "test";
 			}
-		}, new Timeout(30, TimeUnit.SECONDS));
-		String str = Await.result(result, new FiniteDuration(30, TimeUnit.SECONDS));
+		}, Time.seconds(30L));
+
+		String str = result.get(30, TimeUnit.SECONDS);
 		assertEquals("test", str);
 
 		// validate that no concurrent access happened

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index b431eb9..ee3f784 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.util.ReflectionUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.reflections.Reflections;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
@@ -43,6 +43,7 @@ import static org.junit.Assert.fail;
 public class RpcCompletenessTest extends TestLogger {
 
 	private static final Class<?> futureClass = Future.class;
+	private static final Class<?> timeoutClass = Time.class;
 
 	@Test
 	@SuppressWarnings({"rawtypes", "unchecked"})
@@ -147,8 +148,8 @@ public class RpcCompletenessTest extends TestLogger {
 		for (int i = 0; i < parameterAnnotations.length; i++) {
 			if (RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) {
 				assertTrue(
-					"The rpc timeout has to be of type " + FiniteDuration.class.getName() + ".",
-					parameterTypes[i].equals(FiniteDuration.class));
+					"The rpc timeout has to be of type " + timeoutClass.getName() + ".",
+					parameterTypes[i].equals(timeoutClass));
 
 				rpcTimeoutParameters++;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
index 8133a87..caf5e81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.rpc;
 
-import akka.dispatch.Futures;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -73,25 +73,25 @@ public abstract class TestingGatewayBase implements RpcGateway {
 	// ------------------------------------------------------------------------
 
 	public <T> Future<T> futureWithTimeout(long timeoutMillis) {
-		Promise<T> promise = Futures.<T>promise();
-		executor.schedule(new FutureTimeout(promise), timeoutMillis, TimeUnit.MILLISECONDS);
-		return promise.future();
+		FlinkCompletableFuture<T> future = new FlinkCompletableFuture<>();
+		executor.schedule(new FutureTimeout(future), timeoutMillis, TimeUnit.MILLISECONDS);
+		return future;
 	}
 
 	// ------------------------------------------------------------------------
 	
 	private static final class FutureTimeout implements Runnable {
 
-		private final Promise<?> promise;
+		private final CompletableFuture<?> promise;
 
-		private FutureTimeout(Promise<?> promise) {
+		private FutureTimeout(CompletableFuture<?> promise) {
 			this.promise = promise;
 		}
 
 		@Override
 		public void run() {
 			try {
-				promise.failure(new TimeoutException());
+				promise.completeExceptionally(new TimeoutException());
 			} catch (Throwable t) {
 				System.err.println("CAUGHT AN ERROR IN THE TEST: " + t.getMessage());
 				t.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 2212680..f164056 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -18,18 +18,14 @@
 
 package org.apache.flink.runtime.rpc;
 
-import akka.dispatch.Futures;
-import akka.util.Timeout;
-
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -69,7 +65,7 @@ public class TestingRpcService extends AkkaRpcService {
 	 * Creates a new {@code TestingRpcService}, using the given configuration. 
 	 */
 	public TestingRpcService(Configuration configuration) {
-		super(AkkaUtils.createLocalActorSystem(configuration), new Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
+		super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10));
 
 		this.registeredConnections = new ConcurrentHashMap<>();
 	}
@@ -103,13 +99,13 @@ public class TestingRpcService extends AkkaRpcService {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return Futures.successful(typedGateway);
+				return FlinkCompletableFuture.completed(typedGateway);
 			} else {
-				return Futures.failed(
-						new Exception("Gateway registered under " + address + " is not of type " + clazz));
+				return FlinkCompletableFuture.completedExceptionally(
+					new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
 		} else {
-			return Futures.failed(new Exception("No gateway registered under that name"));
+			return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
 		}
 	}
 


Mime
View raw message