flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/3] flink git commit: [FLINK-7387] [rpc] Require RpcEndpoints to directly implement RpcGateways
Date Fri, 11 Aug 2017 11:50:24 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index effa498..4abcdf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -27,6 +28,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -59,7 +61,6 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
@@ -105,7 +106,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * TaskExecutor implementation. The task executor is responsible for the execution of multiple
  * {@link Task}.
  */
-public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
+public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	public static final String TASK_MANAGER_NAME = "taskmanager";
 
@@ -288,48 +289,51 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	// Task lifecycle RPCs
 	// ----------------------------------------------------------------------
 
-	@RpcMethod
-	public Acknowledge submitTask(TaskDeploymentDescriptor tdd, UUID jobManagerLeaderId) throws TaskSubmissionException {
+	@Override
+	public CompletableFuture<Acknowledge> submitTask(
+			TaskDeploymentDescriptor tdd,
+			UUID jobManagerLeaderId,
+			Time timeout) {
 
-		// first, deserialize the pre-serialized information
-		final JobInformation jobInformation;
-		final TaskInformation taskInformation;
 		try {
-			jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
-			taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
-		}
-		catch (IOException | ClassNotFoundException e) {
-			throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
-		}
+			// first, deserialize the pre-serialized information
+			final JobInformation jobInformation;
+			final TaskInformation taskInformation;
+			try {
+				jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
+				taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
+			} catch (IOException | ClassNotFoundException e) {
+				throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
+			}
 
-		final JobID jobId = jobInformation.getJobId();
-		final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);
+			final JobID jobId = jobInformation.getJobId();
+			final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);
 
-		if (jobManagerConnection == null) {
-			final String message = "Could not submit task because there is no JobManager " +
-				"associated for the job " + jobId + '.';
+			if (jobManagerConnection == null) {
+				final String message = "Could not submit task because there is no JobManager " +
+					"associated for the job " + jobId + '.';
 
-			log.debug(message);
-			throw new TaskSubmissionException(message);
-		}
+				log.debug(message);
+				throw new TaskSubmissionException(message);
+			}
 
-		if (!Objects.equals(jobManagerConnection.getLeaderId(), jobManagerLeaderId)) {
-			final String message = "Rejecting the task submission because the job manager leader id " +
-				jobManagerLeaderId + " does not match the expected job manager leader id " +
-				jobManagerConnection.getLeaderId() + '.';
+			if (!Objects.equals(jobManagerConnection.getLeaderId(), jobManagerLeaderId)) {
+				final String message = "Rejecting the task submission because the job manager leader id " +
+					jobManagerLeaderId + " does not match the expected job manager leader id " +
+					jobManagerConnection.getLeaderId() + '.';
 
-			log.debug(message);
-			throw new TaskSubmissionException(message);
-		}
+				log.debug(message);
+				throw new TaskSubmissionException(message);
+			}
 
-		if (!taskSlotTable.existsActiveSlot(jobId, tdd.getAllocationId())) {
-			final String message = "No task slot allocated for job ID " + jobId +
-				" and allocation ID " + tdd.getAllocationId() + '.';
-			log.debug(message);
-			throw new TaskSubmissionException(message);
-		}
+			if (!taskSlotTable.existsActiveSlot(jobId, tdd.getAllocationId())) {
+				final String message = "No task slot allocated for job ID " + jobId +
+					" and allocation ID " + tdd.getAllocationId() + '.';
+				log.debug(message);
+				throw new TaskSubmissionException(message);
+			}
 
-		TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(
+			TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(
 				jobInformation.getJobId(),
 				jobInformation.getJobName(),
 				taskInformation.getJobVertexId(),
@@ -338,7 +342,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 				tdd.getSubtaskIndex(),
 				tdd.getAttemptNumber());
 
-		InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
+			InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
 				jobManagerConnection.getLeaderId(),
 				jobManagerConnection.getJobManagerGateway(),
 				jobInformation.getJobId(),
@@ -346,96 +350,100 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 				tdd.getExecutionAttemptId(),
 				taskManagerConfiguration.getTimeout());
 
-		TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
-		CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
-		LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
-		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
-		PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
-
-		Task task = new Task(
-			jobInformation,
-			taskInformation,
-			tdd.getExecutionAttemptId(),
-			tdd.getAllocationId(),
-			tdd.getSubtaskIndex(),
-			tdd.getAttemptNumber(),
-			tdd.getProducedPartitions(),
-			tdd.getInputGates(),
-			tdd.getTargetSlotNumber(),
-			tdd.getTaskStateHandles(),
-			memoryManager,
-			ioManager,
-			networkEnvironment,
-			broadcastVariableManager,
-			taskManagerActions,
-			inputSplitProvider,
-			checkpointResponder,
-			libraryCache,
-			fileCache,
-			taskManagerConfiguration,
-			taskMetricGroup,
-			resultPartitionConsumableNotifier,
-			partitionStateChecker,
-			getRpcService().getExecutor());
+			TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
+			CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
+			LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
+			ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
+			PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
 
-		log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
-
-		boolean taskAdded;
+			Task task = new Task(
+				jobInformation,
+				taskInformation,
+				tdd.getExecutionAttemptId(),
+				tdd.getAllocationId(),
+				tdd.getSubtaskIndex(),
+				tdd.getAttemptNumber(),
+				tdd.getProducedPartitions(),
+				tdd.getInputGates(),
+				tdd.getTargetSlotNumber(),
+				tdd.getTaskStateHandles(),
+				memoryManager,
+				ioManager,
+				networkEnvironment,
+				broadcastVariableManager,
+				taskManagerActions,
+				inputSplitProvider,
+				checkpointResponder,
+				libraryCache,
+				fileCache,
+				taskManagerConfiguration,
+				taskMetricGroup,
+				resultPartitionConsumableNotifier,
+				partitionStateChecker,
+				getRpcService().getExecutor());
+
+			log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
+
+			boolean taskAdded;
 
-		try {
-			taskAdded = taskSlotTable.addTask(task);
-		} catch (SlotNotFoundException | SlotNotActiveException e) {
-			throw new TaskSubmissionException("Could not submit task.", e);
-		}
+			try {
+				taskAdded = taskSlotTable.addTask(task);
+			} catch (SlotNotFoundException | SlotNotActiveException e) {
+				throw new TaskSubmissionException("Could not submit task.", e);
+			}
 
-		if (taskAdded) {
-			task.startTaskThread();
+			if (taskAdded) {
+				task.startTaskThread();
 
-			return Acknowledge.get();
-		} else {
-			final String message = "TaskManager already contains a task for id " +
-				task.getExecutionId() + '.';
+				return CompletableFuture.completedFuture(Acknowledge.get());
+			} else {
+				final String message = "TaskManager already contains a task for id " +
+					task.getExecutionId() + '.';
 
-			log.debug(message);
-			throw new TaskSubmissionException(message);
+				log.debug(message);
+				throw new TaskSubmissionException(message);
+			}
+		} catch (TaskSubmissionException e) {
+			return FutureUtils.completedExceptionally(e);
 		}
 	}
 
-	@RpcMethod
-	public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) throws TaskException {
+	@Override
+	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
 		final Task task = taskSlotTable.getTask(executionAttemptID);
 
 		if (task != null) {
 			try {
 				task.cancelExecution();
-				return Acknowledge.get();
+				return CompletableFuture.completedFuture(Acknowledge.get());
 			} catch (Throwable t) {
-				throw new TaskException("Cannot cancel task for execution " + executionAttemptID + '.', t);
+				return FutureUtils.completedExceptionally(
+					new TaskException("Cannot cancel task for execution " + executionAttemptID + '.', t));
 			}
 		} else {
 			final String message = "Cannot find task to stop for execution " + executionAttemptID + '.';
 
 			log.debug(message);
-			throw new TaskException(message);
+			return FutureUtils.completedExceptionally(new TaskException(message));
 		}
 	}
 
