flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [6/6] flink git commit: [FLINK-4987] Add RpcTaskManagerGateway implementation; Port AllocatedSlotsTest, AvailableSlotsTest and SlotPoolTest
Date Tue, 01 Nov 2016 08:42:10 GMT
[FLINK-4987] Add RpcTaskManagerGateway implementation; Port AllocatedSlotsTest, AvailableSlotsTest and SlotPoolTest

The RpcTaskManagerGateway is the TaskManagerGateway of Flink's new RPC abstraction. It basically forwards all calls to the underlying TaskExecutorGateway.

Moreover, this PR enables the disabled tests AllocatedSlotsTest, AvailableSlotsTest and SlotPoolTest.

Add license header to RpcTaskManagerGateway

Fix ExecutionGraphMetricsTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4129de28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4129de28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4129de28

Branch: refs/heads/flip-6
Commit: 4129de281277f39755619564e14790bb86b0c0e0
Parents: 8b2731a
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Nov 1 03:03:23 2016 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Nov 1 09:41:33 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |   1 +
 .../runtime/executiongraph/ExecutionVertex.java |   3 +-
 .../apache/flink/runtime/instance/SlotPool.java |  19 +-
 .../runtime/jobmanager/slots/AllocatedSlot.java |  26 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  28 +-
 .../jobmaster/RpcTaskManagerGateway.java        | 142 +++++
 .../resourcemanager/ResourceManager.java        |   5 +
 .../taskexecutor/TaskExecutorGateway.java       |  12 +-
 .../ExecutionGraphMetricsTest.java              |   8 +-
 .../runtime/instance/AllocatedSlotsTest.java    | 275 +++++----
 .../runtime/instance/AvailableSlotsTest.java    | 244 ++++----
 .../flink/runtime/instance/SlotPoolTest.java    | 608 ++++++++++---------
 .../runtime/minicluster/MiniClusterITCase.java  |   4 +-
 .../runtime/rpc/TestingSerialRpcService.java    |   5 +-
 14 files changed, 775 insertions(+), 605 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c0ce799..1ebd409 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -298,6 +298,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			});
 
 			// if tasks have to scheduled immediately check that the task has been deployed
