flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-4489] [tm] Add TaskSlotTable to manage slot allocations for multiple job managers
Date Sat, 15 Oct 2016 09:32:33 GMT
Repository: flink
Updated Branches:
  refs/heads/flip-6 7aca811df -> 9da76dcfd


[FLINK-4489] [tm] Add TaskSlotTable to manage slot allocations for multiple job managers

Add TimerService for slot timeouts

Add task and task slot access methods

Add comments to newly introduced classes

This closes #2638.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9da76dcf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9da76dcf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9da76dcf

Branch: refs/heads/flip-6
Commit: 9da76dcfde689e8f4516880459b80c448233aec5
Parents: 7aca811
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Oct 5 11:58:26 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Sat Oct 15 11:31:59 2016 +0200

----------------------------------------------------------------------
 .../runtime/clusterframework/types/SlotID.java  |   4 +-
 .../resourcemanager/ResourceManager.java        |  13 +-
 .../resourcemanager/ResourceManagerGateway.java |  13 +-
 .../runtime/taskexecutor/TaskExecutor.java      | 194 +++---
 .../runtime/taskexecutor/TaskManagerRunner.java |   1 +
 .../taskexecutor/TaskManagerServices.java       |  29 +-
 .../TaskManagerServicesConfiguration.java       |   2 +
 .../flink/runtime/taskexecutor/TaskSlot.java    |  73 --
 .../runtime/taskexecutor/TaskSlotMapping.java   |  44 --
 .../runtime/taskexecutor/slot/SlotActions.java  |  45 ++
 .../slot/SlotNotActiveException.java            |  34 +
 .../slot/SlotNotFoundException.java             |  37 +
 .../runtime/taskexecutor/slot/TaskSlot.java     | 289 ++++++++
 .../taskexecutor/slot/TaskSlotState.java        |  29 +
 .../taskexecutor/slot/TaskSlotTable.java        | 682 +++++++++++++++++++
 .../taskexecutor/slot/TimeoutListener.java      |  37 +
 .../runtime/taskexecutor/slot/TimerService.java | 160 +++++
 .../apache/flink/runtime/taskmanager/Task.java  |   8 +-
 .../PartialConsumePipelinedResultTest.java      |   1 -
 .../runtime/taskexecutor/TaskExecutorTest.java  |   4 +
 20 files changed, 1457 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
index 237597b..d6409b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework.types;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -34,8 +35,9 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
 
 	/** The numeric id for single slot */
 	private final int slotNumber;