-	@RpcMethod
-	public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) throws TaskException {
+	@Override
+	public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
 		final Task task = taskSlotTable.getTask(executionAttemptID);
 
 		if (task != null) {
 			try {
 				task.stopExecution();
-				return Acknowledge.get();
+				return CompletableFuture.completedFuture(Acknowledge.get());
 			} catch (Throwable t) {
-				throw new TaskException("Cannot stop task for execution " + executionAttemptID + '.', t);
+				return FutureUtils.completedExceptionally(new TaskException("Cannot stop task for execution " + executionAttemptID + '.', t));
 			}
 		} else {
 			final String message = "Cannot find task to stop for execution " + executionAttemptID + '.';
 
 			log.debug(message);
-			throw new TaskException(message);
+			return FutureUtils.completedExceptionally(new TaskException(message));
 		}
 	}
 
@@ -443,8 +451,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	// Partition lifecycle RPCs
 	// ----------------------------------------------------------------------
 
-	@RpcMethod
-	public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos) throws PartitionException {
+	@Override
+	public CompletableFuture<Acknowledge> updatePartitions(
+			final ExecutionAttemptID executionAttemptID,
+			Iterable<PartitionInfo> partitionInfos,
+			Time timeout) {
 		final Task task = taskSlotTable.getTask(executionAttemptID);
 
 		if (task != null) {
@@ -455,9 +466,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 				if (singleInputGate != null) {
 					// Run asynchronously because it might be blocking
-					getRpcService().execute(new Runnable() {
-						@Override
-						public void run() {
+					getRpcService().execute(
+						() -> {
 							try {
 								singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
 							} catch (IOException | InterruptedException e) {
@@ -470,23 +480,22 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 									log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re);
 								}
 							}
-						}
-					});
+						});
 				} else {
-					throw new PartitionException("No reader with ID " +
-						intermediateResultPartitionID + " for task " + executionAttemptID +
-						" was found.");
+					return FutureUtils.completedExceptionally(
+						new PartitionException("No reader with ID " + intermediateResultPartitionID +
+							" for task " + executionAttemptID + " was found."));
 				}
 			}
 
-			return Acknowledge.get();
+			return CompletableFuture.completedFuture(Acknowledge.get());
 		} else {
 			log.debug("Discard update for input partitions of task {}. Task is no longer running.", executionAttemptID);
-			return Acknowledge.get();
+			return CompletableFuture.completedFuture(Acknowledge.get());
 		}
 	}
 
-	@RpcMethod
+	@Override
 	public void failPartition(ExecutionAttemptID executionAttemptID) {
 		log.info("Discarding the results produced by task execution {}.", executionAttemptID);
 
@@ -504,12 +513,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	// Heartbeat RPC
 	// ----------------------------------------------------------------------
 
-	@RpcMethod
+	@Override
 	public void heartbeatFromJobManager(ResourceID resourceID) {
 		jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
 	}
 
-	@RpcMethod
+	@Override
 	public void heartbeatFromResourceManager(ResourceID resourceID) {
 		resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
 	}
@@ -518,8 +527,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	// Checkpointing RPCs
 	// ----------------------------------------------------------------------
 
-	@RpcMethod
-	public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) throws CheckpointException {
+	@Override
+	public CompletableFuture<Acknowledge> triggerCheckpoint(
+			ExecutionAttemptID executionAttemptID,
+			long checkpointId,
+			long checkpointTimestamp,
+			CheckpointOptions checkpointOptions) {
 		log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
 
 		final Task task = taskSlotTable.getTask(executionAttemptID);
@@ -527,17 +540,20 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		if (task != null) {
 			task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
 
-			return Acknowledge.get();
+			return CompletableFuture.completedFuture(Acknowledge.get());
 		} else {
 			final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
 
 			log.debug(message);
-			throw new CheckpointException(message);
+			return FutureUtils.completedExceptionally(new CheckpointException(message));
 		}
 	}
 
-	@RpcMethod
-	public Acknowledge confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
+	@Override
+	public CompletableFuture<Acknowledge> confirmCheckpoint(
+			ExecutionAttemptID executionAttemptID,
+			long checkpointId,
+			long checkpointTimestamp) {
 		log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
 
 		final Task task = taskSlotTable.getTask(executionAttemptID);
@@ -545,12 +561,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		if (task != null) {
 			task.notifyCheckpointComplete(checkpointId);
 
-			return Acknowledge.get();
+			return CompletableFuture.completedFuture(Acknowledge.get());
 		} else {
 			final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';
 
 			log.debug(message);
-			throw new CheckpointException(message);
+			return FutureUtils.completedExceptionally(new CheckpointException(message));
 		}
 	}
 
@@ -569,85 +585,90 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	 * @throws SlotAllocationException if the slot allocation fails
 	 * @return answer to the slot request
 	 */
-	@RpcMethod
-	public Acknowledge requestSlot(
+	@Override
+	public CompletableFuture<Acknowledge> requestSlot(
 		final SlotID slotId,
 		final JobID jobId,
 		final AllocationID allocationId,
 		final String targetAddress,
-		final UUID rmLeaderId) throws SlotAllocationException {
+		final UUID rmLeaderId,
+		final Time timeout) {
 		// TODO: Filter invalid requests from the resource manager by using the instance/registration Id
 
 		log.info("Receive slot request {} for job {} from resource manager with leader id {}.",
 			allocationId, jobId, rmLeaderId);
 
-		if (resourceManagerConnection == null) {
-			final String message = "TaskManager is not connected to a resource manager.";
-			log.debug(message);
-			throw new SlotAllocationException(message);
-		}
-
-		if (!resourceManagerConnection.getTargetLeaderId().equals(rmLeaderId)) {
-			final String message = "The leader id " + rmLeaderId +
-				" does not match with the leader id of the connected resource manager " +
-				resourceManagerConnection.getTargetLeaderId() + '.';
+		try {
+			if (resourceManagerConnection == null) {
+				final String message = "TaskManager is not connected to a resource manager.";
+				log.debug(message);
+				throw new SlotAllocationException(message);
+			}
 
-			log.debug(message);
-			throw new SlotAllocationException(message);
-		}
+			if (!resourceManagerConnection.getTargetLeaderId().equals(rmLeaderId)) {
+				final String message = "The leader id " + rmLeaderId +
+					" does not match with the leader id of the connected resource manager " +
+					resourceManagerConnection.getTargetLeaderId() + '.';
 
-		if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
-			if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) {
-				log.info("Allocated slot for {}.", allocationId);
-			} else {
-				log.info("Could not allocate slot for {}.", allocationId);
-				throw new SlotAllocationException("Could not allocate slot.");
+				log.debug(message);
+				throw new SlotAllocationException(message);
 			}
-		} else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
-			final String message = "The slot " + slotId + " has already been allocated for a different job.";
 
-			log.info(message);
+			if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
+				if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) {
+					log.info("Allocated slot for {}.", allocationId);
+				} else {
+					log.info("Could not allocate slot for {}.", allocationId);
+					throw new SlotAllocationException("Could not allocate slot.");
+				}
+			} else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
+				final String message = "The slot " + slotId + " has already been allocated for a different job.";
 
-			throw new SlotOccupiedException(message, taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()));
-		}
+				log.info(message);
 