+			// TODO: This might be problematic if the future is not completed right away
 			if (!queued) {
 				if (!deploymentFuture.isDone()) {
 					markFailed(new IllegalArgumentException("The slot allocation future has not been completed yet."));

http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index eea2e81..b9cc80a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -606,7 +605,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 		return new TaskDeploymentDescriptor(
 			getJobId(),
-			new AllocationID(), // TODO: Obtain the proper allocation id from the slot
+			targetSlot.getAllocatedSlot().getSlotAllocationId(),
 			getExecutionGraph().getJobName(),
 			getJobvertexId(),
 			executionId,

http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 7c298ce..65a5c45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -172,7 +172,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	 *
 	 * @param jobManagerLeaderId The necessary leader id for running the job.
 	 */
-	public void start(UUID jobManagerLeaderId) {
+	public void start(UUID jobManagerLeaderId) throws Exception {
 		this.jobManagerLeaderId = jobManagerLeaderId;
 		super.start();
 	}
@@ -289,7 +289,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 		pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources));
 
 		Future<RMSlotRequestReply> rmResponse = resourceManagerGateway.requestSlot(
-				resourceManagerLeaderId, jobManagerLeaderId,
+				jobManagerLeaderId, resourceManagerLeaderId,
 				new SlotRequest(jobId, allocationID, resources),
 				resourceManagerRequestsTimeout);
 
@@ -573,7 +573,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	/**
 	 * Organize allocated slots from different points of view.
 	 */
-	private static class AllocatedSlots {
+	static class AllocatedSlots {
 
 		/** All allocated slots organized by TaskManager's id */
 		private final Map<ResourceID, Set<Slot>> allocatedSlotsByTaskManager;
@@ -644,7 +644,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 				Set<Slot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
 				slotsForTM.remove(slot);
 				if (slotsForTM.isEmpty()) {
-					allocatedSlotsByTaskManager.get(taskManagerId);
+					allocatedSlotsByTaskManager.remove(taskManagerId);
 				}
 				return slot;
 			}
@@ -686,6 +686,15 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 		int size() {
 			return allocatedSlotsById.size();
 		}
+
+		@VisibleForTesting
+		Set<Slot> getSlotsForTaskManager(ResourceID resourceId) {
+			if (allocatedSlotsByTaskManager.containsKey(resourceId)) {
+				return allocatedSlotsByTaskManager.get(resourceId);
+			} else {
+				return Collections.emptySet();
+			}
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -693,7 +702,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	/**
 	 * Organize all available slots from different points of view.
 	 */
-	private static class AvailableSlots {
+	static class AvailableSlots {
 
 		/** All available slots organized by TaskManager */
 		private final HashMap<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index b44128b..269a8f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -53,11 +52,8 @@ public class AllocatedSlot {
 	/** The resource profile of the slot provides */
 	private final ResourceProfile resourceProfile;
 
-	/** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */
-	private final TaskManagerGateway taskManagerGateway;
-
 	/** RPC gateway to call the TaskManager that holds this slot */
-	private final TaskExecutorGateway taskManagerGateway;
+	private final TaskManagerGateway taskManagerGateway;
 
 	/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
 	private final int slotNumber;
@@ -69,23 +65,6 @@ public class AllocatedSlot {
 			JobID jobID,
 			TaskManagerLocation location,
 			int slotNumber,
-			ResourceProfile resourceProfile,
-			TaskManagerGateway taskManagerGateway)
-	{
-		this.slotAllocationId = checkNotNull(slotAllocationId);
-		this.jobID = checkNotNull(jobID);
-		this.taskManagerLocation = checkNotNull(location);
-		this.slotNumber = slotNumber;
-		this.resourceProfile = checkNotNull(resourceProfile);
-		this.taskManagerGateway = checkNotNull(taskManagerGateway);
-		this.taskManagerGateway = null;
-	}
-
-	public AllocatedSlot(
-			AllocationID slotAllocationId,
-			JobID jobID,
-			TaskManagerLocation location,
-			int slotNumber,
 			ResourceProfile resourceProfile,		
 			TaskManagerGateway taskManagerGateway) {
 		this.slotAllocationId = checkNotNull(slotAllocationId);
@@ -93,7 +72,6 @@ public class AllocatedSlot {
 		this.taskManagerLocation = checkNotNull(location);
 		this.slotNumber = slotNumber;
 		this.resourceProfile = checkNotNull(resourceProfile);
-		this.taskManagerActorGateway = null;
 		this.taskManagerGateway = checkNotNull(taskManagerGateway);
 	}
 
@@ -163,7 +141,7 @@ public class AllocatedSlot {
 	 * @return The actor gateway that can be used to send messages to the TaskManager.
 	 */
 	public TaskManagerGateway getTaskManagerGateway() {
-		return 	return taskManagerGateway;
+		return taskManagerGateway;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c82ac01..d0d1d39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -308,9 +308,15 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());
 
-		// start the slot pool make sure the slot pool now accepts messages for this leader
-		log.debug("Staring SlotPool component");
-		slotPool.start(leaderSessionID);
+		try {
+			// start the slot pool make sure the slot pool now accepts messages for this leader
+			log.debug("Staring SlotPool component");
+			slotPool.start(leaderSessionID);
+		} catch (Exception e) {
+			log.error("Faild to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), e);
+
+			handleFatalError(new Exception("Could not start job execution: Failed to start the slot pool.", e));
+		}
 
 		try {
 			// job is ready to go, try to establish connection with resource manager
@@ -638,24 +644,30 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			final Iterable<SlotOffer> slots,
 			final UUID leaderId) throws Exception {
 
-		validateLeaderSessionId(leaderSessionID);
+		validateLeaderSessionId(leaderId);
 
 		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
+
 		if (taskManager == null) {
 			throw new Exception("Unknown TaskManager " + taskManagerId);
 		}
 
 		final JobID jid = jobGraph.getJobID();
 		final TaskManagerLocation taskManagerLocation = taskManager.f0;
-		final TaskExecutorGateway taskManagerGateway = taskManager.f1;
+		final TaskExecutorGateway taskExecutorGateway = taskManager.f1;
 
 		final ArrayList<Tuple2<AllocatedSlot, SlotOffer>> slotsAndOffers = new ArrayList<>();
 
+		final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, leaderId);
+
 		for (SlotOffer slotOffer : slots) {
 			final AllocatedSlot slot = new AllocatedSlot(
-					slotOffer.getAllocationId(), jid, taskManagerLocation,
-					slotOffer.getSlotIndex(), slotOffer.getResourceProfile(),
-					taskManagerGateway);
+				slotOffer.getAllocationId(),
+				jid,
+				taskManagerLocation,
+				slotOffer.getSlotIndex(),
+				slotOffer.getResourceProfile(),
+				rpcTaskManagerGateway);
 
 			slotsAndOffers.add(new Tuple2<>(slot, slotOffer));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
new file mode 100644
index 0000000..eba97d2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.StackTrace;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.util.Preconditions;
+
+import java.util.UUID;
+
+/**
+ * Implementation of the {@link TaskManagerGateway} for Flink's RPC system
+ */
+public class RpcTaskManagerGateway implements TaskManagerGateway {
+
+	private final TaskExecutorGateway taskExecutorGateway;
+
+	private final UUID leaderId;
+
+	public RpcTaskManagerGateway(TaskExecutorGateway taskExecutorGateway, UUID leaderId) {
+		this.taskExecutorGateway = Preconditions.checkNotNull(taskExecutorGateway);
+		this.leaderId = Preconditions.checkNotNull(leaderId);
+	}
+
+	@Override
+	public String getAddress() {
+		return taskExecutorGateway.getAddress();
+	}
+
+	@Override
+	public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {
+//		taskExecutorGateway.disconnectFromJobManager(instanceId, cause);
+		throw new UnsupportedOperationException("Operation is not yet supported.");
+	}
+
+	@Override
+	public void stopCluster(ApplicationStatus applicationStatus, String message) {
+//		taskExecutorGateway.stopCluster(applicationStatus, message);
+		throw new UnsupportedOperationException("Operation is not yet supported.");
+	}
+
+	@Override
+	public Future<StackTrace> requestStackTrace(Time timeout) {
+//		return taskExecutorGateway.requestStackTrace(timeout);
+		throw new UnsupportedOperationException("Operation is not yet supported.");
+	}
+
+	@Override
+	public Future<StackTraceSampleResponse> requestStackTraceSample(
+			ExecutionAttemptID executionAttemptID,
+			int sampleId,
+			int numSamples,
+			Time delayBetweenSamples,
+			int maxStackTraceDepth,
+			Time timeout) {
+//		return taskExecutorGateway.requestStackTraceSample(
+//			executionAttemptID,
+//			sampleId,
+//			numSamples,
+//			delayBetweenSamples,
+//			maxStackTraceDepth,
+//			timeout);
+
+		throw new UnsupportedOperationException("Operation is not yet supported.");
+	}
+
+	@Override
+	public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+		return taskExecutorGateway.submitTask(tdd, leaderId, timeout);
+	}
+
+	@Override
+	public Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return taskExecutorGateway.stopTask(executionAttemptID, timeout);
+	}
+
+	@Override
+	public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return taskExecutorGateway.cancelTask(executionAttemptID, timeout);
+	}
+
+	@Override
+	public Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+		return taskExecutorGateway.updatePartitions(executionAttemptID, partitionInfos, timeout);
+	}
+
+	@Override
+	public void failPartition(ExecutionAttemptID executionAttemptID) {
+		taskExecutorGateway.failPartition(executionAttemptID);
+	}
+
+	@Override
+	public void notifyCheckpointComplete(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
+//		taskExecutorGateway.notifyCheckpointComplete(executionAttemptID, jobId, checkpointId, timestamp);
+		throw new UnsupportedOperationException("Operation is not yet supported.");
+	}
+
+	@Override
+	public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
+//		taskExecutorGateway.triggerCheckpoint(executionAttemptID, jobId, checkpointId, timestamp);
+		throw new UnsupportedOperationException("Operation is not yet supported.");
+	}
+
+	@Override
+	public Future<BlobKey> requestTaskManagerLog(Time timeout) {
+//		return taskExecutorGateway.requestTaskManagerLog(timeout);
+		throw new UnsupportedOperationException("Operation is not yet supported.");
+	}
+
+	@Override
+	public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
+//		return taskExecutorGateway.requestTaskManagerStdout(timeout);
+		throw new UnsupportedOperationException("Operation is not yet supported.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a81c214..145cc40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -394,6 +394,11 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			UUID resourceManagerLeaderID,
 			SlotRequest slotRequest) {
 
+		log.info("Request slot with profile {} for job {} with allocation id {}.",
+			slotRequest.getResourceProfile(),
+			slotRequest.getJobId(),
+			slotRequest.getAllocationId());
+
 		JobID jobId = slotRequest.getJobId();
 		JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 1ffc407..ebd4c0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -75,9 +75,13 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 *
 	 * @param executionAttemptID identifying the task
 	 * @param partitionInfos telling where the partition can be retrieved from
+	 * @param timeout for the update partitions operation
 	 * @return Future acknowledge if the partitions have been successfully updated
 	 */
-	Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos);
+	Future<Acknowledge> updatePartitions(
+		ExecutionAttemptID executionAttemptID,
+		Iterable<PartitionInfo> partitionInfos,
+		@RpcTimeout Time timeout);
 
 	/**
 	 * Fail all intermediate result partitions of the given task.
@@ -112,15 +116,17 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * Stop the given task.
 	 *
 	 * @param executionAttemptID identifying the task
+	 * @param timeout for the stop operation
 	 * @return Future acknowledge if the task is successfully stopped
 	 */
-	Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID);
+	Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
 
 	/**
 	 * Cancel the given task.
 	 *
 	 * @param executionAttemptID identifying the task
+	 * @param timeout for the cancel operation
 	 * @return Future acknowledge if the task is successfully canceled
 	 */
-	Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID);
+	Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 34d220e..c53064f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -36,6 +37,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -127,6 +129,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 			Slot rootSlot = mock(Slot.class);
 
+			AllocatedSlot mockAllocatedSlot = mock(AllocatedSlot.class);
+			when(mockAllocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
+
 			SimpleSlot simpleSlot = mock(SimpleSlot.class);
 			when(simpleSlot.isAlive()).thenReturn(true);
 			when(simpleSlot.getTaskManagerLocation()).thenReturn(taskManagerLocation);
@@ -134,6 +139,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 			when(simpleSlot.getTaskManagerGateway()).thenReturn(taskManagerGateway);
 			when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
 			when(simpleSlot.getRoot()).thenReturn(rootSlot);
+			when(simpleSlot.getAllocatedSlot()).thenReturn(mockAllocatedSlot);
 
 			FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
 			future.complete(simpleSlot);
@@ -174,7 +180,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 	
 			// start execution
 			executionGraph.scheduleForExecution(scheduler);
-	
+
 			assertTrue(0L == restartingTime.getValue());
 	
 			List<ExecutionAttemptID> executionIDs = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
index 33ed679..e654a99 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
@@ -1,135 +1,140 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.runtime.instance;
-//
-//import org.apache.flink.runtime.clusterframework.types.AllocationID;
-//import org.apache.flink.runtime.clusterframework.types.ResourceID;
-//import org.junit.Test;
-//
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertFalse;
-//import static org.junit.Assert.assertNull;
-//import static org.junit.Assert.assertTrue;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.when;
-//
-//public class AllocatedSlotsTest {
-//
-//	@Test
-//	public void testOperations() throws Exception {
-//		SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots();
-//
-//		final AllocationID allocation1 = new AllocationID();
-//		final ResourceID resource1 = new ResourceID("resource1");
-//		final Slot slot1 = createSlot(resource1);
-//
-//		allocatedSlots.add(allocation1, new SlotDescriptor(slot1), slot1);
-//
-//		assertTrue(allocatedSlots.contains(slot1));
-//		assertTrue(allocatedSlots.containResource(resource1));
-//
-//		assertEquals(slot1, allocatedSlots.get(allocation1));
-//		assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
-//		assertEquals(1, allocatedSlots.size());
-//
-//		final AllocationID allocation2 = new AllocationID();
-//		final Slot slot2 = createSlot(resource1);
-//
-//		allocatedSlots.add(allocation2, new SlotDescriptor(slot2), slot2);
-//
-//		assertTrue(allocatedSlots.contains(slot1));
-//		assertTrue(allocatedSlots.contains(slot2));
-//		assertTrue(allocatedSlots.containResource(resource1));
-//
-//		assertEquals(slot1, allocatedSlots.get(allocation1));
-//		assertEquals(slot2, allocatedSlots.get(allocation2));
-//		assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
-//		assertEquals(2, allocatedSlots.size());
-//
-//		final AllocationID allocation3 = new AllocationID();
-//		final ResourceID resource2 = new ResourceID("resource2");
-//		final Slot slot3 = createSlot(resource2);
-//
-//		allocatedSlots.add(allocation3, new SlotDescriptor(slot2), slot3);
-//
-//		assertTrue(allocatedSlots.contains(slot1));
-//		assertTrue(allocatedSlots.contains(slot2));
-//		assertTrue(allocatedSlots.contains(slot3));
-//		assertTrue(allocatedSlots.containResource(resource1));
-//		assertTrue(allocatedSlots.containResource(resource2));
-//
-//		assertEquals(slot1, allocatedSlots.get(allocation1));
-//		assertEquals(slot2, allocatedSlots.get(allocation2));
-//		assertEquals(slot3, allocatedSlots.get(allocation3));
-//		assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
-//		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
-//		assertEquals(3, allocatedSlots.size());
-//
-//		allocatedSlots.remove(slot2);
-//
-//		assertTrue(allocatedSlots.contains(slot1));
-//		assertFalse(allocatedSlots.contains(slot2));
-//		assertTrue(allocatedSlots.contains(slot3));
-//		assertTrue(allocatedSlots.containResource(resource1));
-//		assertTrue(allocatedSlots.containResource(resource2));
-//
-//		assertEquals(slot1, allocatedSlots.get(allocation1));
-//		assertNull(allocatedSlots.get(allocation2));
-//		assertEquals(slot3, allocatedSlots.get(allocation3));
-//		assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
-//		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
-//		assertEquals(2, allocatedSlots.size());
-//
-//		allocatedSlots.remove(slot1);
-//
-//		assertFalse(allocatedSlots.contains(slot1));
-//		assertFalse(allocatedSlots.contains(slot2));
-//		assertTrue(allocatedSlots.contains(slot3));
-//		assertFalse(allocatedSlots.containResource(resource1));
-//		assertTrue(allocatedSlots.containResource(resource2));
-//
-//		assertNull(allocatedSlots.get(allocation1));
-//		assertNull(allocatedSlots.get(allocation2));
-//		assertEquals(slot3, allocatedSlots.get(allocation3));
-//		assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
-//		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
-//		assertEquals(1, allocatedSlots.size());
-//
-//		allocatedSlots.remove(slot3);
-//
-//		assertFalse(allocatedSlots.contains(slot1));
-//		assertFalse(allocatedSlots.contains(slot2));
-//		assertFalse(allocatedSlots.contains(slot3));
-//		assertFalse(allocatedSlots.containResource(resource1));
-//		assertFalse(allocatedSlots.containResource(resource2));
-//
-//		assertNull(allocatedSlots.get(allocation1));
-//		assertNull(allocatedSlots.get(allocation2));
-//		assertNull(allocatedSlots.get(allocation3));
-//		assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
-//		assertEquals(0, allocatedSlots.getSlotsByResource(resource2).size());
-//		assertEquals(0, allocatedSlots.size());
-//	}
-//
-//	private Slot createSlot(final ResourceID resourceId) {
-//		Slot slot = mock(Slot.class);
-//		when(slot.getTaskManagerID()).thenReturn(resourceId);
-//		return slot;
-//	}
-//}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AllocatedSlotsTest {
+
+	@Test
+	public void testOperations() throws Exception {
+		SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots();
+
+		final AllocationID allocation1 = new AllocationID();
+		final ResourceID resource1 = new ResourceID("resource1");
+		final Slot slot1 = createSlot(resource1, allocation1);
+
+		allocatedSlots.add(slot1);
+
+		assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.containResource(resource1));
+
+		assertEquals(slot1, allocatedSlots.get(allocation1));
+		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource1).size());
+		assertEquals(1, allocatedSlots.size());
+
+		final AllocationID allocation2 = new AllocationID();
+		final Slot slot2 = createSlot(resource1, allocation2);
+
+		allocatedSlots.add(slot2);
+
+		assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.containResource(resource1));
+
+		assertEquals(slot1, allocatedSlots.get(allocation1));
+		assertEquals(slot2, allocatedSlots.get(allocation2));
+		assertEquals(2, allocatedSlots.getSlotsForTaskManager(resource1).size());
+		assertEquals(2, allocatedSlots.size());
+
+		final AllocationID allocation3 = new AllocationID();
+		final ResourceID resource2 = new ResourceID("resource2");
+		final Slot slot3 = createSlot(resource2, allocation3);
+
+		allocatedSlots.add(slot3);
+
+		assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.containResource(resource1));
+		assertTrue(allocatedSlots.containResource(resource2));
+
+		assertEquals(slot1, allocatedSlots.get(allocation1));
+		assertEquals(slot2, allocatedSlots.get(allocation2));
+		assertEquals(slot3, allocatedSlots.get(allocation3));
+		assertEquals(2, allocatedSlots.getSlotsForTaskManager(resource1).size());
+		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
+		assertEquals(3, allocatedSlots.size());
+
+		allocatedSlots.remove(slot2);
+
+		assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+		assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.containResource(resource1));
+		assertTrue(allocatedSlots.containResource(resource2));
+
+		assertEquals(slot1, allocatedSlots.get(allocation1));
+		assertNull(allocatedSlots.get(allocation2));
+		assertEquals(slot3, allocatedSlots.get(allocation3));
+		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource1).size());
+		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
+		assertEquals(2, allocatedSlots.size());
+
+		allocatedSlots.remove(slot1);
+
+		assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+		assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
+		assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+		assertFalse(allocatedSlots.containResource(resource1));
+		assertTrue(allocatedSlots.containResource(resource2));
+
+		assertNull(allocatedSlots.get(allocation1));
+		assertNull(allocatedSlots.get(allocation2));
+		assertEquals(slot3, allocatedSlots.get(allocation3));
+		assertEquals(0, allocatedSlots.getSlotsForTaskManager(resource1).size());
+		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
+		assertEquals(1, allocatedSlots.size());
+
+		allocatedSlots.remove(slot3);
+
+		assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+		assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
+		assertFalse(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+		assertFalse(allocatedSlots.containResource(resource1));
+		assertFalse(allocatedSlots.containResource(resource2));
+
+		assertNull(allocatedSlots.get(allocation1));
+		assertNull(allocatedSlots.get(allocation2));
+		assertNull(allocatedSlots.get(allocation3));
+		assertEquals(0, allocatedSlots.getSlotsForTaskManager(resource1).size());
+		assertEquals(0, allocatedSlots.getSlotsForTaskManager(resource2).size());
+		assertEquals(0, allocatedSlots.size());
+	}
+
+	private Slot createSlot(final ResourceID resourceId, final AllocationID allocationId) {
+		AllocatedSlot mockAllocatedSlot = mock(AllocatedSlot.class);
+		Slot slot = mock(Slot.class);
+		when(slot.getTaskManagerID()).thenReturn(resourceId);
+		when(slot.getAllocatedSlot()).thenReturn(mockAllocatedSlot);
+
+		when(mockAllocatedSlot.getSlotAllocationId()).thenReturn(allocationId);
+		return slot;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
index 4d58a31..4ed88c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
@@ -1,123 +1,121 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.runtime.instance;
-//
-//import org.apache.flink.api.common.JobID;
-//import org.apache.flink.runtime.clusterframework.types.ResourceID;
-//import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-//import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-//import org.junit.Test;
-//
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertFalse;
-//import static org.junit.Assert.assertNull;
-//import static org.junit.Assert.assertTrue;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.when;
-//
-//public class AvailableSlotsTest {
-//
-//	static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
-//
-//	static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new ResourceProfile(2.0, 1024);
-//
-//	@Test
-//	public void testAddAndRemove() throws Exception {
-//		SlotPool.AvailableSlots availableSlots = new SlotPool.AvailableSlots();
-//
-//		final ResourceID resource1 = new ResourceID("resource1");
-//		final ResourceID resource2 = new ResourceID("resource2");
-//
-//		final SlotDescriptor slot1 = createSlotDescriptor(resource1);
-//		final SlotDescriptor slot2 = createSlotDescriptor(resource1);
-//		final SlotDescriptor slot3 = createSlotDescriptor(resource2);
-//
-//		availableSlots.add(slot1);
-//		availableSlots.add(slot2);
-//		availableSlots.add(slot3);
-//
-//		assertEquals(3, availableSlots.size());
-//		assertTrue(availableSlots.contains(slot1));
-//		assertTrue(availableSlots.contains(slot2));
-//		assertTrue(availableSlots.contains(slot3));
-//		assertTrue(availableSlots.containResource(resource1));
-//		assertTrue(availableSlots.containResource(resource2));
-//
-//		availableSlots.removeByResource(resource1);
-//
-//		assertEquals(1, availableSlots.size());
-//		assertFalse(availableSlots.contains(slot1));
-//		assertFalse(availableSlots.contains(slot2));
-//		assertTrue(availableSlots.contains(slot3));
-//		assertFalse(availableSlots.containResource(resource1));
-//		assertTrue(availableSlots.containResource(resource2));
-//
-//		availableSlots.removeByResource(resource2);
-//
-//		assertEquals(0, availableSlots.size());
-//		assertFalse(availableSlots.contains(slot1));
-//		assertFalse(availableSlots.contains(slot2));
-//		assertFalse(availableSlots.contains(slot3));
-//		assertFalse(availableSlots.containResource(resource1));
-//		assertFalse(availableSlots.containResource(resource2));
-//	}
-//
-//	@Test
-//	public void testPollFreeSlot() {
-//		SlotPool.AvailableSlots availableSlots = new SlotPool.AvailableSlots();
-//
-//		final ResourceID resource1 = new ResourceID("resource1");
-//		final SlotDescriptor slot1 = createSlotDescriptor(resource1);
-//
-//		availableSlots.add(slot1);
-//
-//		assertEquals(1, availableSlots.size());
-//		assertTrue(availableSlots.contains(slot1));
-//		assertTrue(availableSlots.containResource(resource1));
-//
-//		assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE));
-//
-//		assertEquals(slot1, availableSlots.poll(DEFAULT_TESTING_PROFILE));
-//		assertEquals(0, availableSlots.size());
-//		assertFalse(availableSlots.contains(slot1));
-//		assertFalse(availableSlots.containResource(resource1));
-//	}
-//
-//	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID) {
-//		return createSlotDescriptor(resourceID, new JobID());
-//	}
-//
-//	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, final JobID jobID) {
-//		return createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//	}
-//
-//	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, final JobID jobID,
-//		final ResourceProfile resourceProfile)
-//	{
-//		return createSlotDescriptor(resourceID, jobID, resourceProfile, 0);
-//	}
-//
-//	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, final JobID jobID,
-//		final ResourceProfile resourceProfile, final int slotNumber)
-//	{
-//		TaskManagerLocation location = mock(TaskManagerLocation.class);
-//		when(location.getResourceID()).thenReturn(resourceID);
-//		return new SlotDescriptor(jobID, location, slotNumber, resourceProfile, mock(ActorGateway.class));
-//	}
-//}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AvailableSlotsTest {
+
+	static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
+
+	static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new ResourceProfile(2.0, 1024);
+
+	@Test
+	public void testAddAndRemove() throws Exception {
+		SlotPool.AvailableSlots availableSlots = new SlotPool.AvailableSlots();
+
+		final ResourceID resource1 = new ResourceID("resource1");
+		final ResourceID resource2 = new ResourceID("resource2");
+
+		final AllocatedSlot slot1 = createAllocatedSlot(resource1);
+		final AllocatedSlot slot2 = createAllocatedSlot(resource1);
+		final AllocatedSlot slot3 = createAllocatedSlot(resource2);
+
+		availableSlots.add(slot1, 1L);
+		availableSlots.add(slot2, 2L);
+		availableSlots.add(slot3, 3L);
+
+		assertEquals(3, availableSlots.size());
+		assertTrue(availableSlots.contains(slot1.getSlotAllocationId()));
+		assertTrue(availableSlots.contains(slot2.getSlotAllocationId()));
+		assertTrue(availableSlots.contains(slot3.getSlotAllocationId()));
+		assertTrue(availableSlots.containsTaskManager(resource1));
+		assertTrue(availableSlots.containsTaskManager(resource2));
+
+		availableSlots.removeAllForTaskManager(resource1);
+
+		assertEquals(1, availableSlots.size());
+		assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
+		assertFalse(availableSlots.contains(slot2.getSlotAllocationId()));
+		assertTrue(availableSlots.contains(slot3.getSlotAllocationId()));
+		assertFalse(availableSlots.containsTaskManager(resource1));
+		assertTrue(availableSlots.containsTaskManager(resource2));
+
+		availableSlots.removeAllForTaskManager(resource2);
+
+		assertEquals(0, availableSlots.size());
+		assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
+		assertFalse(availableSlots.contains(slot2.getSlotAllocationId()));
+		assertFalse(availableSlots.contains(slot3.getSlotAllocationId()));
+		assertFalse(availableSlots.containsTaskManager(resource1));
+		assertFalse(availableSlots.containsTaskManager(resource2));
+	}
+
+	@Test
+	public void testPollFreeSlot() {
+		SlotPool.AvailableSlots availableSlots = new SlotPool.AvailableSlots();
+
+		final ResourceID resource1 = new ResourceID("resource1");
+		final AllocatedSlot slot1 = createAllocatedSlot(resource1);
+
+		availableSlots.add(slot1, 1L);
+
+		assertEquals(1, availableSlots.size());
+		assertTrue(availableSlots.contains(slot1.getSlotAllocationId()));
+		assertTrue(availableSlots.containsTaskManager(resource1));
+
+		assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE, null));
+
+		SlotAndLocality slotAndLocality = availableSlots.poll(DEFAULT_TESTING_PROFILE, null);
+		assertEquals(slot1, slotAndLocality.slot());
+		assertEquals(0, availableSlots.size());
+		assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
+		assertFalse(availableSlots.containsTaskManager(resource1));
+	}
+
+	static AllocatedSlot createAllocatedSlot(final ResourceID resourceId) {
+		TaskManagerLocation mockTaskManagerLocation = mock(TaskManagerLocation.class);
+		when(mockTaskManagerLocation.getResourceID()).thenReturn(resourceId);
+
+		TaskManagerGateway mockTaskManagerGateway = mock(TaskManagerGateway.class);
+
+		return new AllocatedSlot(
+			new AllocationID(),
+			new JobID(),
+			mockTaskManagerLocation,
+			0,
+			DEFAULT_TESTING_PROFILE,
+			mockTaskManagerGateway);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index cc1d194..5fa7af3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -1,299 +1,309 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.runtime.instance;
-//
-//import org.apache.flink.api.common.JobID;
-//import org.apache.flink.api.common.time.Time;
-//import org.apache.flink.runtime.clusterframework.types.AllocationID;
-//import org.apache.flink.runtime.clusterframework.types.ResourceID;
-//import org.apache.flink.runtime.concurrent.BiFunction;
-//import org.apache.flink.runtime.concurrent.Future;
-//import org.apache.flink.runtime.jobgraph.JobVertexID;
-//import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-//import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-//import org.apache.flink.runtime.resourcemanager.SlotRequest;
-//import org.apache.flink.util.TestLogger;
-//import org.junit.After;
-//import org.junit.Before;
-//import org.junit.Test;
-//
-//import java.util.UUID;
-//import java.util.concurrent.ExecutionException;
-//import java.util.concurrent.Executor;
-//import java.util.concurrent.ExecutorService;
-//import java.util.concurrent.Executors;
-//import java.util.concurrent.TimeUnit;
-//
-//import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
-//import static org.apache.flink.runtime.instance.AvailableSlotsTest.createSlotDescriptor;
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertFalse;
-//import static org.junit.Assert.assertNotEquals;
-//import static org.junit.Assert.assertNotNull;
-//import static org.junit.Assert.assertNull;
-//import static org.junit.Assert.assertTrue;
-//import static org.junit.Assert.fail;
-//import static org.mockito.Matchers.any;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.times;
-//import static org.mockito.Mockito.verify;
-//import static org.mockito.Mockito.when;
-//
-//public class SlotPoolTest extends TestLogger {
-//
-//	private ExecutorService executor;
-//
-//	private SlotPool slotPool;
-//
-//	private ResourceManagerGateway resourceManagerGateway;
-//
-//	@Before
-//	public void setUp() throws Exception {
-//		this.executor = Executors.newFixedThreadPool(1);
-//		this.slotPool = new SlotPool(executor);
-//		this.resourceManagerGateway = mock(ResourceManagerGateway.class);
-//		when(resourceManagerGateway
-//			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
-//			.thenReturn(mock(Future.class));
-//
-//		slotPool.setResourceManager(UUID.randomUUID(), resourceManagerGateway);
-//		slotPool.setJobManagerLeaderId(UUID.randomUUID());
-//	}
-//
-//	@After
-//	public void tearDown() throws Exception {
-//	}
-//
-//	@Test
-//	public void testAllocateSimpleSlot() throws Exception {
-//		ResourceID resourceID = new ResourceID("resource");
-//		slotPool.registerResource(resourceID);
-//
-//		JobID jobID = new JobID();
-//		AllocationID allocationID = new AllocationID();
-//		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID);
-//		assertFalse(future.isDone());
-//		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
-//
-//		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-//
-//		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
-//		assertTrue(future.isDone());
-//		assertTrue(slot.isAlive());
-//		assertEquals(resourceID, slot.getTaskManagerID());
-//		assertEquals(jobID, slot.getJobID());
-//		assertEquals(slotPool, slot.getOwner());
-//	}
-//
-//	@Test
-//	public void testAllocateSharedSlot() throws Exception {
-//		ResourceID resourceID = new ResourceID("resource");
-//		slotPool.registerResource(resourceID);
-//
-//		JobVertexID vid = new JobVertexID();
-//		SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
-//		SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
-//
-//		JobID jobID = new JobID();
-//		AllocationID allocationID = new AllocationID();
-//		Future<SharedSlot> future = slotPool.allocateSharedSlot(jobID, DEFAULT_TESTING_PROFILE, assignment, allocationID);
-//
-//		assertFalse(future.isDone());
-//		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
-//
-//		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-//
-//		SharedSlot slot = future.get(1, TimeUnit.SECONDS);
-//		assertTrue(future.isDone());
-//		assertTrue(slot.isAlive());
-//		assertEquals(resourceID, slot.getTaskManagerID());
-//		assertEquals(jobID, slot.getJobID());
-//		assertEquals(slotPool, slot.getOwner());
-//
-//		SimpleSlot simpleSlot = slot.allocateSubSlot(vid);
-//		assertNotNull(simpleSlot);
-//		assertTrue(simpleSlot.isAlive());
-//	}
-//
-//	@Test
-//	public void testAllocateSlotWithoutResourceManager() throws Exception {
-//		slotPool.disconnectResourceManager();
-//		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(new JobID(), DEFAULT_TESTING_PROFILE);
-//		future.handleAsync(
-//			new BiFunction<SimpleSlot, Throwable, Void>() {
-//				@Override
-//				public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
-//					assertNull(simpleSlot);
-//					assertNotNull(throwable);
-//					return null;
-//				}
-//			},
-//			executor);
-//		try {
-//			future.get(1, TimeUnit.SECONDS);
-//			fail("We expected a ExecutionException.");
-//		} catch (ExecutionException ex) {
-//			// we expect the exception
-//		}
-//	}
-//
-//	@Test
-//	public void testAllocationFulfilledByReturnedSlot() throws Exception {
-//		ResourceID resourceID = new ResourceID("resource");
-//		slotPool.registerResource(resourceID);
-//
-//		JobID jobID = new JobID();
-//
-//		AllocationID allocationID1 = new AllocationID();
-//		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
-//
-//		AllocationID allocationID2 = new AllocationID();
-//		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
-//
-//		assertFalse(future1.isDone());
-//		assertFalse(future2.isDone());
-//		verify(resourceManagerGateway, times(2))
-//			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
-//
-//		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
-//
-//		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-//		assertTrue(future1.isDone());
-//		assertFalse(future2.isDone());
-//
-//		// return this slot to pool
-//		slot1.releaseSlot();
-//
-//		// second allocation fulfilled by previous slot returning
-//		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
-//		assertTrue(future2.isDone());
-//
-//		assertNotEquals(slot1, slot2);
-//		assertTrue(slot1.isReleased());
-//		assertTrue(slot2.isAlive());
-//		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
-//		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
-//	}
-//
-//	@Test
-//	public void testAllocateWithFreeSlot() throws Exception {
-//		ResourceID resourceID = new ResourceID("resource");
-//		slotPool.registerResource(resourceID);
-//
-//		JobID jobID = new JobID();
-//		AllocationID allocationID1 = new AllocationID();
-//		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
-//		assertFalse(future1.isDone());
-//
-//		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
-//
-//		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-//		assertTrue(future1.isDone());
-//
-//		// return this slot to pool
-//		slot1.releaseSlot();
-//
-//		AllocationID allocationID2 = new AllocationID();
-//		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
-//
-//		// second allocation fulfilled by previous slot returning
-//		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
-//		assertTrue(future2.isDone());
-//
-//		assertNotEquals(slot1, slot2);
-//		assertTrue(slot1.isReleased());
-//		assertTrue(slot2.isAlive());
-//		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
-//		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
-//	}
-//
-//	@Test
-//	public void testOfferSlot() throws Exception {
-//		ResourceID resourceID = new ResourceID("resource");
-//		slotPool.registerResource(resourceID);
-//
-//		JobID jobID = new JobID();
-//		AllocationID allocationID = new AllocationID();
-//		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID);
-//		assertFalse(future.isDone());
-//		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
-//
-//		// slot from unregistered resource
-//		SlotDescriptor invalid = createSlotDescriptor(new ResourceID("unregistered"), jobID, DEFAULT_TESTING_PROFILE);
-//		assertFalse(slotPool.offerSlot(allocationID, invalid));
-//
-//		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//
-//		// reject offering with mismatch allocation id
-//		assertFalse(slotPool.offerSlot(new AllocationID(), slotDescriptor));
-//
-//		// accepted slot
-//		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-//		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
-//		assertTrue(future.isDone());
-//		assertTrue(slot.isAlive());
-//
-//		// conflict offer with using slot
-//		SlotDescriptor conflict = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//		assertFalse(slotPool.offerSlot(allocationID, conflict));
-//
-//		// duplicated offer with using slot
-//		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-//		assertTrue(future.isDone());
-//		assertTrue(slot.isAlive());
-//
-//		// duplicated offer with free slot
-//		slot.releaseSlot();
-//		assertTrue(slot.isReleased());
-//		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-//	}
-//
-//	@Test
-//	public void testReleaseResource() throws Exception {
-//		ResourceID resourceID = new ResourceID("resource");
-//		slotPool.registerResource(resourceID);
-//
-//		JobID jobID = new JobID();
-//
-//		AllocationID allocationID1 = new AllocationID();
-//		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
-//
-//		AllocationID allocationID2 = new AllocationID();
-//		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
-//
-//		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
-//
-//		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-//		assertTrue(future1.isDone());
-//		assertFalse(future2.isDone());
-//
-//		slotPool.releaseResource(resourceID);
-//		assertTrue(slot1.isReleased());
-//
-//		// slot released and not usable, second allocation still not fulfilled
-//		Thread.sleep(10);
-//		assertFalse(future2.isDone());
-//	}
-//
-//}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SlotPoolTest extends TestLogger {
+
+	private RpcService rpcService;
+
+	private JobID jobId;
+
+	private MainThreadValidatorUtil mainThreadValidatorUtil;
+
+	private SlotPool slotPool;
+
+	private ResourceManagerGateway resourceManagerGateway;
+
+	@Before
+	public void setUp() throws Exception {
+
+		this.rpcService = new TestingSerialRpcService();
+		this.jobId = new JobID();
+		this.slotPool = new SlotPool(rpcService, jobId);
+
+		this.mainThreadValidatorUtil = new MainThreadValidatorUtil(slotPool);
+
+		mainThreadValidatorUtil.enterMainThread();
+
+		slotPool.start(UUID.randomUUID());
+
+		this.resourceManagerGateway = mock(ResourceManagerGateway.class);
+		when(resourceManagerGateway
+			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
+			.thenReturn(mock(Future.class));
+
+		slotPool.connectToResourceManager(UUID.randomUUID(), resourceManagerGateway);
+	}
+
+	@After
+	public void tearDown() throws Exception {
+		mainThreadValidatorUtil.exitMainThread();
+	}
+
+	@Test
+	public void testAllocateSimpleSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerTaskManager(resourceID);
+
+		ScheduledUnit task = mock(ScheduledUnit.class);
+		Future<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null);
+		assertFalse(future.isDone());
+
+		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
+		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+
+		final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+
+		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocatedSlot));
+
+		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+		assertTrue(future.isDone());
+		assertTrue(slot.isAlive());
+		assertEquals(resourceID, slot.getTaskManagerID());
+		assertEquals(jobId, slot.getJobID());
+		assertEquals(slotPool.getSlotOwner(), slot.getOwner());
+	}
+
+	@Test
+	public void testAllocateSlotWithoutResourceManager() throws Exception {
+		slotPool.disconnectResourceManager();
+		Future<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		future.handleAsync(
+			new BiFunction<SimpleSlot, Throwable, Void>() {
+				@Override
+				public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+					assertNull(simpleSlot);
+					assertNotNull(throwable);
+					return null;
+				}
+			},
+			rpcService.getExecutor());
+		try {
+			future.get(1, TimeUnit.SECONDS);
+			fail("We expected a ExecutionException.");
+		} catch (ExecutionException ex) {
+			// we expect the exception
+		}
+	}
+
+	@Test
+	public void testAllocationFulfilledByReturnedSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerTaskManager(resourceID);
+
+		Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+
+		assertFalse(future1.isDone());
+		assertFalse(future2.isDone());
+
+		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
+		verify(resourceManagerGateway, times(2))
+			.requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+
+		final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();
+
+		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocatedSlot));
+
+		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+		assertTrue(future1.isDone());
+		assertFalse(future2.isDone());
+
+		// return this slot to pool
+		slot1.releaseSlot();
+
+		// second allocation fulfilled by previous slot returning
+		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+		assertTrue(future2.isDone());
+
+		assertNotEquals(slot1, slot2);
+		assertTrue(slot1.isReleased());
+		assertTrue(slot2.isAlive());
+		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
+		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+	}
+
+	@Test
+	public void testAllocateWithFreeSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerTaskManager(resourceID);
+
+		Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		assertFalse(future1.isDone());
+
+		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
+		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+
+		final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+
+		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocatedSlot));
+
+		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+		assertTrue(future1.isDone());
+
+		// return this slot to pool
+		slot1.releaseSlot();
+
+		Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+
+		// second allocation fulfilled by previous slot returning
+		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+		assertTrue(future2.isDone());
+
+		assertNotEquals(slot1, slot2);
+		assertTrue(slot1.isReleased());
+		assertTrue(slot2.isAlive());
+		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
+		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+	}
+
+	@Test
+	public void testOfferSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerTaskManager(resourceID);
+
+		Future<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		assertFalse(future.isDone());
+
+		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
+		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+
+		final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+
+		// slot from unregistered resource
+		AllocatedSlot invalid = createAllocatedSlot(new ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+		assertFalse(slotPool.offerSlot(invalid));
+
+		AllocatedSlot notRequested = createAllocatedSlot(resourceID, new AllocationID(), jobId, DEFAULT_TESTING_PROFILE);
+
+		// we'll also accept non requested slots
+		assertTrue(slotPool.offerSlot(notRequested));
+
+		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+
+		// accepted slot
+		assertTrue(slotPool.offerSlot(allocatedSlot));
+		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+		assertTrue(future.isDone());
+		assertTrue(slot.isAlive());
+
+		// duplicated offer with using slot
+		assertTrue(slotPool.offerSlot(allocatedSlot));
+		assertTrue(future.isDone());
+		assertTrue(slot.isAlive());
+
+		// duplicated offer with free slot
+		slot.releaseSlot();
+		assertTrue(slot.isReleased());
+		assertTrue(slotPool.offerSlot(allocatedSlot));
+	}
+
+	@Test
+	public void testReleaseResource() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerTaskManager(resourceID);
+
+		Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+
+		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
+		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+
+		final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+
+		Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+
+		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocatedSlot));
+
+		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+		assertTrue(future1.isDone());
+		assertFalse(future2.isDone());
+
+		slotPool.releaseTaskManager(resourceID);
+		assertTrue(slot1.isReleased());
+
+		// slot released and not usable, second allocation still not fulfilled
+		Thread.sleep(10);
+		assertFalse(future2.isDone());
+	}
+
+	static AllocatedSlot createAllocatedSlot(
+			final ResourceID resourceId,
+			final AllocationID allocationId,
+			final JobID jobId,
+			final ResourceProfile resourceProfile) {
+		TaskManagerLocation mockTaskManagerLocation = mock(TaskManagerLocation.class);
+		when(mockTaskManagerLocation.getResourceID()).thenReturn(resourceId);
+
+		TaskManagerGateway mockTaskManagerGateway = mock(TaskManagerGateway.class);
+
+		return new AllocatedSlot(
+			allocationId,
+			jobId,
+			mockTaskManagerLocation,
+			0,
+			resourceProfile,
+			mockTaskManagerGateway);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index f5b3892..2cf2d4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -45,7 +45,7 @@ public class MiniClusterITCase extends TestLogger {
 		executeJob(miniCluster);
 	}
 
-//	@Test
+	@Test
 	public void runJobWithMultipleRpcServices() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
 		cfg.setUseRpcServicePerComponent();
@@ -54,7 +54,7 @@ public class MiniClusterITCase extends TestLogger {
 		executeJob(miniCluster);
 	}
 
-//	@Test
+	@Test
 	public void runJobWithMultipleJobManagers() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
 		cfg.setNumJobManagers(3);

http://git-wip-us.apache.org/repos/asf/flink/blob/4129de28/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 88906a7..1d30ea4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rpc;
 
-import akka.dispatch.Futures;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
@@ -210,9 +209,9 @@ public class TestingSerialRpcService implements RpcService {
 				if (returnType.equals(Future.class)) {
 					try {
 						Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
-						return Futures.successful(result);
+						return FlinkCompletableFuture.completed(result);
 					} catch (Throwable e) {
-						return Futures.failed(e);
+						return FlinkCompletableFuture.completedExceptionally(e);
 					}
 				} else {
 					return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);


Mime
View raw message