-
+	
 	public SlotID(ResourceID resourceId, int slotNumber) {
+		checkArgument(0 <= slotNumber, "Slot number must be positive.");
 		this.resourceId = checkNotNull(resourceId, "ResourceID must not be null");
 		this.slotNumber = slotNumber;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 8fbb34b..3122804 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
 import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
 import org.apache.flink.runtime.resourcemanager.registration.JobMasterRegistration;
 import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
@@ -337,34 +336,32 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	/**
 	 * Notification from a TaskExecutor that a slot has become available
 	 * @param resourceManagerLeaderId TaskExecutor's resource manager leader id
-	 * @param resourceID TaskExecutor's resource id
 	 * @param instanceID TaskExecutor's instance id
 	 * @param slotID The slot id of the available slot
 	 * @return SlotAvailableReply
 	 */
 	@RpcMethod
-	public SlotAvailableReply notifySlotAvailable(
+	public void notifySlotAvailable(
 			final UUID resourceManagerLeaderId,
-			final ResourceID resourceID,
 			final InstanceID instanceID,
 			final SlotID slotID) {
 
 		if (resourceManagerLeaderId.equals(leaderSessionID)) {
-			WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceID);
+			final ResourceID resourceId = slotID.getResourceID();
+			WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceId);
+
 			if (registration != null) {
 				InstanceID registrationInstanceID = registration.getInstanceID();
 				if (registrationInstanceID.equals(instanceID)) {
 					runAsync(new Runnable() {
 						@Override
 						public void run() {
-							slotManager.notifySlotAvailable(resourceID, slotID);
+							slotManager.notifySlotAvailable(resourceId, slotID);
 						}
 					});
-					return new SlotAvailableReply(leaderSessionID, slotID);
 				}
 			}
 		}
-		return null;
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 07e9e43..968eeb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -94,17 +93,13 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * Sent by the TaskExecutor to notify the ResourceManager that a slot has become available.
 	 *
 	 * @param resourceManagerLeaderId The ResourceManager leader id
-	 * @param resourceID The ResourceID of the TaskExecutor
-	 * @param instanceID The InstanceID of the TaskExecutor
+	 * @param instanceId TaskExecutor's instance id
 	 * @param slotID The SlotID of the freed slot
-	 * @return The confirmation by the ResourceManager
 	 */
-	Future<SlotAvailableReply> notifySlotAvailable(
+	void notifySlotAvailable(
 		UUID resourceManagerLeaderId,
-		ResourceID resourceID,
-		InstanceID instanceID,
-		SlotID slotID,
-		@RpcTimeout Time timeout);
+		InstanceID instanceId,
+		SlotID slotID);
 
 	/**
 	 * Registers an infoMessage listener

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9f9234f..e642315 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
@@ -64,6 +65,10 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
+import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
+import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -76,7 +81,6 @@ import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -132,13 +136,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 	private Map<ResourceID, JobManagerConnection> jobManagerConnections;
 
-	// --------- Slot allocation table --------
+	// --------- task slot allocation table -----------
 
-	private Map<AllocationID, TaskSlot> taskSlots;
-
-	// --------- Slot allocation table --------
-
-	private Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
+	private final TaskSlotTable taskSlotTable;
 
 	// ------------------------------------------------------------------------
 
@@ -154,6 +154,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		TaskManagerMetricGroup taskManagerMetricGroup,
 		BroadcastVariableManager broadcastVariableManager,
 		FileCache fileCache,
+		TaskSlotTable taskSlotTable,
 		FatalErrorHandler fatalErrorHandler) {
 
 		super(rpcService);
@@ -167,6 +168,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		this.networkEnvironment = checkNotNull(networkEnvironment);
 		this.haServices = checkNotNull(haServices);
 		this.metricRegistry = checkNotNull(metricRegistry);
+		this.taskSlotTable = checkNotNull(taskSlotTable);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 		this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
 		this.broadcastVariableManager = checkNotNull(broadcastVariableManager);
@@ -175,8 +177,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		this.jobManagerConnections = new HashMap<>(4);
 
 		this.unconfirmedFreeSlots = new HashSet<>();
-		this.taskSlots = new HashMap<>(taskManagerConfiguration.getNumberSlots());
-		this.taskSlotMappings = new HashMap<>(taskManagerConfiguration.getNumberSlots() * 2);
 	}
 
 	// ------------------------------------------------------------------------
@@ -193,6 +193,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		} catch (Exception e) {
 			onFatalErrorAsync(e);
 		}
+
+		// tell the task slot table who's responsible for the task slot actions
+		taskSlotTable.start(new SlotActionsImpl(), taskManagerConfiguration.getTimeout());
 	}
 
 	/**
@@ -202,6 +205,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	public void shutDown() {
 		log.info("Stopping TaskManager {}.", getAddress());
 
+		taskSlotTable.stop();
+
 		if (resourceManagerConnection.isConnected()) {
 			try {
 				resourceManagerConnection.close();
@@ -264,10 +269,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			throw new TaskSubmissionException(message);
 		}
 
-		TaskSlot taskSlot = taskSlots.get(tdd.getAllocationID());
-
-		if (taskSlot == null) {
-			final String message = "No task slot allocated for allocation ID " + tdd.getAllocationID() + '.';
+		if (!taskSlotTable.existActiveSlot(tdd.getJobID(), tdd.getAllocationID())) {
+			final String message = "No task slot allocated for job ID " + tdd.getJobID() +
+				" and allocation ID " + tdd.getAllocationID() + '.';
 			log.debug(message);
 			throw new TaskSubmissionException(message);
 		}
@@ -307,10 +311,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
 
-		if(taskSlot.add(task)) {
-			TaskSlotMapping taskSlotMapping = new TaskSlotMapping(task, taskSlot);
+		boolean taskAdded;
 
-			taskSlotMappings.put(task.getExecutionId(), taskSlotMapping);
+		try {
+			taskAdded = taskSlotTable.addTask(task);
+		} catch (SlotNotFoundException | SlotNotActiveException e) {
+			throw new TaskSubmissionException("Could not submit task.", e);
+		}
+
+		if (taskAdded) {
 			task.startTaskThread();
 
 			return Acknowledge.get();
@@ -325,7 +334,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 	@RpcMethod
 	public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) throws TaskException {
-		final Task task = getTask(executionAttemptID);
+		final Task task = taskSlotTable.getTask(executionAttemptID);
 
 		if (task != null) {
 			try {
@@ -344,7 +353,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 	@RpcMethod
 	public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) throws TaskException {
-		final Task task = getTask(executionAttemptID);
+		final Task task = taskSlotTable.getTask(executionAttemptID);
 
 		if (task != null) {
 			try {
@@ -367,7 +376,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 	@RpcMethod
 	public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Collection<PartitionInfo> partitionInfos) throws PartitionException {
-		final Task task = getTask(executionAttemptID);
+		final Task task = taskSlotTable.getTask(executionAttemptID);
 
 		if (task != null) {
 			for (final PartitionInfo partitionInfo: partitionInfos) {
@@ -430,7 +439,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
 		log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
 
-		final Task task = getTask(executionAttemptID);
+		final Task task = taskSlotTable.getTask(executionAttemptID);
 
 		if (task != null) {
 			task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp);
@@ -448,7 +457,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	public Acknowledge confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
 		log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
 
-		final Task task = getTask(executionAttemptID);
+		final Task task = taskSlotTable.getTask(executionAttemptID);
 
 		if (task != null) {
 			task.notifyCheckpointComplete(checkpointId);
@@ -494,68 +503,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		return jobManagerConnections.get(jobManagerID);
 	}
 
-	private Task getTask(ExecutionAttemptID executionAttemptID) {
-		TaskSlotMapping taskSlotMapping = taskSlotMappings.get(executionAttemptID);
-
-		if (taskSlotMapping != null) {
-			return taskSlotMapping.getTask();
-		} else {
-			return null;
-		}
-	}
-
-	private Task removeTask(ExecutionAttemptID executionAttemptID) {
-		TaskSlotMapping taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
-
-		if (taskSlotMapping != null) {
-			final Task task = taskSlotMapping.getTask();
-			final TaskSlot taskSlot = taskSlotMapping.getTaskSlot();
-
-			taskSlot.remove(task);
-
-			return task;
-		} else {
-			return null;
-		}
-	}
-
-	private Iterable<Task> getAllTasks() {
-		final Iterator<TaskSlotMapping> taskEntryIterator = taskSlotMappings.values().iterator();
-		final Iterator<Task> iterator = new Iterator<Task>() {
-			@Override
-			public boolean hasNext() {
-				return taskEntryIterator.hasNext();
-			}
-
-			@Override
-			public Task next() {
-				return taskEntryIterator.next().getTask();
-			}
-
-			@Override
-			public void remove() {
-				taskEntryIterator.remove();
-			}
-		};
-
-		return new Iterable<Task>() {
-			@Override
-			public Iterator<Task> iterator() {
-				return iterator;
-			}
-		};
-	}
-
-	private void clearTasks() {
-		taskSlotMappings.clear();
-
-		for (TaskSlot taskSlot: taskSlots.values()) {
-			taskSlot.clear();
-		}
-	}
-
 	private void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
-		final Task task = getTask(executionAttemptID);
+		final Task task = taskSlotTable.getTask(executionAttemptID);
 
 		if (task != null) {
 			try {
@@ -568,18 +517,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
-	private void cancelAndClearAllTasks(Throwable cause) {
-		log.info("Cancellaing all computations and discarding all cached data.");
-
-		Iterable<Task> tasks = getAllTasks();
-
-		for (Task task: tasks) {
-			task.failExternally(cause);
-		}
-
-		clearTasks();
-	}
-
 	private void updateTaskExecutionState(
 			final UUID jobMasterLeaderId,
 			final JobMasterGateway jobMasterGateway,
@@ -602,11 +539,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 	private void unregisterTaskAndNotifyFinalState(
 			final UUID jobMasterLeaderId,
-			final JobMasterGateway jobMasterGateway,
-			final ExecutionAttemptID executionAttemptID)
-	{
-		Task task = removeTask(executionAttemptID);
+			final JobMasterGateway jobMasterGateway,		
+			final ExecutionAttemptID executionAttemptID) {
 
+		Task task = taskSlotTable.removeTask(executionAttemptID);
 		if (task != null) {
 			if (!task.getExecutionState().isTerminal()) {
 				try {
@@ -718,6 +654,41 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
+	private void freeSlot(AllocationID allocationId) {
+		Preconditions.checkNotNull(allocationId);
+
+		try {
+			int freedSlotIndex = taskSlotTable.freeSlot(allocationId);
+
+			if (freedSlotIndex != -1 && isConnectedToResourceManager()) {
+				// the slot was freed. Tell the RM about it
+				ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+
+				resourceManagerGateway.notifySlotAvailable(
+					resourceManagerConnection.getTargetLeaderId(),
+					resourceManagerConnection.getRegistrationId(),
+					new SlotID(getResourceID(), freedSlotIndex));
+			}
+		} catch (SlotNotFoundException e) {
+			log.debug("Could not free slot for allocation id {}.", allocationId, e);
+		}
+	}
+
+	private void timeoutSlot(AllocationID allocationId, UUID ticket) {
+		Preconditions.checkNotNull(allocationId);
+		Preconditions.checkNotNull(ticket);
+
+		if (taskSlotTable.isValidTimeout(allocationId, ticket)) {
+			freeSlot(allocationId);
+		} else {
+			log.debug("Received an invalid timeout for allocation id {} with ticket {}.", allocationId, ticket);
+		}
+	}
+
+	private boolean isConnectedToResourceManager() {
+		return (resourceManagerConnection != null && resourceManagerConnection.isConnected());
+	}
+
 	// ------------------------------------------------------------------------
 	//  Properties
 	// ------------------------------------------------------------------------
@@ -778,7 +749,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	/**
 	 * The listener for leader changes of the resource manager
 	 */
-	private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
+	private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {
 
 		@Override
 		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
@@ -796,7 +767,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
-	private class TaskManagerActionsImpl implements TaskManagerActions {
+	private final class TaskManagerActionsImpl implements TaskManagerActions {
 		private final UUID jobMasterLeaderId;
 		private final JobMasterGateway jobMasterGateway;
 
@@ -837,4 +808,27 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
+	private class SlotActionsImpl implements SlotActions {
+
+		@Override
+		public void freeSlot(final AllocationID allocationId) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					TaskExecutor.this.freeSlot(allocationId);
+				}
+			});
+		}
+
+		@Override
+		public void timeoutSlot(final AllocationID allocationId, final UUID ticket) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					TaskExecutor.this.timeoutSlot(allocationId, ticket);
+				}
+			});
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index bb66655..ca1d2ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -98,6 +98,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			taskManagerServices.getTaskManagerMetricGroup(),
 			taskManagerServices.getBroadcastVariableManager(),
 			taskManagerServices.getFileCache(),