-		if (jobManagerTable.contains(jobId)) {
-			offerSlotsToJobManager(jobId);
-		} else {
-			try {
-				jobLeaderService.addJob(jobId, targetAddress);
-			} catch (Exception e) {
-				// free the allocated slot
+				throw new SlotOccupiedException(message, taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()));
+			}
+
+			if (jobManagerTable.contains(jobId)) {
+				offerSlotsToJobManager(jobId);
+			} else {
 				try {
-					taskSlotTable.freeSlot(allocationId);
-				} catch (SlotNotFoundException slotNotFoundException) {
-					// slot no longer existent, this should actually never happen, because we've
-					// just allocated the slot. So let's fail hard in this case!
-					onFatalError(slotNotFoundException);
-				}
+					jobLeaderService.addJob(jobId, targetAddress);
+				} catch (Exception e) {
+					// free the allocated slot
+					try {
+						taskSlotTable.freeSlot(allocationId);
+					} catch (SlotNotFoundException slotNotFoundException) {
+						// slot no longer existent, this should actually never happen, because we've
+						// just allocated the slot. So let's fail hard in this case!
+						onFatalError(slotNotFoundException);
+					}
 
-				// sanity check
-				if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
-					onFatalError(new Exception("Could not free slot " + slotId));
-				}
+					// sanity check
+					if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
+						onFatalError(new Exception("Could not free slot " + slotId));
+					}
 
-				throw new SlotAllocationException("Could not add job to job leader service.", e);
+					throw new SlotAllocationException("Could not add job to job leader service.", e);
+				}
 			}
+		} catch (SlotAllocationException slotAllocationException) {
+			return FutureUtils.completedExceptionally(slotAllocationException);
 		}
 
-		return Acknowledge.get();
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	// ----------------------------------------------------------------------
 	// Disconnection RPCs
 	// ----------------------------------------------------------------------
 
-	@RpcMethod
+	@Override
 	public void disconnectJobManager(JobID jobId, Exception cause) {
 		closeJobManagerConnection(jobId, cause);
 	}
 
-	@RpcMethod
+	@Override
 	public void disconnectResourceManager(Exception cause) {
 		closeResourceManagerConnection(cause);
 	}
@@ -767,7 +788,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 					reservedSlots.add(offer);
 				}
 
-				CompletableFuture<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
+				CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
 					getResourceID(),
 					reservedSlots,
 					leaderId,

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index e1144c9..3ca0327 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -528,7 +528,8 @@ public class ResourceManagerTest extends TestLogger {
 				rmLeaderSessionId,
 				taskManagerAddress,
 				taskManagerResourceID,
-				slotReport);
+				slotReport,
+				Time.milliseconds(0L));
 			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 
@@ -627,7 +628,8 @@ public class ResourceManagerTest extends TestLogger {
 				jmLeaderId,
 				jmResourceId,
 				jobMasterAddress,
-				jobId);
+				jobId,
+				Time.milliseconds(0L));
 			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
 			assertTrue(response instanceof JobMasterRegistrationSuccess);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 267f10b..3814684 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -85,7 +85,7 @@ public class DispatcherTest extends TestLogger {
 
 			dispatcher.start();
 
-			DispatcherGateway dispatcherGateway = dispatcher.getSelf();
+			DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
 			CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index 4cc4f11..8d613ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -82,7 +82,7 @@ public class SlotPoolRpcTest {
 		);
 		pool.start(UUID.randomUUID(), "foobar");
 
-		CompletableFuture<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.days(1));
 
 		try {
 			future.get(4, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 3e2293b..aeceb59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -101,7 +101,7 @@ public class SlotPoolTest extends TestLogger {
 		slotPool.registerTaskManager(resourceID);
 
 		ScheduledUnit task = mock(ScheduledUnit.class);
-		CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
 		assertFalse(future.isDone());
 
 		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -110,7 +110,7 @@ public class SlotPoolTest extends TestLogger {
 		final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
 		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-		assertTrue(slotPool.offerSlot(allocatedSlot));
+		assertTrue(slotPool.offerSlot(allocatedSlot).get());
 
 		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
 		assertTrue(future.isDone());
@@ -126,8 +126,8 @@ public class SlotPoolTest extends TestLogger {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);
 
-		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
-		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
+		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
 
 		assertFalse(future1.isDone());
 		assertFalse(future2.isDone());
@@ -139,7 +139,7 @@ public class SlotPoolTest extends TestLogger {
 		final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();
 
 		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-		assertTrue(slotPool.offerSlot(allocatedSlot));
+		assertTrue(slotPool.offerSlot(allocatedSlot).get());
 
 		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
 		assertTrue(future1.isDone());
@@ -165,7 +165,7 @@ public class SlotPoolTest extends TestLogger {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);
 
-		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
 		assertFalse(future1.isDone());
 
 		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -174,7 +174,7 @@ public class SlotPoolTest extends TestLogger {
 		final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
 		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-		assertTrue(slotPool.offerSlot(allocatedSlot));
+		assertTrue(slotPool.offerSlot(allocatedSlot).get());
 
 		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
 		assertTrue(future1.isDone());
@@ -182,7 +182,7 @@ public class SlotPoolTest extends TestLogger {
 		// return this slot to pool
 		slot1.releaseSlot();
 
-		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
 
 		// second allocation fulfilled by previous slot returning
 		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
@@ -200,7 +200,7 @@ public class SlotPoolTest extends TestLogger {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);
 
-		CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
 		assertFalse(future.isDone());
 
 		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -210,30 +210,30 @@ public class SlotPoolTest extends TestLogger {
 
 		// slot from unregistered resource
 		AllocatedSlot invalid = createAllocatedSlot(new ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-		assertFalse(slotPool.offerSlot(invalid));
+		assertFalse(slotPool.offerSlot(invalid).get());
 
 		AllocatedSlot notRequested = createAllocatedSlot(resourceID, new AllocationID(), jobId, DEFAULT_TESTING_PROFILE);
 
 		// we'll also accept non requested slots
-		assertTrue(slotPool.offerSlot(notRequested));
+		assertTrue(slotPool.offerSlot(notRequested).get());
 
 		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
 
 		// accepted slot
-		assertTrue(slotPool.offerSlot(allocatedSlot));
+		assertTrue(slotPool.offerSlot(allocatedSlot).get());
 		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
 		assertTrue(future.isDone());
 		assertTrue(slot.isAlive());
 
 		// duplicated offer with using slot
-		assertTrue(slotPool.offerSlot(allocatedSlot));
+		assertTrue(slotPool.offerSlot(allocatedSlot).get());
 		assertTrue(future.isDone());
 		assertTrue(slot.isAlive());
 
 		// duplicated offer with free slot
 		slot.releaseSlot();
 		assertTrue(slot.isReleased());
-		assertTrue(slotPool.offerSlot(allocatedSlot));
+		assertTrue(slotPool.offerSlot(allocatedSlot).get());
 	}
 
 	@Test
@@ -241,17 +241,17 @@ public class SlotPoolTest extends TestLogger {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);
 
-		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
 
 		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
 		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
 
 		final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
-		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null, Time.milliseconds(0L));
 
 		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
-		assertTrue(slotPool.offerSlot(allocatedSlot));
+		assertTrue(slotPool.offerSlot(allocatedSlot).get());
 
 		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
 		assertTrue(future1.isDone());

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index 7c58879..435c23d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -52,6 +52,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -83,7 +84,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
 
 		jobManager = mock(JobMaster.class);
 		jobManagerGateway = mock(JobMasterGateway.class);
-		when(jobManager.getSelf()).thenReturn(jobManagerGateway);
+		when(jobManager.getSelfGateway(eq(JobMasterGateway.class))).thenReturn(jobManagerGateway);
 		when(jobManager.getRpcService()).thenReturn(mockRpc);
 
 		PowerMockito.whenNew(JobMaster.class).withAnyArguments().thenReturn(jobManager);

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 48a1d45..0c4d376 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -112,7 +112,7 @@ public class JobMasterTest extends TestLogger {
 			jobMaster.start(jmLeaderId);
 
 			// register task manager will trigger monitor heartbeat target, schedule heartbeat request at interval time
-			jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId);
+			jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId, Time.milliseconds(0L));
 
 			ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
 			verify(scheduledExecutor, times(1)).scheduleAtFixedRate(

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 6480d75..10d6a72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -52,6 +52,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 
 	private TestingSerialRpcService rpcService;
 
+	private final Time timeout = Time.milliseconds(0L);
+
 	@Before
 	public void setup() throws Exception {
 		rpcService = new TestingSerialRpcService();
@@ -83,7 +85,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			jmLeaderID,
 			jmResourceId,
 			jobMasterAddress,
-			jobID);
+			jobID,
+			timeout);
 		RegistrationResponse response = successfulFuture.get(5L, TimeUnit.SECONDS);
 		assertTrue(response instanceof JobMasterRegistrationSuccess);
 
@@ -114,7 +117,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			jmLeaderID,
 			jmResourceId,
 			jobMasterAddress,
-			jobID);
+			jobID,
+			timeout);
 		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -146,7 +150,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			differentLeaderSessionID,
 			jmResourceId,
 			jobMasterAddress,
-			jobID);
+			jobID,
+			timeout);
 		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -178,7 +183,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			jmLeaderSessionId,
 			jmResourceId,
 			invalidAddress,
-			jobID);
+			jobID,
+			timeout);
 		assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -210,7 +216,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			jmLeaderSessionId,
 			jmResourceId,
 			jobMasterAddress,