+			taskManagerServices.getTaskSlotTable(),
 			this);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e264a1c..c1728b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -38,6 +40,8 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -48,6 +52,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
  * Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager},
@@ -65,6 +72,7 @@ public class TaskManagerServices {
 	private final TaskManagerMetricGroup taskManagerMetricGroup;
 	private final BroadcastVariableManager broadcastVariableManager;
 	private final FileCache fileCache;
+	private final TaskSlotTable taskSlotTable;
 
 	private TaskManagerServices(
 		TaskManagerLocation taskManagerLocation,
@@ -74,7 +82,8 @@ public class TaskManagerServices {
 		MetricRegistry metricRegistry,
 		TaskManagerMetricGroup taskManagerMetricGroup,
 		BroadcastVariableManager broadcastVariableManager,
-		FileCache fileCache) {
+		FileCache fileCache,
+		TaskSlotTable taskSlotTable) {
 
 		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		this.memoryManager = Preconditions.checkNotNull(memoryManager);
@@ -84,6 +93,7 @@ public class TaskManagerServices {
 		this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup);
 		this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
 		this.fileCache = Preconditions.checkNotNull(fileCache);
+		this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -121,6 +131,10 @@ public class TaskManagerServices {
 	public FileCache getFileCache() {
 		return fileCache;
 	}
+	
+	public TaskSlotTable getTaskSlotTable() {
+		return taskSlotTable;
+	}
 
 	// --------------------------------------------------------------------------------------------
 	//  Static factory methods for task manager services
@@ -167,6 +181,16 @@ public class TaskManagerServices {
 
 		final FileCache fileCache = new FileCache(taskManagerServicesConfiguration.getTmpDirPaths());
 
+		final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
+
+		for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
+			resourceProfiles.add(new ResourceProfile(1.0, 42L));
+		}
+
+		final TimerService<AllocationID> timerService = new TimerService<>(new ScheduledThreadPoolExecutor(1));
+
+		final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);
+		
 		return new TaskManagerServices(
 			taskManagerLocation,
 			memoryManager,
@@ -175,7 +199,8 @@ public class TaskManagerServices {
 			metricRegistry,
 			taskManagerMetricGroup,
 			broadcastVariableManager,
-			fileCache);
+			fileCache,
+			taskSlotTable);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 80dfc09..036a890 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -173,6 +173,8 @@ public class TaskManagerServicesConfiguration {
 
 		final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
 
+
+
 		return new TaskManagerServicesConfiguration(
 			remoteAddress,
 			tmpDirs,

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
deleted file mode 100644
index 4fc1d66..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.util.Preconditions;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Container for multiple {@link Task} belonging to the same slot.
- */
-public class TaskSlot {
-	private final AllocationID allocationID;
-	private final ResourceID resourceID;
-	private final Map<ExecutionAttemptID, Task> tasks;
-
-	public TaskSlot(AllocationID allocationID, ResourceID resourceID) {
-		this.allocationID = Preconditions.checkNotNull(allocationID);
-		this.resourceID = Preconditions.checkNotNull(resourceID);
-		tasks = new HashMap<>(4);
-	}
-
-	public AllocationID getAllocationID() {
-		return allocationID;
-	}
-
-	public ResourceID getResourceID() {
-		return resourceID;
-	}
-
-	public boolean add(Task task) {
-		// sanity check
-		Preconditions.checkArgument(allocationID.equals(task.getAllocationID()));
-
-		Task oldTask = tasks.put(task.getExecutionId(), task);
-
-		if (oldTask != null) {
-			tasks.put(task.getExecutionId(), oldTask);
-			return false;
-		} else {
-			return true;
-		}
-	}
-
-	public Task remove(Task task) {
-		return tasks.remove(task.getExecutionId());
-	}
-
-	public void clear() {
-		tasks.clear();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
deleted file mode 100644
index e67fd52..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor;
-
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Mapping between a {@link Task} and its {@link TaskSlot}.
- */
-public class TaskSlotMapping {
-
-	private final Task task;
-	private final TaskSlot taskSlot;
-
-	public TaskSlotMapping(Task task, TaskSlot taskSlot) {
-		this.task = Preconditions.checkNotNull(task);
-		this.taskSlot = Preconditions.checkNotNull(taskSlot);
-	}
-
-	public Task getTask() {
-		return task;
-	}
-
-	public TaskSlot getTaskSlot() {
-		return taskSlot;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java
new file mode 100644
index 0000000..f7ed235
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.util.UUID;
+
+/**
+ * Interface to trigger slot actions from within the {@link TaskSlotTable}.
+ */
+public interface SlotActions {
+
+	/**
+	 * Free the task slot with the given allocation id.
+	 *
+	 * @param allocationId to identify the slot to be freed
+	 */
+	void freeSlot(AllocationID allocationId);
+
+	/**
+	 * Timeout the task slot for the given allocation id. The timeout is identified by the given
+	 * ticket to filter invalid timeouts out.
+	 *
+	 * @param allocationId identifying the task slot to be timed out
+	 * @param ticket allowing to filter invalid timeouts out
+	 */
+	void timeoutSlot(AllocationID allocationId, UUID ticket);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java
new file mode 100644
index 0000000..b0ddc5d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+/**
+ * Exception indicating that the given {@link TaskSlot} was not in state active.
+ */
+public class SlotNotActiveException extends Exception {
+
+	private static final long serialVersionUID = 4305837511564584L;
+
+	public SlotNotActiveException(JobID jobId, AllocationID allocationId) {
+		super("No active slot for job " + jobId + " with allocation id " + allocationId + '.');
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java
new file mode 100644
index 0000000..c639b16
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+/**
+ * Exception indicating that a {@link TaskSlot} could not be found.
+ */
+public class SlotNotFoundException extends Exception {
+
+	private static final long serialVersionUID = -883614807750137925L;
+
+	public SlotNotFoundException(AllocationID allocationId) {
+		this("Could not find slot for " + allocationId + '.');
+	}
+
+	public SlotNotFoundException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
new file mode 100644
index 0000000..0942772
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Container for multiple {@link Task} belonging to the same slot. A {@link TaskSlot} can be in one
+ * of the following states:
+ * <ul>
+ *     <li>Free - The slot is empty and not allocated to a job</li>
+ *     <li>Releasing - The slot is about to be freed after it has become empty.</li>
+ *     <li>Allocated - The slot has been allocated for a job.</li>
+ *     <li>Active - The slot is in active use by a job manager which is the leader of the allocating job.</li>
+ * </ul>
+ * <p>
+ * A task slot can only be allocated if it is in state free. An allocated task slot can transition
+ * to state active.
+ *<p>
+ * An active slot allows to add tasks from the respective job and with the correct allocation id.
+ * An active slot can be marked as inactive which sets the state back to allocated.
+ * <p>
+ * An allocated or active slot can only be freed if it is empty. If it is not empty, then its state
+ * can be set to releasing indicating that it can be freed once it becomes empty.
+ */
+public class TaskSlot {
+
+	/** Index of the task slot */
+	private final int index;
+
+	/** Resource characteristics for this slot */
+	private final ResourceProfile resourceProfile;
+
+	/** Tasks running in this slot */
+	private final Map<ExecutionAttemptID, Task> tasks;
+
+	/** State of this slot */
+	private TaskSlotState state;
+
+	/** Job id to which the slot has been allocated; null if not allocated */
+	private JobID jobId;
+
+	/** Allocation id of this slot; null if not allocated */
+	private AllocationID allocationId;
+
+	TaskSlot(final int index, final ResourceProfile resourceProfile) {
+		Preconditions.checkArgument(0 <= index, "The index must be greater than 0.");
+		this.index = index;
+		this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
+
+		this.tasks = new HashMap<>(4);
+		this.state = TaskSlotState.FREE;
+
+		this.jobId = null;
+		this.allocationId = null;
+	}
+
+	// ----------------------------------------------------------------------------------
+	// State accessors
+	// ----------------------------------------------------------------------------------
+
+	public int getIndex() {
+		return index;
+	}
+
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	public AllocationID getAllocationId() {
+		return allocationId;
+	}
+
+	TaskSlotState getState() {
+		return state;
+	}
+
+	public boolean isEmpty() {
+		return tasks.isEmpty();
+	}
+
+	public boolean isFree() {
+		return TaskSlotState.FREE == state;
+	}
+
+	public boolean isActive(JobID activeJobId, AllocationID activeAllocationId) {
+		Preconditions.checkNotNull(activeJobId);
+		Preconditions.checkNotNull(activeAllocationId);
+
+		return TaskSlotState.ACTIVE == state &&
+			activeJobId.equals(jobId) &&
+			activeAllocationId.equals(allocationId);
+	}
+
+	public boolean isAllocated(JobID jobIdToCheck, AllocationID allocationIDToCheck) {
+		Preconditions.checkNotNull(jobIdToCheck);
+		Preconditions.checkNotNull(allocationIDToCheck);
+
+		return jobIdToCheck.equals(jobId) && allocationIDToCheck.equals(allocationId) &&
+			(TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state);
+	}
+
+	public boolean isReleasing() {
+		return TaskSlotState.RELEASING == state;
+	}
+
+	/**
+	 * Get all tasks running in this task slot.
+	 *
+	 * @return Iterator to all currently contained tasks in this task slot.
+	 */
+	public Iterator<Task> getTasks() {
+		return tasks.values().iterator();
+	}
+
+	// ----------------------------------------------------------------------------------
+	// State changing methods
+	// ----------------------------------------------------------------------------------
+
+	/**
+	 * Add the given task to the task slot. This is only possible if there is not already another
+	 * task with the same execution attempt id added to the task slot. In this case, the method
+	 * returns true. Otherwise the task slot is left unchanged and false is returned.
+	 *
+	 * In case that the task slot state is not active an {@link IllegalStateException} is thrown.
+	 * In case that the task's job id and allocation id don't match with the job id and allocation
+	 * id for which the task slot has been allocated, an {@link IllegalArgumentException} is thrown.
+	 *
+	 * @param task to be added to the task slot
+	 * @throws IllegalStateException if the task slot is not in state active
+	 * @return true if the task was added to the task slot; otherwise false
+	 */
+	public boolean add(Task task) {
+		// Check that this slot has been assigned to the job sending this task
+		Preconditions.checkArgument(task.getJobID().equals(jobId), "The task's job id does not match the " +
+			"job id for which the slot has been allocated.");
+		Preconditions.checkArgument(task.getAllocationId().equals(allocationId), "The task's allocation " +
+			"id does not match the allocation id for which the slot has been allocated.");
+		Preconditions.checkState(TaskSlotState.ACTIVE == state, "The task slot is not in state active.");
+
+		Task oldTask = tasks.put(task.getExecutionId(), task);
+
+		if (oldTask != null) {
+			tasks.put(task.getExecutionId(), oldTask);
+			return false;
+		} else {
+			return true;
+		}
+	}
+
+	/**
+	 * Remove the task identified by the given execution attempt id.
+	 *
+	 * @param executionAttemptId identifying the task to be removed
+	 * @return The removed task if there was any; otherwise null.
+	 */
+	public Task remove(ExecutionAttemptID executionAttemptId) {
+		return tasks.remove(executionAttemptId);
+	}
+
+	/**
+	 * Removes all tasks from this task slot.
+	 */
+	public void clear() {
+		tasks.clear();
+	}
+
+	/**
+	 * Allocate the task slot for the given job and allocation id. If the slot could be allocated,
+	 * or is already allocated/active for the given job and allocation id, then the method returns
+	 * true. Otherwise it returns false.
+	 *
+	 * A slot can only be allocated if its current state is free.
+	 *
+	 * @param newJobId to allocate the slot for
+	 * @param newAllocationId to identify the slot allocation
+	 * @return True if the slot was allocated for the given job and allocation id; otherwise false
+	 */
+	public boolean allocate(JobID newJobId, AllocationID newAllocationId) {
+		if (TaskSlotState.FREE == state) {
+			// sanity checks
+			Preconditions.checkState(allocationId == null);
+			Preconditions.checkState(jobId == null);
+
+			this.jobId = Preconditions.checkNotNull(newJobId);
+			this.allocationId = Preconditions.checkNotNull(newAllocationId);
+
+			state = TaskSlotState.ALLOCATED;
+
+			return true;
+		} else if (TaskSlotState.ALLOCATED == state || TaskSlotState.ACTIVE == state) {
+			Preconditions.checkNotNull(newJobId);
+			Preconditions.checkNotNull(newAllocationId);
+
+			return newJobId.equals(jobId) && newAllocationId.equals(allocationId);
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Mark this slot as active. A slot can only be marked active if it's in state allocated or active.
+	 *
+	 * The method returns true if the slot was set to active. Otherwise it returns false.
+	 *
+	 * @return True if the new state of the slot is active; otherwise false
+	 */
+	public boolean markActive() {
+		if (TaskSlotState.ALLOCATED == state || TaskSlotState.ACTIVE == state) {
+			state = TaskSlotState.ACTIVE;
+
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Mark the slot as inactive/allocated. A slot can only be marked as inactive/allocated if it's
+	 * in state allocated or active.
+	 *
+	 * @return True if the new state of the slot is allocated; otherwise false
+	 */
+	public boolean markInactive() {
+		if (TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state) {
+			state = TaskSlotState.ALLOCATED;
+
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Mark the slot as free. A slot can only be marked as free if it's empty.
+	 *
+	 * @return True if the new state is free; otherwise false
+	 */
+	public boolean markFree() {
+		if (isEmpty()) {
+			state = TaskSlotState.FREE;
+			this.jobId = null;
+			this.allocationId = null;
+
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Mark this slot as releasing. A slot can always be marked as releasing.
+	 *
+	 * @return True
+	 */
+	public boolean markReleasing() {
+		state = TaskSlotState.RELEASING;
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java
new file mode 100644
index 0000000..e3ba903
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+/**
+ * Internal task slot state
+ */
+enum TaskSlotState {
+	ACTIVE, // Slot is in active use by a job manager responsible for a job
+	ALLOCATED, // Slot has been allocated for a job but not yet given to a job manager
+	RELEASING, // Slot is not empty but tasks are failed. Upon removal of all tasks, it will be released
+	FREE // Slot is free
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
new file mode 100644
index 0000000..42cb919
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Container for multiple {@link TaskSlot} instances. Additionally, it maintains multiple indices
+ * for faster access to tasks and sets of allocated slots.
+ * <p>
+ * The task slot table automatically registers timeouts for allocated slots which cannot be assigned
+ * to a job manager.
+ * <p>
+ * Before the task slot table can be used, it must be started via the {@link #start} method.
+ */
+public class TaskSlotTable implements TimeoutListener<AllocationID> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTable.class);
+
+	/** Timer service used to time out allocated slots */
+	private final TimerService<AllocationID> timerService;
+
+	/** The list of all task slots */
+	private final List<TaskSlot> taskSlots;
+
+	/** Mapping from allocation id to task slot */
+	private final Map<AllocationID, TaskSlot> allocationIDTaskSlotMap;
+
+	/** Mapping from execution attempt id to task and task slot */
+	private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
+
+	/** Mapping from job id to allocated slots for a job */
+	private final Map<JobID, Set<AllocationID>> slotsPerJob;
+
+	/** Interface for slot actions, such as freeing them or timing them out */
+	private SlotActions slotActions;
+
+	/** The timeout for allocated slots */
+	private Time slotTimeout;
+
+	/** Whether the table has been started */
+	private boolean started;
+
+	public TaskSlotTable(
+		final Collection<ResourceProfile> resourceProfiles,
+		final TimerService<AllocationID> timerService) {
+
+		int numberSlots = resourceProfiles.size();
+
+		Preconditions.checkArgument(0 < numberSlots, "The number of task slots must be greater than 0.");
+
+		this.timerService = Preconditions.checkNotNull(timerService);
+
+		taskSlots = Arrays.asList(new TaskSlot[numberSlots]);
+
+		int index = 0;
+
+		// create the task slots for the given resource profiles
+		for (ResourceProfile resourceProfile: resourceProfiles) {
+			taskSlots.set(index, new TaskSlot(index, resourceProfile));
+			++index;
+		}
+
+		allocationIDTaskSlotMap = new HashMap<>(numberSlots);
+
+		taskSlotMappings = new HashMap<>(4 * numberSlots);
+
+		slotsPerJob = new HashMap<>(4);
+
+		slotActions = null;
+		slotTimeout = null;
+		started = false;
+	}
+
+	/**
+	 * Start the task slot table with the given slot actions and slot timeout value.
+	 *
+	 * @param initialSlotActions to use for slot actions
+	 * @param initialSlotTimeout to use for slot timeouts
+	 */
+	public void start(SlotActions initialSlotActions, Time initialSlotTimeout) {
+		this.slotActions = Preconditions.checkNotNull(initialSlotActions);
+		this.slotTimeout = Preconditions.checkNotNull(initialSlotTimeout);
+
+		timerService.start(this);
+
+		started = true;
+	}
+
+	/**
+	 * Stop the task slot table.
+	 */
+	public void stop() {
+		started = false;
+		timerService.stop();
+		slotTimeout = null;
+		slotActions = null;
+	}
+
+	// ---------------------------------------------------------------------
+	// Slot methods
+	// ---------------------------------------------------------------------
+
+	/**
+	 * Allocate the slot with the given index for the given job and allocation id. Returns true if
+	 * the slot could be allocated. Otherwise it returns false.
+	 *
+	 * @param index of the task slot to allocate
+	 * @param jobId to allocate the task slot for
+	 * @param allocationId identifying the allocation
+	 * @return True if the task slot could be allocated; otherwise false
+	 */
+	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId) {
+		checkInit();
+
+		TaskSlot taskSlot = taskSlots.get(index);
+
+		boolean result = taskSlot.allocate(jobId, allocationId);
+
+		if (result) {
+			// update the allocation id to task slot map
+			allocationIDTaskSlotMap.put(allocationId, taskSlot);
+
+			// register a timeout for this slot since it's in state allocated
+			timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
+
+			// add this slot to the set of job slots
+			Set<AllocationID> slots = slotsPerJob.get(jobId);
+
+			if (slots == null) {
+				slots = new HashSet<>(4);
+				slotsPerJob.put(jobId, slots);
+			}
+
+			slots.add(allocationId);
+		}
+
+		return result;
+	}
+
+	/**
+	 * Marks the slot under the given allocation id as active. If the slot could not be found, then
+	 * a {@link SlotNotFoundException} is thrown.
+	 *
+	 * @param allocationId to identify the task slot to mark as active
+	 * @throws SlotNotFoundException if the slot could not be found for the given allocation id
+	 * @return True if the slot could be marked active
+	 */
+	public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
+		checkInit();
+
+		TaskSlot taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot != null) {
+			if (taskSlot.markActive()) {
+				// unregister a potential timeout
+				timerService.unregisterTimeout(allocationId);
+
+				return true;
+			} else {
+				return false;
+			}
+		} else {
+			throw new SlotNotFoundException(allocationId);
+		}
+	}
+
+	/**
+	 * Marks the slot under the given allocation id as inactive. If the slot could not be found,
+	 * then a {@link SlotNotFoundException} is thrown.
+	 *
+	 * @param allocationId to identify the task slot to mark as inactive
+	 * @throws SlotNotFoundException if the slot could not be found for the given allocation id
+	 * @return True if the slot could be marked inactive
+	 */
+	public boolean markSlotInactive(AllocationID allocationId) throws SlotNotFoundException {
+		checkInit();
+
+		TaskSlot taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot != null) {
+			if (taskSlot.markInactive()) {
+				// register a timeout to free the slot
+				timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
+
+				return true;
+			} else {
+				return false;
+			}
+		} else {
+			throw new SlotNotFoundException(allocationId);
+		}
+	}
+
+	/**
+	 * Try to free the slot. If the slot is empty it will set the state of the task slot to free
+	 * and return its index. If the slot is not empty, then it will set the state of the task slot
+	 * to releasing, fail all tasks and return -1.
+	 *
+	 * @param allocationId identifying the task slot to be freed
+	 * @throws SlotNotFoundException if there is no task slot for the given allocation id
+	 * @return Index of the freed slot if the slot could be freed; otherwise -1
+	 */
+	public int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
+		return freeSlot(allocationId, new Exception("The task slot of this task is being freed."));
+	}
+
+	/**
+	 * Tries to free the slot. If the slot is empty it will set the state of the task slot to free
+	 * and return its index. If the slot is not empty, then it will set the state of the task slot
+	 * to releasing, fail all tasks and return -1.
+	 *
+	 * @param allocationId identifying the task slot to be freed
+	 * @param cause to fail the tasks with if slot is not empty
+	 * @throws SlotNotFoundException if there is no task slot for the given allocation id
+	 * @return Index of the freed slot if the slot could be freed; otherwise -1
+	 */
+	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
+		checkInit();
+
+		TaskSlot taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot != null) {
+			LOG.info("Free slot {}.", allocationId, cause);
+
+			final JobID jobId = taskSlot.getJobId();
+
+			if (taskSlot.markFree()) {
+				// remove the allocation id to task slot mapping
+				allocationIDTaskSlotMap.remove(allocationId);
+
+				// unregister a potential timeout
+				timerService.unregisterTimeout(allocationId);
+
+				Set<AllocationID> slots = slotsPerJob.get(jobId);
+
+				if (slots == null) {
+					throw new IllegalStateException("There are no more slots allocated for the job " + jobId +
+						". This indicates a programming bug.");
+				}
+
+				slots.remove(allocationId);
+
+				if (slots.isEmpty()) {
+					slotsPerJob.remove(jobId);
+				}
+
+				return taskSlot.getIndex();
+			} else {
+				// we couldn't free the task slot because it still contains tasks, fail the tasks
+				// and set the slot state to releasing so that it gets eventually freed
+				taskSlot.markReleasing();
+
+				Iterator<Task> taskIterator = taskSlot.getTasks();
+
+				while (taskIterator.hasNext()) {
+					taskIterator.next().failExternally(cause);
+				}
+
+				return -1;
+			}
+		} else {
+			throw new SlotNotFoundException(allocationId);
+		}
+	}
+
+	/**
+	 * Check whether the timeout with ticket is valid for the given allocation id.
+	 *
+	 * @param allocationId to check against
+	 * @param ticket of the timeout
+	 * @return True if the timeout is valid; otherwise false
+	 */
+	public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
+		checkInit();
+
+		return timerService.isValid(allocationId, ticket);
+	}
+
+	/**
+	 * Check whether the slot for the given index is allocated for the given job and allocation id.
+	 *
+	 * @param index of the task slot
+	 * @param jobId for which the task slot should be allocated
+	 * @param allocationId which should match the task slot's allocation id
+	 * @return True if the given task slot is allocated for the given job and allocation id
+	 */
+	public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
+		checkInit();
+
+		TaskSlot taskSlot = taskSlots.get(index);
+
+		return taskSlot.isAllocated(jobId, allocationId);
+	}
+
+	/**
+	 * Check whether there exists an active slot for the given job and allocation id.
+	 *
+	 * @param jobId of the allocated slot
+	 * @param allocationId identifying the allocation
+	 * @return True if there exists a task slot which is active for the given job and allocation id.
+	 */
+	public boolean existActiveSlot(JobID jobId, AllocationID allocationId) {
+		TaskSlot taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot != null) {
+			return taskSlot.isActive(jobId, allocationId);
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Check whether the task slot with the given index is free.
+	 *
+	 * <p>The index is used as is to look up the task slot; no range check is done here.
+	 *
+	 * @param index of the task slot
+	 * @return True if the task slot is free; otherwise false
+	 */
+	public boolean isSlotFree(int index) {
+		return taskSlots.get(index).isFree();
+	}
+
+	/**
+	 * Check whether the job has at least one slot in state allocated. Slots of the job which are
+	 * in state active are not taken into account.
+	 *
+	 * @param jobId for which to check for allocated slots
+	 * @return True if there are allocated slots for the given job id; otherwise false
+	 */
+	public boolean hasAllocatedSlots(JobID jobId) {
+		final Iterator<AllocationID> allocatedSlots = getAllocatedSlots(jobId);
+
+		return allocatedSlots.hasNext();
+	}
+
+	/**
+	 * Return an iterator over the allocation ids of those slots of the given job which are in
+	 * state {@link TaskSlotState#ALLOCATED}.
+	 *
+	 * @param jobId for which to return the allocated slots
+	 * @return Iterator of allocation ids of allocated slots
+	 */
+	public Iterator<AllocationID> getAllocatedSlots(JobID jobId) {
+		final TaskSlotState requiredState = TaskSlotState.ALLOCATED;
+
+		return new AllocationIDIterator(jobId, requiredState);
+	}
+
+	/**
+	 * Return an iterator over the allocation ids of those slots of the given job which are in
+	 * state {@link TaskSlotState#ACTIVE}.
+	 *
+	 * @param jobId for which to return the active slots
+	 * @return Iterator of allocation ids of active slots
+	 */
+	public Iterator<AllocationID> getActiveSlots(JobID jobId) {
+		final TaskSlotState requiredState = TaskSlotState.ACTIVE;
+
+		return new AllocationIDIterator(jobId, requiredState);
+	}
+
+	// ---------------------------------------------------------------------
+	// Task methods
+	// ---------------------------------------------------------------------
+
+	/**
+	 * Add the given task to the slot identified by the task's allocation id. If the task could be
+	 * added, it is also registered in the task slot mappings under its execution attempt id.
+	 *
+	 * @param task to add to the task slot with the respective allocation id; must not be null
+	 * @throws SlotNotFoundException if there was no slot for the given allocation id
+	 * @throws SlotNotActiveException if there was no slot active for task's job and allocation id
+	 * @return True if the task could be added to the task slot; otherwise false
+	 */
+	public boolean addTask(Task task) throws SlotNotFoundException, SlotNotActiveException {
+		Preconditions.checkNotNull(task);
+
+		final AllocationID allocationId = task.getAllocationId();
+		final TaskSlot taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot == null) {
+			// report the allocation id taken from the task; there is no task slot which could be
+			// asked for it (dereferencing the missing slot would fail with a NullPointerException)
+			throw new SlotNotFoundException(allocationId);
+		} else if (!taskSlot.isActive(task.getJobID(), allocationId)) {
+			throw new SlotNotActiveException(task.getJobID(), allocationId);
+		}
+
+		final boolean added = taskSlot.add(task);
+
+		if (added) {
+			// only successfully added tasks are made available via their execution attempt id
+			taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping(task, taskSlot));
+		}
+
+		return added;
+	}
+
+	/**
+	 * Unregister the task with the given execution attempt id and remove it from the task slot
+	 * it was added to. If that task slot is in state releasing and no longer contains any task
+	 * afterwards, the slot actions are asked to free the slot.
+	 *
+	 * @param executionAttemptID identifying the task to remove
+	 * @return The removed task if there is any for the given execution attempt id; otherwise null
+	 */
+	public Task removeTask(ExecutionAttemptID executionAttemptID) {
+		final TaskSlotMapping removedMapping = taskSlotMappings.remove(executionAttemptID);
+
+		if (removedMapping == null) {
+			return null;
+		}
+
+		final TaskSlot owningSlot = removedMapping.getTaskSlot();
+		final Task removedTask = removedMapping.getTask();
+
+		owningSlot.remove(removedTask.getExecutionId());
+
+		if (owningSlot.isReleasing() && owningSlot.isEmpty()) {
+			// the last task of a releasing slot is gone, hence the slot can be handed back
+			slotActions.freeSlot(owningSlot.getAllocationId());
+		}
+
+		return removedTask;
+	}
+
+	/**
+	 * Look up the task which is registered under the given execution attempt id.
+	 *
+	 * <p>The lookup only reads the task slot mappings; the task stays registered.
+	 *
+	 * @param executionAttemptID identifying the requested task
+	 * @return The task registered for the given execution attempt id if there is one;
+	 *         otherwise null
+	 */
+	public Task getTask(ExecutionAttemptID executionAttemptID) {
+		final TaskSlotMapping mapping = taskSlotMappings.get(executionAttemptID);
+
+		return mapping == null ? null : mapping.getTask();
+	}
+
+	/**
+	 * Return an iterator over the tasks of the given job. The returned iterator walks over the
+	 * tasks contained in the job's task slots which are in state active.
+	 *
+	 * @param jobId identifying the job of the requested tasks
+	 * @return Iterator over the tasks of the given job
+	 */
+	public Iterator<Task> getTasks(JobID jobId) {
+		final Iterator<Task> tasksOfJob = new TaskIterator(jobId);
+
+		return tasksOfJob;
+	}
+
+	// ---------------------------------------------------------------------
+	// TimeoutListener methods
+	// ---------------------------------------------------------------------
+
+	/**
+	 * Forward a timeout notification to the slot actions. The notification is dropped if no slot
+	 * actions are set.
+	 *
+	 * @param key allocation id for which the timeout occurred
+	 * @param ticket identifying the timeout
+	 */
+	@Override
+	public void notifyTimeout(AllocationID key, UUID ticket) {
+		if (slotActions == null) {
+			// nobody to notify
+			return;
+		}
+
+		slotActions.timeoutSlot(key, ticket);
+	}
+
+	// ---------------------------------------------------------------------
+	// Internal methods
+	// ---------------------------------------------------------------------
+
+	/**
+	 * Look up the task slot which is registered under the given allocation id.
+	 *
+	 * @param allocationId identifying the task slot; must not be null
+	 * @return The result of the lookup in the allocation id to task slot map
+	 */
+	private TaskSlot getTaskSlot(AllocationID allocationId) {
+		return allocationIDTaskSlotMap.get(Preconditions.checkNotNull(allocationId));
+	}
+
+	private void checkInit() {
+		Preconditions.checkState(started, "The " + TaskSlotTable.class.getSimpleName() + " has to be started.");
+	}
+
+	// ---------------------------------------------------------------------
+	// Static utility classes
+	// ---------------------------------------------------------------------
+
+	/**
+	 * Immutable pair of a {@link Task} and the {@link TaskSlot} it is associated with. Neither
+	 * of the two may be null.
+	 */
+	private static final class TaskSlotMapping {
+		private final TaskSlot taskSlot;
+		private final Task task;
+
+		private TaskSlotMapping(Task task, TaskSlot taskSlot) {
+			Preconditions.checkNotNull(task);
+			Preconditions.checkNotNull(taskSlot);
+
+			this.task = task;
+			this.taskSlot = taskSlot;
+		}
+
+		/** Returns the task slot of this mapping. */
+		public TaskSlot getTaskSlot() {
+			return taskSlot;
+		}
+
+		/** Returns the task of this mapping. */
+		public Task getTask() {
+			return task;
+		}
+	}
+
+	/**
+	 * Iterator over the {@link AllocationID}s of those {@link TaskSlot}s of a given job which are
+	 * in a given state. It maps the elements of a {@link TaskSlotIterator} to their allocation
+	 * ids and does not support removal.
+	 */
+	private final class AllocationIDIterator implements Iterator<AllocationID> {
+		private final Iterator<TaskSlot> matchingSlots;
+
+		private AllocationIDIterator(JobID jobId, TaskSlotState state) {
+			this.matchingSlots = new TaskSlotIterator(jobId, state);
+		}
+
+		@Override
+		public boolean hasNext() {
+			return matchingSlots.hasNext();
+		}
+
+		@Override
+		public AllocationID next() {
+			final AllocationID nextAllocationId;
+
+			try {
+				nextAllocationId = matchingSlots.next().getAllocationId();
+			} catch (NoSuchElementException e) {
+				// replace the message of the underlying iterator by one which fits this iterator
+				throw new NoSuchElementException("No more allocation ids.");
+			}
+
+			return nextAllocationId;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Cannot remove allocation ids via this iterator.");
+		}
+	}
+
+	/**
+	 * Iterator over the {@link TaskSlot}s which belong to the given job and are in the given
+	 * state. The iterator walks over the allocation ids stored for the job and skips every id
+	 * whose task slot is missing or in a different state. Removal is not supported.
+	 */
+	private final class TaskSlotIterator implements Iterator<TaskSlot> {
+		private final Iterator<AllocationID> allSlots;
+		private final TaskSlotState state;
+
+		/** Slot found by {@link #hasNext()} which has not been returned by {@link #next()} yet. */
+		private TaskSlot currentSlot;
+
+		private TaskSlotIterator(JobID jobId, TaskSlotState state) {
+			final Set<AllocationID> allocationIds = slotsPerJob.get(jobId);
+
+			if (allocationIds != null && !allocationIds.isEmpty()) {
+				allSlots = allocationIds.iterator();
+			} else {
+				// no slots are known for the job
+				allSlots = Collections.emptyIterator();
+			}
+
+			this.state = Preconditions.checkNotNull(state);
+			this.currentSlot = null;
+		}
+
+		@Override
+		public boolean hasNext() {
+			// advance until a matching slot is buffered or all allocation ids are consumed
+			while (currentSlot == null && allSlots.hasNext()) {
+				final TaskSlot candidate = getTaskSlot(allSlots.next());
+
+				if (candidate != null && candidate.getState() == state) {
+					currentSlot = candidate;
+				}
+			}
+
+			return currentSlot != null;
+		}
+
+		@Override
+		public TaskSlot next() {
+			// hasNext() buffers the next matching slot in currentSlot, if there is one
+			if (!hasNext()) {
+				throw new NoSuchElementException("No more task slots.");
+			}
+
+			final TaskSlot result = currentSlot;
+			currentSlot = null;
+
+			return result;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Cannot remove task slots via this iterator.");
+		}
+	}
+
+	/**
+	 * Iterator over all {@link Task}s of a given job. It chains the task iterators of the job's
+	 * task slots which are in state active. Removal is not supported.
+	 */
+	private final class TaskIterator implements Iterator<Task> {
+		private final Iterator<TaskSlot> taskSlotIterator;
+
+		/** Tasks of the task slot which is currently iterated over; null before the first slot. */
+		private Iterator<Task> currentTasks;
+
+		private TaskIterator(JobID jobId) {
+			this.taskSlotIterator = new TaskSlotIterator(jobId, TaskSlotState.ACTIVE);
+			this.currentTasks = null;
+		}
+
+		@Override
+		public boolean hasNext() {
+			// move on to the next task slot until one with remaining tasks is found
+			while (!hasRemainingTasksInCurrentSlot() && taskSlotIterator.hasNext()) {
+				currentTasks = taskSlotIterator.next().getTasks();
+			}
+
+			return hasRemainingTasksInCurrentSlot();
+		}
+
+		@Override
+		public Task next() {
+			// hasNext() positions currentTasks on a slot with remaining tasks, if there is one
+			if (!hasNext()) {
+				throw new NoSuchElementException("No more tasks.");
+			}
+
+			return currentTasks.next();
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Cannot remove tasks via this iterator.");
+		}
+
+		private boolean hasRemainingTasksInCurrentSlot() {
+			return currentTasks != null && currentTasks.hasNext();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java
new file mode 100644
index 0000000..3e75d74
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import java.util.UUID;
+
+/**
+ * Listener for timeout events by the {@link TimerService}.
+ *
+ * @param <K> Type of the timeout key
+ */
+public interface TimeoutListener<K> {
+
+	/**
+	 * Notify the listener about the timeout for an event identified by key. Additionally, the
+	 * method is called with the timeout ticket, which allows outdated timeout events to be
+	 * identified.
+	 *
+	 * @param key identifying the timed out event
+	 * @param ticket used to check whether the timeout is still valid
+	 */
+	void notifyTimeout(K key, UUID ticket);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
new file mode 100644
index 0000000..e28e801
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service to register timeouts for a given key. The timeouts are identified by a ticket so that
+ * newly registered timeouts for the same key can be distinguished from older timeouts.
+ *
+ * <p>The service does not synchronize the access to its internal state itself. Timeouts are
+ * reported to the listener from the scheduled executor service.
+ *
+ * @param <K> Type of the key
+ */
+public class TimerService<K> {
+
+	/** Executor service for the scheduled timeouts */
+	private final ScheduledExecutorService scheduledExecutorService;
+
+	/** Map of currently active timeouts */
+	private final Map<K, Timeout<K>> timeouts;
+
+	/** Listener which is notified about occurring timeouts */
+	private TimeoutListener<K> timeoutListener;
+
+	/**
+	 * Create a timer service which schedules its timeouts on the given executor service.
+	 *
+	 * @param scheduledExecutorService to run the timeouts; it is shut down by {@link #stop()}
+	 */
+	public TimerService(final ScheduledExecutorService scheduledExecutorService) {
+		this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+
+		this.timeouts = new HashMap<>(16);
+		this.timeoutListener = null;
+	}
+
+	/**
+	 * Start the timer service with the listener which is notified about occurring timeouts.
+	 * The service can only be started while its executor service is not shut down and no
+	 * listener is set.
+	 *
+	 * @param initialTimeoutListener to notify about timeouts; must not be null
+	 */
+	public void start(TimeoutListener<K> initialTimeoutListener) {
+		// sanity check; We only allow to assign a timeout listener once
+		Preconditions.checkState(!scheduledExecutorService.isShutdown());
+		Preconditions.checkState(timeoutListener == null);
+
+		this.timeoutListener = Preconditions.checkNotNull(initialTimeoutListener);
+	}
+
+	/**
+	 * Stop the timer service: cancel and forget all registered timeouts, clear the listener and
+	 * shut down the executor service.
+	 */
+	public void stop() {
+		// cancel via the values; going through unregisterTimeout(key) would remove entries from
+		// the map while it is being iterated over and fail with a ConcurrentModificationException
+		for (Timeout<K> timeout : timeouts.values()) {
+			timeout.cancel();
+		}
+
+		timeouts.clear();
+		timeoutListener = null;
+
+		scheduledExecutorService.shutdown();
+	}
+
+	/**
+	 * Register a timeout for the given key which shall occur in the given delay. A timeout which
+	 * is already registered for the key is cancelled and replaced.
+	 *
+	 * @param key for which to register the timeout
+	 * @param delay until the timeout
+	 * @param unit of the timeout delay
+	 */
+	public void registerTimeout(final K key, final long delay, final TimeUnit unit) {
+		Preconditions.checkState(timeoutListener != null, "The " + getClass().getSimpleName() +
+			" has not been started.");
+
+		// does nothing if there is no timeout registered for the key
+		unregisterTimeout(key);
+
+		timeouts.put(key, new Timeout<>(timeoutListener, key, delay, unit, scheduledExecutorService));
+	}
+
+	/**
+	 * Unregister the timeout for the given key. Nothing happens if no timeout is registered for
+	 * the key.
+	 *
+	 * @param key for which to unregister the timeout
+	 */
+	public void unregisterTimeout(K key) {
+		Timeout<K> timeout = timeouts.remove(key);
+
+		if (timeout != null) {
+			timeout.cancel();
+		}
+	}
+
+	/**
+	 * Check whether the timeout for the given key and ticket is still valid (not yet unregistered
+	 * and not yet overwritten).
+	 *
+	 * @param key for which to check the timeout
+	 * @param ticket of the timeout
+	 * @return True if the timeout ticket is still valid; otherwise false
+	 */
+	public boolean isValid(K key, UUID ticket) {
+		final Timeout<K> timeout = timeouts.get(key);
+
+		return timeout != null && timeout.getTicket().equals(ticket);
+	}
+
+	// ---------------------------------------------------------------------
+	// Static utility classes
+	// ---------------------------------------------------------------------
+
+	/**
+	 * A single scheduled timeout. It schedules itself on creation and notifies the listener with
+	 * its key and ticket when it is run.
+	 */
+	private static final class Timeout<K> implements Runnable {
+
+		private final TimeoutListener<K> timeoutListener;
+		private final K key;
+		private final ScheduledFuture<?> scheduledTimeout;
+		private final UUID ticket;
+
+		Timeout(
+			final TimeoutListener<K> timeoutListener,
+			final K key,
+			final long delay,
+			final TimeUnit unit,
+			final ScheduledExecutorService scheduledExecutorService) {
+
+			Preconditions.checkNotNull(scheduledExecutorService);
+
+			this.timeoutListener = Preconditions.checkNotNull(timeoutListener);
+			this.key = Preconditions.checkNotNull(key);
+			// create the ticket before scheduling: once scheduled, run() may be executed at any
+			// time and must not observe an unset ticket
+			this.ticket = UUID.randomUUID();
+			this.scheduledTimeout = scheduledExecutorService.schedule(this, delay, unit);
+		}
+
+		UUID getTicket() {
+			return ticket;
+		}
+
+		void cancel() {
+			scheduledTimeout.cancel(true);
+		}
+
+		@Override
+		public void run() {
+			timeoutListener.notifyTimeout(key, ticket);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 977e563..b67737d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -133,7 +133,7 @@ public class Task implements Runnable, TaskActions {
 	private final ExecutionAttemptID executionId;
 
 	/** ID which identifies the slot in which the task is supposed to run */
-	private final AllocationID allocationID;
+	private final AllocationID allocationId;
 
 	/** TaskInfo object for this task */
 	private final TaskInfo taskInfo;
@@ -278,7 +278,7 @@ public class Task implements Runnable, TaskActions {
 		this.jobId = checkNotNull(tdd.getJobID());
 		this.vertexId = checkNotNull(tdd.getVertexID());
 		this.executionId  = checkNotNull(tdd.getExecutionId());
-		this.allocationID = checkNotNull(tdd.getAllocationID());
+		this.allocationId = checkNotNull(tdd.getAllocationID());
 		this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks();
 		this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
 		this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration());
@@ -385,8 +385,8 @@ public class Task implements Runnable, TaskActions {
 		return executionId;
 	}
 
-	public AllocationID getAllocationID() {
-		return allocationID;
+	public AllocationID getAllocationId() {
+		return allocationId;
 	}
 
 	public TaskInfo getTaskInfo() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index af8aa69..97f42b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.reader.BufferReader;

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index ecbd9b5..baae251 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequ
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 
@@ -83,6 +84,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
+				mock(TaskSlotTable.class),
 				mock(FatalErrorHandler.class));
 
 			taskManager.start();
@@ -139,6 +141,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
+				mock(TaskSlotTable.class),
 				mock(FatalErrorHandler.class));
 
 			taskManager.start();
@@ -211,6 +214,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
+				mock(TaskSlotTable.class),
 				mock(FatalErrorHandler.class));
 
 			taskManager.start();


Mime
View raw message