-			unknownJobIDToHAServices);
+			unknownJobIDToHAServices,
+			timeout);
 		RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS);
 		assertTrue(response instanceof RegistrationResponse.Decline);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 4127cea..fc96f0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -90,13 +90,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		try {
 			// test response successful
 			CompletableFuture<RegistrationResponse> successfulFuture =
-				resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
+				resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L));
 			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 
 			// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
 			CompletableFuture<RegistrationResponse> duplicateFuture =
-				resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
+				resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L));
 			RegistrationResponse duplicateResponse = duplicateFuture.get();
 			assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
 			assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
@@ -116,7 +116,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
 			UUID differentLeaderSessionID = UUID.randomUUID();
 			CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
-				resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport);
+				resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L));
 			assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 		} finally {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -134,7 +134,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			// test throw exception when receive a registration from taskExecutor which takes invalid address
 			String invalidAddress = "/taskExecutor2";
 			CompletableFuture<RegistrationResponse> invalidAddressFuture =
-				resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport);
+				resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport, Time.milliseconds(0L));
 			assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 		} finally {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 4be5257..00762b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -67,7 +67,7 @@ public class AsyncCallsTest extends TestLogger {
 
 		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
 		testEndpoint.start();
-		TestGateway gateway = testEndpoint.getSelf();
+		TestGateway gateway = testEndpoint.getSelfGateway(TestGateway.class);
 
 		// a bunch of gateway calls
 		gateway.someCall();
@@ -108,7 +108,7 @@ public class AsyncCallsTest extends TestLogger {
 		assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
 		assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
 
-		akkaRpcService.stopServer(testEndpoint.getSelf());
+		testEndpoint.shutDown();
 	}
 
 	@Test
@@ -174,7 +174,7 @@ public class AsyncCallsTest extends TestLogger {
 	}
 
 	@SuppressWarnings("unused")
-	public static class TestEndpoint extends RpcEndpoint<TestGateway> {
+	public static class TestEndpoint extends RpcEndpoint implements TestGateway {
 
 		private final ReentrantLock lock;
 
@@ -185,7 +185,7 @@ public class AsyncCallsTest extends TestLogger {
 			this.lock = lock;
 		}
 
-		@RpcMethod
+		@Override
 		public void someCall() {
 			boolean holdsLock = lock.tryLock();
 			if (holdsLock) {
@@ -195,7 +195,7 @@ public class AsyncCallsTest extends TestLogger {
 			}
 		}
 
-		@RpcMethod
+		@Override
 		public void anotherCall() {
 			boolean holdsLock = lock.tryLock();
 			if (holdsLock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
deleted file mode 100644
index 07dadae..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.util.ReflectionUtil;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-import org.reflections.Reflections;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Test which ensures that all classes of subtype {@link RpcEndpoint} implement
- * the methods specified in the generic gateway type argument.
- *
- * {@code
- * 	    RpcEndpoint<GatewayTypeParameter extends RpcGateway>
- * }
- *
- * Note, that the class hierarchy can also be nested. In this case the type argument
- * always has to be the first argument, e.g. {@code
- *
- * 	    // RpcClass needs to implement RpcGatewayClass' methods
- * 	    RpcClass extends RpcEndpoint<RpcGatewayClass>
- *
- * 	    // RpcClass2 or its subclass needs to implement RpcGatewayClass' methods
- *      RpcClass<GatewayTypeParameter extends RpcGateway,...> extends RpcEndpoint<GatewayTypeParameter>
- *      RpcClass2 extends RpcClass<RpcGatewayClass,...>
- *
- *      // needless to say, this can even be nested further
- *      ...
- * }
- *
- */
-public class RpcCompletenessTest extends TestLogger {
-
-	private static Logger LOG = LoggerFactory.getLogger(RpcCompletenessTest.class);
-
-	private static final Class<?> futureClass = CompletableFuture.class;
-	private static final Class<?> timeoutClass = Time.class;
-
-	@Test
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	public void testRpcCompleteness() {
-		Reflections reflections = new Reflections("org.apache.flink");
-
-		Set<Class<? extends RpcEndpoint>> classes = reflections.getSubTypesOf(RpcEndpoint.class);
-
-		Class<? extends RpcEndpoint> c;
-
-		mainloop:
-		for (Class<? extends RpcEndpoint> rpcEndpoint : classes) {
-			c = rpcEndpoint;
-
-			LOG.debug("-------------");
-			LOG.debug("c: {}", c);
-
-			// skip abstract classes
-			if (Modifier.isAbstract(c.getModifiers())) {
-				LOG.debug("Skipping abstract class");
-				continue;
-			}
-
-			// check for type parameter bound to RpcGateway
-			// skip if one is found because a subclass will provide the concrete argument
-			TypeVariable<? extends Class<? extends RpcEndpoint>>[] typeParameters = c.getTypeParameters();
-			LOG.debug("Checking {} parameters.", typeParameters.length);
-			for (int i = 0; i < typeParameters.length; i++) {
-				for (Type bound : typeParameters[i].getBounds()) {
-					LOG.debug("checking bound {} of type parameter {}", bound, typeParameters[i]);
-					if (bound.toString().equals("interface " + RpcGateway.class.getName())) {
-						if (i > 0) {
-							fail("Type parameter for RpcGateway should come first in " + c);
-						}
-						LOG.debug("Skipping class with type parameter bound to RpcGateway.");
-						// Type parameter is bound to RpcGateway which a subclass will provide
-						continue mainloop;
-					}
-				}
-			}
-
-			// check if this class or any super class contains the RpcGateway argument
-			Class<?> rpcGatewayType;
-			do {
-				LOG.debug("checking type argument of class: {}", c);
-				rpcGatewayType = ReflectionUtil.getTemplateType1(c);
-				LOG.debug("type argument is: {}", rpcGatewayType);
-
-				c = (Class<? extends RpcEndpoint>) c.getSuperclass();
-
-			} while (!RpcGateway.class.isAssignableFrom(rpcGatewayType));
-
-			LOG.debug("Checking RRC completeness of endpoint '{}' with gateway '{}'",
-				rpcEndpoint.getSimpleName(), rpcGatewayType.getSimpleName());
-
-			checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType);
-		}
-	}
-
-	@SuppressWarnings("rawtypes")
-	private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) {
-		List<Method> rpcMethodsFromGateway = getRpcMethodsFromGateway(rpcGateway);
-		Method[] gatewayMethods = rpcMethodsFromGateway.toArray(new Method[rpcMethodsFromGateway.size()]);
-		Method[] serverMethods = rpcEndpoint.getMethods();
-
-		Map<String, Set<Method>> rpcMethods = new HashMap<>();
-		Set<Method> unmatchedRpcMethods = new HashSet<>();
-
-		for (Method serverMethod : serverMethods) {
-			if (serverMethod.isAnnotationPresent(RpcMethod.class)) {
-				if (rpcMethods.containsKey(serverMethod.getName())) {
-					Set<Method> methods = rpcMethods.get(serverMethod.getName());
-					methods.add(serverMethod);
-
-					rpcMethods.put(serverMethod.getName(), methods);
-				} else {
-					Set<Method> methods = new HashSet<>();
-					methods.add(serverMethod);
-
-					rpcMethods.put(serverMethod.getName(), methods);
-				}
-
-				unmatchedRpcMethods.add(serverMethod);
-			}
-		}
-
-		for (Method gatewayMethod : gatewayMethods) {
-			assertTrue(
-				"The rpc endpoint " + rpcEndpoint.getName() + " does not contain a RpcMethod " +
-					"annotated method with the same name and signature " +
-					generateEndpointMethodSignature(gatewayMethod) + ".",
-				rpcMethods.containsKey(gatewayMethod.getName()));
-
-			checkGatewayMethod(gatewayMethod);
-
-			if (!matchGatewayMethodWithEndpoint(gatewayMethod, rpcMethods.get(gatewayMethod.getName()), unmatchedRpcMethods)) {
-				fail("Could not find a RpcMethod annotated method in rpc endpoint " +
-					rpcEndpoint.getName() + " matching the rpc gateway method " +
-					generateEndpointMethodSignature(gatewayMethod) + " defined in the rpc gateway " +
-					rpcGateway.getName() + ".");
-			}
-		}
-
-		if (!unmatchedRpcMethods.isEmpty()) {
-			StringBuilder builder = new StringBuilder();
-
-			for (Method unmatchedRpcMethod : unmatchedRpcMethods) {
-				builder.append(unmatchedRpcMethod).append("\n");
-			}
-
-			fail("The rpc endpoint " + rpcEndpoint.getName() + " contains rpc methods which " +
-				"are not matched to gateway methods of " + rpcGateway.getName() + ":\n" +
-				builder.toString());
-		}
-	}
-
-	/**
-	 * Checks whether the gateway method fulfills the gateway method requirements.
-	 * <ul>
-	 *     <li>It checks whether the return type is void or a {@link CompletableFuture} wrapping the actual result. </li>
-	 *     <li>It checks that the method's parameter list contains at most one parameter annotated with {@link RpcTimeout}.</li>
-	 * </ul>
-	 *
-	 * @param gatewayMethod Gateway method to check
-	 */
-	private void checkGatewayMethod(Method gatewayMethod) {
-		if (!gatewayMethod.getReturnType().equals(Void.TYPE)) {
-			assertTrue(
-				"The return type of method " + gatewayMethod.getName() + " in the rpc gateway " +
-					gatewayMethod.getDeclaringClass().getName() + " is non void and not a " +
-					"future. Non-void return types have to be returned as a future.",
-				gatewayMethod.getReturnType().equals(futureClass));
-		}
-
-		Annotation[][] parameterAnnotations = gatewayMethod.getParameterAnnotations();
-		Class<?>[] parameterTypes = gatewayMethod.getParameterTypes();
-		int rpcTimeoutParameters = 0;
-
-		for (int i = 0; i < parameterAnnotations.length; i++) {
-			if (RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) {
-				assertTrue(
-					"The rpc timeout has to be of type " + timeoutClass.getName() + ".",
-					parameterTypes[i].equals(timeoutClass));
-
-				rpcTimeoutParameters++;
-			}
-		}
-
-		assertTrue("The gateway method " + gatewayMethod + " must have at most one RpcTimeout " +
-			"annotated parameter.", rpcTimeoutParameters <= 1);
-	}
-
-	/**
-	 * Checks whether we find a matching overloaded version for the gateway method among the methods
-	 * with the same name in the rpc endpoint.
-	 *
-	 * @param gatewayMethod Gateway method
-	 * @param endpointMethods Set of rpc methods on the rpc endpoint with the same name as the gateway
-	 *                   method
-	 * @param unmatchedRpcMethods Set of unmatched rpc methods on the endpoint side (so far)
-	 */
-	private boolean matchGatewayMethodWithEndpoint(Method gatewayMethod, Set<Method> endpointMethods, Set<Method> unmatchedRpcMethods) {
-		for (Method endpointMethod : endpointMethods) {
-			if (checkMethod(gatewayMethod, endpointMethod)) {
-				unmatchedRpcMethods.remove(endpointMethod);
-				return true;
-			}
-		}
-
-		return false;
-	}
-
-	private boolean checkMethod(Method gatewayMethod, Method endpointMethod) {
-		Class<?>[] gatewayParameterTypes = gatewayMethod.getParameterTypes();
-		Annotation[][] gatewayParameterAnnotations = gatewayMethod.getParameterAnnotations();
-
-		Class<?>[] endpointParameterTypes = endpointMethod.getParameterTypes();
-
-		List<Class<?>> filteredGatewayParameterTypes = new ArrayList<>();
-
-		assertEquals(gatewayParameterTypes.length, gatewayParameterAnnotations.length);
-
-		// filter out the RpcTimeout parameters
-		for (int i = 0; i < gatewayParameterTypes.length; i++) {
-			if (!RpcCompletenessTest.isRpcTimeout(gatewayParameterAnnotations[i])) {
-				filteredGatewayParameterTypes.add(gatewayParameterTypes[i]);
-			}
-		}
-
-		if (filteredGatewayParameterTypes.size() != endpointParameterTypes.length) {
-			return false;
-		} else {
-			// check the parameter types
-			for (int i = 0; i < filteredGatewayParameterTypes.size(); i++) {
-				if (!checkType(filteredGatewayParameterTypes.get(i), endpointParameterTypes[i])) {
-					return false;
-				}
-			}
-
-			// check the return types
-			if (endpointMethod.getReturnType() == void.class) {
-				if (gatewayMethod.getReturnType() != void.class) {
-					return false;
-				}
-			} else {
-				// has return value. The gateway method should be wrapped in a future
-				Class<?> futureClass = gatewayMethod.getReturnType();
-
-				// sanity check that the return type of a gateway method must be void or a future
-				if (!futureClass.equals(RpcCompletenessTest.futureClass)) {
-					return false;
-				} else {
-					ReflectionUtil.FullTypeInfo fullValueTypeInfo = ReflectionUtil.getFullTemplateType(gatewayMethod.getGenericReturnType(), 0);
-
-					if (endpointMethod.getReturnType().equals(futureClass)) {
-						ReflectionUtil.FullTypeInfo fullRpcEndpointValueTypeInfo = ReflectionUtil.getFullTemplateType(endpointMethod.getGenericReturnType(), 0);
-
-						// check if we have the same future value types
-						if (fullValueTypeInfo != null && fullRpcEndpointValueTypeInfo != null) {
-							Iterator<Class<?>> valueClasses = fullValueTypeInfo.getClazzIterator();
-							Iterator<Class<?>> rpcClasses = fullRpcEndpointValueTypeInfo.getClazzIterator();
-
-							while (valueClasses.hasNext() && rpcClasses.hasNext()) {
-								if (!checkType(valueClasses.next(), rpcClasses.next())) {
-									return false;
-								}
-							}
-
-							// both should be empty
-							return !valueClasses.hasNext() && !rpcClasses.hasNext();
-						}
-					} else {
-						if (fullValueTypeInfo != null && !checkType(fullValueTypeInfo.getClazz(), endpointMethod.getReturnType())) {
-							return false;
-						}
-					}
-				}
-			}
-
-			return gatewayMethod.getName().equals(endpointMethod.getName());
-		}
-	}
-
-	private boolean checkType(Class<?> firstType, Class<?> secondType) {
-		Class<?> firstResolvedType;
-		Class<?> secondResolvedType;
-
-		if (firstType.isPrimitive()) {
-			firstResolvedType = RpcCompletenessTest.resolvePrimitiveType(firstType);
-		} else {
-			firstResolvedType = firstType;
-		}
-
-		if (secondType.isPrimitive()) {
-			secondResolvedType = RpcCompletenessTest.resolvePrimitiveType(secondType);
-		} else {
-			secondResolvedType = secondType;
-		}
-
-		return firstResolvedType.equals(secondResolvedType);
-	}
-
-	/**
-	 * Generates from a gateway rpc method signature the corresponding rpc endpoint signature.
-	 *
-	 * For example the {@link RpcTimeout} annotation adds an additional parameter to the gateway
-	 * signature which is not relevant on the server side.
-	 *
-	 * @param method Method to generate the signature string for
-	 * @return String of the respective server side rpc method signature
-	 */
-	private String generateEndpointMethodSignature(Method method) {
-		StringBuilder builder = new StringBuilder();
-
-		if (method.getReturnType().equals(Void.TYPE)) {
-			builder.append("void").append(" ");
-		} else if (method.getReturnType().equals(futureClass)) {
-			ReflectionUtil.FullTypeInfo fullTypeInfo = ReflectionUtil.getFullTemplateType(method.getGenericReturnType(), 0);
-
-			builder
-				.append(futureClass.getSimpleName())
-				.append("<")
-				.append(fullTypeInfo != null ? fullTypeInfo.toString() : "")
-				.append(">");
-
-			if (fullTypeInfo != null) {
-				builder.append("/").append(fullTypeInfo);
-			}
-
-			builder.append(" ");
-		} else {
-			return "Invalid rpc method signature.";
-		}
-
-		builder.append(method.getName()).append("(");
-
-		Class<?>[] parameterTypes = method.getParameterTypes();
-		Annotation[][] parameterAnnotations = method.getParameterAnnotations();
-
-		assertEquals(parameterTypes.length, parameterAnnotations.length);
-
-		for (int i = 0; i < parameterTypes.length; i++) {
-			// filter out the RpcTimeout parameters
-			if (!RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) {
-				builder.append(parameterTypes[i].getName());
-
-				if (i < parameterTypes.length -1) {
-					builder.append(", ");
-				}
-			}
-		}
-
-		builder.append(")");
-
-		return builder.toString();
-	}
-
-	private static boolean isRpcTimeout(Annotation[] annotations) {
-		for (Annotation annotation : annotations) {
-			if (annotation.annotationType().equals(RpcTimeout.class)) {
-				return true;
-			}
-		}
-
-		return false;
-	}
-
-	/**
-	 * Returns the boxed type for a primitive type.
-	 *
-	 * @param primitveType Primitive type to resolve
-	 * @return Boxed type for the given primitive type
-	 */
-	private static Class<?> resolvePrimitiveType(Class<?> primitveType) {
-		assert primitveType.isPrimitive();
-
-		TypeInformation<?> typeInformation = BasicTypeInfo.getInfoFor(primitveType);
-
-		if (typeInformation != null) {
-			return typeInformation.getTypeClass();
-		} else {
-			throw new RuntimeException("Could not retrive basic type information for primitive type " + primitveType + '.');
-		}
-	}
-
-	/**
-	 * Extract all rpc methods defined by the gateway interface
-	 *
-	 * @param interfaceClass the given rpc gateway interface
-	 * @return all methods defined by the given interface
-	 */
-	private List<Method> getRpcMethodsFromGateway(Class<? extends RpcGateway> interfaceClass) {
-		if(!interfaceClass.isInterface()) {
-			fail(interfaceClass.getName() + " is not a interface");
-		}
-
-		ArrayList<Method> allMethods = new ArrayList<>();
-		// Methods defined in RpcGateway are native method
-		if(interfaceClass.equals(RpcGateway.class)) {
-			return allMethods;
-		}
-
-		// Get all methods declared in current interface
-		Collections.addAll(allMethods, interfaceClass.getDeclaredMethods());
-
-		// Get all method inherited from super interface
-		for (Class<?> superClass : interfaceClass.getInterfaces()) {
-			@SuppressWarnings("unchecked")
-			Class<? extends RpcGateway> gatewayClass = (Class<? extends RpcGateway>) superClass;
-			allMethods.addAll(getRpcMethodsFromGateway(gatewayClass));
-		}
-		return allMethods;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
new file mode 100644
index 0000000..b3e8ee6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the RpcEndpoint and its self gateways.
+ */
+public class RpcEndpointTest extends TestLogger {
+
+	private static final Time TIMEOUT = Time.seconds(10L);
+	private static ActorSystem actorSystem = null;
+	private static RpcService rpcService = null;
+
+	@BeforeClass
+	public static void setup() {
+		actorSystem = AkkaUtils.createDefaultActorSystem();
+		rpcService = new AkkaRpcService(actorSystem, TIMEOUT);
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		if (rpcService != null) {
+			rpcService.stopService();
+		}
+
+		if (actorSystem != null) {
+			actorSystem.shutdown();
+		}
+
+		if (rpcService != null) {
+			rpcService.getTerminationFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+		}
+
+		if (actorSystem != null) {
+			actorSystem.awaitTermination(new FiniteDuration(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS));
+		}
+	}
+
+	/**
+	 * Tests that we can obtain the self gateway from a RpcEndpoint and can interact with
+	 * it via the self gateway.
+	 */
+	@Test
+	public void testSelfGateway() throws Exception {
+		int expectedValue = 1337;
+		BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, expectedValue);
+
+		try {
+			baseEndpoint.start();
+
+			BaseGateway baseGateway = baseEndpoint.getSelfGateway(BaseGateway.class);
+
+			CompletableFuture<Integer> foobar = baseGateway.foobar();
+
+			assertEquals(Integer.valueOf(expectedValue), foobar.get());
+		} finally {
+			baseEndpoint.shutDown();
+		}
+	}
+
+	/**
+	 * Tests that we cannot accidentally obtain a wrong self gateway type which is
+	 * not implemented by the RpcEndpoint.
+	 */
+	@Test(expected = RuntimeException.class)
+	public void testWrongSelfGateway() throws Exception {
+		int expectedValue = 1337;
+		BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, expectedValue);
+
+		try {
+			baseEndpoint.start();
+
+			DifferentGateway differentGateway = baseEndpoint.getSelfGateway(DifferentGateway.class);
+
+			fail("Expected to fail with a RuntimeException since we requested the wrong gateway type.");
+		} finally {
+			baseEndpoint.shutDown();
+		}
+	}
+
+	/**
+	 * Tests that we can extend existing RpcEndpoints and can communicate with them via the
+	 * self gateways.
+	 */
+	@Test
+	public void testEndpointInheritance() throws Exception {
+		int foobar = 1;
+		int barfoo = 2;
+		String foo = "foobar";
+
+		ExtendedEndpoint endpoint = new ExtendedEndpoint(rpcService, foobar, barfoo, foo);
+
+		try {
+			endpoint.start();
+
+			BaseGateway baseGateway = endpoint.getSelfGateway(BaseGateway.class);
+			ExtendedGateway extendedGateway = endpoint.getSelfGateway(ExtendedGateway.class);
+			DifferentGateway differentGateway = endpoint.getSelfGateway(DifferentGateway.class);
+
+			assertEquals(Integer.valueOf(foobar), baseGateway.foobar().get());
+			assertEquals(Integer.valueOf(foobar), extendedGateway.foobar().get());
+
+			assertEquals(Integer.valueOf(barfoo), extendedGateway.barfoo().get());
+			assertEquals(foo, differentGateway.foo().get());
+		} finally {
+			endpoint.shutDown();
+		}
+	}
+
+	public interface BaseGateway extends RpcGateway {
+		CompletableFuture<Integer> foobar();
+	}
+
+	public interface ExtendedGateway extends BaseGateway {
+		CompletableFuture<Integer> barfoo();
+	}
+
+	public interface DifferentGateway extends RpcGateway {
+		CompletableFuture<String> foo();
+	}
+
+	public static class BaseEndpoint extends RpcEndpoint implements BaseGateway {
+
+		private final int foobarValue;
+
+		protected BaseEndpoint(RpcService rpcService, int foobarValue) {
+			super(rpcService);
+
+			this.foobarValue = foobarValue;
+		}
+
+		@Override
+		public CompletableFuture<Integer> foobar() {
+			return CompletableFuture.completedFuture(foobarValue);
+		}
+	}
+
+	public static class ExtendedEndpoint extends BaseEndpoint implements ExtendedGateway, DifferentGateway {
+
+		private final int barfooValue;
+
+		private final String fooString;
+
+		protected ExtendedEndpoint(RpcService rpcService, int foobarValue, int barfooValue, String fooString) {
+			super(rpcService, foobarValue);
+
+			this.barfooValue = barfooValue;
+			this.fooString = fooString;
+		}
+
+		@Override
+		public CompletableFuture<Integer> barfoo() {
+			return CompletableFuture.completedFuture(barfooValue);
+		}
+
+		@Override
+		public CompletableFuture<String> foo() {
+			return CompletableFuture.completedFuture(fooString);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 37349a1..cb38f6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
@@ -30,8 +29,8 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
-import java.util.BitSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -139,32 +138,32 @@ public class TestingSerialRpcService implements RpcService {
 	}
 
 	@Override
-	public void stopServer(RpcGateway selfGateway) {
+	public void stopServer(RpcServer selfGateway) {
 		registeredConnections.remove(selfGateway.getAddress());
 	}
 
 	@Override
-	public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
+	public <S extends RpcEndpoint & RpcGateway> RpcServer startServer(S rpcEndpoint) {
 		final String address = UUID.randomUUID().toString();
 
 		InvocationHandler akkaInvocationHandler = new TestingSerialRpcService.TestingSerialInvocationHandler<>(address, rpcEndpoint);
 		ClassLoader classLoader = getClass().getClassLoader();
 
+		Set<Class<? extends RpcGateway>> implementedRpcGateways = RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass());
+
+		implementedRpcGateways.add(RpcServer.class);
+
+
 		@SuppressWarnings("unchecked")
-		C self = (C) Proxy.newProxyInstance(
+		RpcServer rpcServer = (RpcServer) Proxy.newProxyInstance(
 			classLoader,
-			new Class<?>[]{
-				rpcEndpoint.getSelfGatewayType(),
-				MainThreadExecutable.class,
-				StartStoppable.class,
-				RpcGateway.class
-			},
+			implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
 			akkaInvocationHandler);
 
 		// register self
-		registeredConnections.putIfAbsent(self.getAddress(), self);
+		registeredConnections.putIfAbsent(rpcServer.getAddress(), rpcServer);
 
-		return self;
+		return rpcServer;
 	}
 
 	@Override
@@ -211,7 +210,7 @@ public class TestingSerialRpcService implements RpcService {
 		registeredConnections.clear();
 	}
 
-	private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutable, StartStoppable {
+	private static final class TestingSerialInvocationHandler<T extends RpcEndpoint & RpcGateway> implements InvocationHandler, RpcGateway, MainThreadExecutable, StartStoppable {
 
 		private final T rpcEndpoint;
 
@@ -234,7 +233,9 @@ public class TestingSerialRpcService implements RpcService {
 		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
 			Class<?> declaringClass = method.getDeclaringClass();
 			if (declaringClass.equals(MainThreadExecutable.class) ||
-				declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
+				declaringClass.equals(Object.class) ||
+				declaringClass.equals(StartStoppable.class) ||
+				declaringClass.equals(RpcServer.class) ||
 				declaringClass.equals(RpcGateway.class)) {
 				return method.invoke(this, args);
 			} else {
@@ -243,22 +244,17 @@ public class TestingSerialRpcService implements RpcService {
 				Annotation[][] parameterAnnotations = method.getParameterAnnotations();
 				Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
 
-				final Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments(
-					parameterTypes,
-					parameterAnnotations,
-					args);
-
 				Class<?> returnType = method.getReturnType();
 
 				if (returnType.equals(CompletableFuture.class)) {
 					try {
-						Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
+						Object result = handleRpcInvocationSync(methodName, parameterTypes, args, futureTimeout);
 						return CompletableFuture.completedFuture(result);
 					} catch (Throwable e) {
 						return FutureUtils.completedExceptionally(e);
 					}
 				} else {
-					return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
+					return handleRpcInvocationSync(methodName, parameterTypes, args, futureTimeout);
 				}
 			}
 		}
@@ -380,61 +376,6 @@ public class TestingSerialRpcService implements RpcService {
 		}
 
 		/**
-		 * Removes all {@link RpcTimeout} annotated parameters from the parameter type and argument
-		 * list.
-		 *
-		 * @param parameterTypes       Array of parameter types
-		 * @param parameterAnnotations Array of parameter annotations
-		 * @param args                 Arary of arguments
-		 * @return Tuple of filtered parameter types and arguments which no longer contain the
-		 * {@link RpcTimeout} annotated parameter types and arguments
-		 */
-		private static Tuple2<Class<?>[], Object[]> filterArguments(
-			Class<?>[] parameterTypes,
-			Annotation[][] parameterAnnotations,
-			Object[] args) {
-
-			Class<?>[] filteredParameterTypes;
-			Object[] filteredArgs;
-
-			if (args == null) {
-				filteredParameterTypes = parameterTypes;
-				filteredArgs = null;
-			} else {
-				Preconditions.checkArgument(parameterTypes.length == parameterAnnotations.length);
-				Preconditions.checkArgument(parameterAnnotations.length == args.length);
-
-				BitSet isRpcTimeoutParameter = new BitSet(parameterTypes.length);
-				int numberRpcParameters = parameterTypes.length;
-
-				for (int i = 0; i < parameterTypes.length; i++) {
-					if (isRpcTimeout(parameterAnnotations[i])) {
-						isRpcTimeoutParameter.set(i);
-						numberRpcParameters--;
-					}
-				}
-
-				if (numberRpcParameters == parameterTypes.length) {
-					filteredParameterTypes = parameterTypes;
-					filteredArgs = args;
-				} else {
-					filteredParameterTypes = new Class<?>[numberRpcParameters];
-					filteredArgs = new Object[numberRpcParameters];
-					int counter = 0;
-
-					for (int i = 0; i < parameterTypes.length; i++) {
-						if (!isRpcTimeoutParameter.get(i)) {
-							filteredParameterTypes[counter] = parameterTypes[i];
-							filteredArgs[counter] = args[i];
-							counter++;
-						}
-					}
-				}
-			}
-			return Tuple2.of(filteredParameterTypes, filteredArgs);
-		}
-
-		/**
 		 * Checks whether any of the annotations is of type {@link RpcTimeout}
 		 *
 		 * @param annotations Array of annotations

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 793d292..56d17e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
@@ -105,7 +104,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
 		DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
 
-		DummyRpcGateway rpcGateway = rpcEndpoint.getSelf();
+		DummyRpcGateway rpcGateway = rpcEndpoint.getSelfGateway(DummyRpcGateway.class);
 
 		// this message should be discarded and completed with an AkkaRpcException
 		CompletableFuture<Integer> result = rpcGateway.foobar();
@@ -192,7 +191,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		ExceptionalEndpoint rpcEndpoint = new ExceptionalEndpoint(akkaRpcService);
 		rpcEndpoint.start();
 
-		ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
+		ExceptionalGateway rpcGateway = rpcEndpoint.getSelfGateway(ExceptionalGateway.class);
 		CompletableFuture<Integer> result = rpcGateway.doStuff();
 
 		try {
@@ -211,7 +210,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		ExceptionalFutureEndpoint rpcEndpoint = new ExceptionalFutureEndpoint(akkaRpcService);
 		rpcEndpoint.start();
 
-		ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
+		ExceptionalGateway rpcGateway = rpcEndpoint.getSelfGateway(ExceptionalGateway.class);
 		CompletableFuture<Integer> result = rpcGateway.doStuff();
 
 		try {
@@ -275,7 +274,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		void tell(String message);
 	}
 
-	private static class DummyRpcEndpoint extends RpcEndpoint<DummyRpcGateway> {
+	private static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway {
 
 		private volatile int _foobar = 42;
 
@@ -283,9 +282,9 @@ public class AkkaRpcActorTest extends TestLogger {
 			super(rpcService);
 		}
 
-		@RpcMethod
-		public int foobar() {
-			return _foobar;
+		@Override
+		public CompletableFuture<Integer> foobar() {
+			return CompletableFuture.completedFuture(_foobar);
 		}
 
 		public void setFoobar(int value) {
@@ -299,25 +298,25 @@ public class AkkaRpcActorTest extends TestLogger {
 		CompletableFuture<Integer> doStuff();
 	}
 
-	private static class ExceptionalEndpoint extends RpcEndpoint<ExceptionalGateway> {
+	private static class ExceptionalEndpoint extends RpcEndpoint implements ExceptionalGateway {
 
 		protected ExceptionalEndpoint(RpcService rpcService) {
 			super(rpcService);
 		}
 
-		@RpcMethod
-		public int doStuff() {
+		@Override
+		public CompletableFuture<Integer> doStuff() {
 			throw new RuntimeException("my super specific test exception");
 		}
 	}
 
-	private static class ExceptionalFutureEndpoint extends RpcEndpoint<ExceptionalGateway> {
+	private static class ExceptionalFutureEndpoint extends RpcEndpoint implements ExceptionalGateway {
 
 		protected ExceptionalFutureEndpoint(RpcService rpcService) {
 			super(rpcService);
 		}
 
-		@RpcMethod
+		@Override
 		public CompletableFuture<Integer> doStuff() {
 			final CompletableFuture<Integer> future = new CompletableFuture<>();
 
@@ -338,7 +337,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
 
-	private static class SimpleRpcEndpoint extends RpcEndpoint<RpcGateway> {
+	private static class SimpleRpcEndpoint extends RpcEndpoint implements RpcGateway {
 
 		protected SimpleRpcEndpoint(RpcService rpcService, String endpointId) {
 			super(rpcService, endpointId);
@@ -352,7 +351,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
 
-	private static class FailingPostStopEndpoint extends RpcEndpoint<RpcGateway> {
+	private static class FailingPostStopEndpoint extends RpcEndpoint implements RpcGateway {
 
 		protected FailingPostStopEndpoint(RpcService rpcService, String endpointId) {
 			super(rpcService, endpointId);

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index 9f134d8..96a9ee4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import org.apache.flink.util.TestLogger;
@@ -52,7 +51,7 @@ public class MainThreadValidationTest extends TestLogger {
 			testEndpoint.start();
 
 			// this works, because it is executed as an RPC call
-			testEndpoint.getSelf().someConcurrencyCriticalFunction();
+			testEndpoint.getSelfGateway(TestGateway.class).someConcurrencyCriticalFunction();
 
 			// this fails, because it is executed directly
 			boolean exceptionThrown;
@@ -65,7 +64,7 @@ public class MainThreadValidationTest extends TestLogger {
 			}
 			assertTrue("should fail with an assertion error", exceptionThrown);
 
-			akkaRpcService.stopServer(testEndpoint.getSelf());
+			testEndpoint.shutDown();
 		}
 		finally {
 			akkaRpcService.stopService();
@@ -82,13 +81,13 @@ public class MainThreadValidationTest extends TestLogger {
 	}
 
 	@SuppressWarnings("unused")
-	public static class TestEndpoint extends RpcEndpoint<TestGateway> {
+	public static class TestEndpoint extends RpcEndpoint implements TestGateway {
 
 		public TestEndpoint(RpcService rpcService) {
 			super(rpcService);
 		}
 
-		@RpcMethod
+		@Override
 		public void someConcurrencyCriticalFunction() {
 			validateRunsInMainThread();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 34cf412..c722980 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.TestLogger;
 import org.hamcrest.core.Is;
@@ -85,7 +84,7 @@ public class MessageSerializationTest extends TestLogger {
 		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue);
 		testEndpoint.start();
 
-		TestGateway testGateway = testEndpoint.getSelf();
+		TestGateway testGateway = testEndpoint.getSelfGateway(TestGateway.class);
 
 		NonSerializableObject expected = new NonSerializableObject(42);
 
@@ -169,7 +168,7 @@ public class MessageSerializationTest extends TestLogger {
 		void foobar(Object object) throws IOException, InterruptedException;
 	}
 
-	private static class TestEndpoint extends RpcEndpoint<TestGateway> {
+	private static class TestEndpoint extends RpcEndpoint implements TestGateway {
 
 		private final LinkedBlockingQueue<Object> queue;
 
@@ -178,7 +177,7 @@ public class MessageSerializationTest extends TestLogger {
 			this.queue = queue;
 		}
 
-		@RpcMethod
+		@Override
 		public void foobar(Object object) throws InterruptedException {
 			queue.put(object);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d95d20eb/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 53c435e..4c87671 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
@@ -167,7 +168,7 @@ public class TaskExecutorITCase extends TestLogger {
 			any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 
 
-		rpcService.registerGateway(rmAddress, resourceManager.getSelf());
+		rpcService.registerGateway(rmAddress, resourceManager.getSelfGateway(ResourceManagerGateway.class));
 		rpcService.registerGateway(jmAddress, jmGateway);
 
 		final AllocationID allocationId = new AllocationID();
@@ -189,13 +190,14 @@ public class TaskExecutorITCase extends TestLogger {
 				jmLeaderId,
 				jmResourceId,
 				jmAddress,
-				jobId);
+				jobId,
+				Time.milliseconds(0L));
 
 			RegistrationResponse registrationResponse = registrationResponseFuture.get();
 
 			assertTrue(registrationResponse instanceof JobMasterRegistrationSuccess);
 
-			resourceManager.requestSlot(jmLeaderId, rmLeaderId, slotRequest);
+			resourceManager.requestSlot(jmLeaderId, rmLeaderId, slotRequest, Time.milliseconds(0L));
 
 			verify(jmGateway).offerSlots(
 				eq(taskManagerResourceId),


Mime
View raw message