flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [3/3] flink git commit: [FLINK-1376] [runtime] Add proper shared slot release in case of a fatal TaskManager failure.
Date Mon, 09 Feb 2015 12:59:13 GMT
[FLINK-1376] [runtime] Add proper shared slot release in case of a fatal TaskManager failure.

Fixes a concurrent modification exception on SharedSlot's subSlots field by synchronizing all state-changing operations through the associated assignment group. Fixes a deadlock in which Instance.markDead first acquires the instance lock and then, while releasing the associated slots, the assignment group lock. This can block against a direct releaseSlot call on a SharedSlot, which first acquires the assignment group lock and then the instance lock in order to return the slot to the instance.

Fixes colocation shared slot releasing. A colocation constraint is now realized as a SharedSlot in a SharedSlot where the colocated tasks allocate sub slots.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91382bb8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91382bb8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91382bb8

Branch: refs/heads/release-0.8
Commit: 91382bb8c1f63dde0b11cc6f4dc9c18f29731cdd
Parents: 44b799d
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Jan 12 10:58:45 2015 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Mon Feb 9 13:25:30 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  17 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   1 -
 .../runtime/executiongraph/ExecutionVertex.java |  10 +-
 .../flink/runtime/instance/AllocatedSlot.java   | 189 -------
 .../apache/flink/runtime/instance/Instance.java |  81 ++-
 .../flink/runtime/instance/SharedSlot.java      | 154 ++++++
 .../flink/runtime/instance/SimpleSlot.java      | 124 +++++
 .../org/apache/flink/runtime/instance/Slot.java | 191 +++++++
 .../flink/runtime/jobmanager/JobManager.java    |   9 +-
 .../scheduler/CoLocationConstraint.java         |  12 +-
 .../scheduler/NoResourceAvailableException.java |  17 +-
 .../runtime/jobmanager/scheduler/Scheduler.java | 227 +++++---
 .../jobmanager/scheduler/SharedSlot.java        | 111 ----
 .../scheduler/SlotAllocationFuture.java         |  12 +-
 .../scheduler/SlotAllocationFutureAction.java   |   4 +-
 .../scheduler/SlotAvailabilityListener.java     |   2 +-
 .../scheduler/SlotSharingGroupAssignment.java   | 339 +++++++-----
 .../runtime/jobmanager/scheduler/SubSlot.java   |  72 ---
 .../jobmanager/web/JobManagerInfoServlet.java   | 516 ++++++++++++++++++
 .../jobmanager/web/JobmanagerInfoServlet.java   | 517 -------------------
 .../runtime/jobmanager/web/JsonFactory.java     |   4 +-
 .../runtime/jobmanager/web/WebInfoServer.java   |   2 +-
 .../profiling/impl/JobProfilingData.java        |   7 +-
 .../flink/runtime/taskmanager/TaskManager.java  |   2 +-
 .../ExecutionGraphDeploymentTest.java           |   4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   4 +-
 .../ExecutionStateProgressTest.java             |   4 +-
 .../ExecutionVertexCancelTest.java              |  22 +-
 .../ExecutionVertexDeploymentTest.java          |  16 +-
 .../ExecutionVertexSchedulingTest.java          |   8 +-
 .../runtime/instance/AllocatedSlotTest.java     |  26 +-
 .../flink/runtime/instance/InstanceTest.java    |  22 +-
 .../runtime/jobgraph/JobManagerTestUtils.java   |  16 +
 .../jobmanager/TaskManagerFailsITCase.java      |  62 +++
 .../TaskManagerFailsWithSlotSharingITCase.java  |  64 +++
 .../ScheduleWithCoLocationHintTest.java         | 136 ++---
 .../scheduler/SchedulerIsolatedTasksTest.java   |  44 +-
 .../scheduler/SchedulerSlotSharingTest.java     | 210 ++++----
 .../jobmanager/scheduler/SharedSlotsTest.java   |  73 +--
 .../scheduler/SlotAllocationFutureTest.java     |  24 +-
 .../runtime/taskmanager/TaskManagerTest.java    |   1 +
 .../src/test/resources/log4j-test.properties    |   7 +-
 42 files changed, 1913 insertions(+), 1450 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 22f0cba..1861bc6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -32,8 +32,8 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -87,7 +87,7 @@ public class Execution {
 	
 	private volatile ExecutionState state = CREATED;
 	
-	private volatile AllocatedSlot assignedResource;  // once assigned, never changes
+	private volatile SimpleSlot assignedResource;  // once assigned, never changes
 	
 	private volatile Throwable failureCause;          // once assigned, never changes
 	
@@ -125,7 +125,7 @@ public class Execution {
 		return state;
 	}
 	
-	public AllocatedSlot getAssignedResource() {
+	public SimpleSlot getAssignedResource() {
 		return assignedResource;
 	}
 	
@@ -169,7 +169,7 @@ public class Execution {
 		
 		// sanity check
 		if (locationConstraint != null && sharingGroup == null) {
-			throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed.");
+			throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing not allowed.");
 		}
 		
 		if (transitionState(CREATED, SCHEDULED)) {
@@ -185,7 +185,7 @@ public class Execution {
 				
 				future.setFutureAction(new SlotAllocationFutureAction() {
 					@Override
-					public void slotAllocated(AllocatedSlot slot) {
+					public void slotAllocated(SimpleSlot slot) {
 						try {
 							deployToSlot(slot);
 						}
@@ -200,7 +200,7 @@ public class Execution {
 				});
 			}
 			else {
-				AllocatedSlot slot = scheduler.scheduleImmediately(toSchedule);
+				SimpleSlot slot = scheduler.scheduleImmediately(toSchedule);
 				try {
 					deployToSlot(slot);
 				}
@@ -219,7 +219,7 @@ public class Execution {
 		}
 	}
 	
-	public void deployToSlot(final AllocatedSlot slot) throws JobException {
+	public void deployToSlot(final SimpleSlot slot) throws JobException {
 		// sanity checks
 		if (slot == null) {
 			throw new NullPointerException();
@@ -470,7 +470,6 @@ public class Execution {
 	// --------------------------------------------------------------------------------------------
 	
 	private boolean processFail(Throwable t, boolean isCallback) {
-		
 		// damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex
 		// the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure
 		
@@ -572,7 +571,7 @@ public class Execution {
 	}
 	
 	private void sendCancelRpcCall() {
-		final AllocatedSlot slot = this.assignedResource;
+		final SimpleSlot slot = this.assignedResource;
 		if (slot == null) {
 			return;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 29f157c..1f9cf26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -390,7 +390,6 @@ public class ExecutionGraph {
 			long now = System.currentTimeMillis();
 			long deadline = timeout == 0 ? Long.MAX_VALUE : now + timeout;
 			
-			
 			while (now < deadline && !state.isTerminalState()) {
 				progressLock.wait(deadline - now);
 				now = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 26dd19e..57e441e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -28,12 +28,12 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.slf4j.Logger;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobEdge;
@@ -162,7 +162,7 @@ public class ExecutionVertex {
 		return currentExecution.getFailureCause();
 	}
 	
-	public AllocatedSlot getCurrentAssignedResource() {
+	public SimpleSlot getCurrentAssignedResource() {
 		return currentExecution.getAssignedResource();
 	}
 	
@@ -291,7 +291,7 @@ public class ExecutionVertex {
 			ExecutionEdge[] sources = inputEdges[i];
 			if (sources != null) {
 				for (int k = 0; k < sources.length; k++) {
-					AllocatedSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
+					SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
 					if (sourceSlot != null) {
 						locations.add(sourceSlot.getInstance());
 						if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
@@ -342,7 +342,7 @@ public class ExecutionVertex {
 		this.currentExecution.scheduleForExecution(scheduler, queued);
 	}
 	
-	public void deployToSlot(AllocatedSlot slot) throws JobException {
+	public void deployToSlot(SimpleSlot slot) throws JobException {
 		this.currentExecution.deployToSlot(slot);
 	}
 	
@@ -385,7 +385,7 @@ public class ExecutionVertex {
 		getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);
 	}
 	
-	TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID executionId, AllocatedSlot slot) {
+	TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID executionId, SimpleSlot slot) {
 		//  create the input gate deployment descriptors
 		List<GateDeploymentDescriptor> inputGates = new ArrayList<GateDeploymentDescriptor>(inputEdges.length);
 		for (ExecutionEdge[] channels : inputEdges) {

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
deleted file mode 100644
index b3ffbb1..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-
-/**
- * An allocated slot is the unit in which resources are allocated on instances.
- */
-public class AllocatedSlot {
-	
-	private static final AtomicIntegerFieldUpdater<AllocatedSlot> STATUS_UPDATER = 
-			AtomicIntegerFieldUpdater.newUpdater(AllocatedSlot.class, "status");
-	
-	private static final AtomicReferenceFieldUpdater<AllocatedSlot, Execution> VERTEX_UPDATER =
-			AtomicReferenceFieldUpdater.newUpdater(AllocatedSlot.class, Execution.class, "executedTask");
-	
-	private static final int ALLOCATED_AND_ALIVE = 0;		// tasks may be added and might be running
-	private static final int CANCELLED = 1;					// no more tasks may run
-	private static final int RELEASED = 2;					// has been given back to the instance
-
-	
-	/** The ID of the job this slice belongs to. */
-	private final JobID jobID;
-	
-	/** The instance on which the slot is allocated */
-	private final Instance instance;
-	
-	/** The number of the slot on which the task is deployed */
-	private final int slotNumber;
-	
-	/** Task being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
-	private volatile Execution executedTask;
-	
-	/** The state of the vertex, only atomically updated */
-	private volatile int status = ALLOCATED_AND_ALIVE;
-	
-	private Locality locality = Locality.UNCONSTRAINED;
-	
-
-	public AllocatedSlot(JobID jobID, Instance instance, int slotNumber) {
-		if (jobID == null || instance == null || slotNumber < 0) {
-			throw new IllegalArgumentException();
-		}
-		
-		this.jobID = jobID;
-		this.instance = instance;
-		this.slotNumber = slotNumber;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Returns the ID of the job this allocated slot belongs to.
-	 * 
-	 * @return the ID of the job this allocated slot belongs to
-	 */
-	public JobID getJobID() {
-		return this.jobID;
-	}
-	
-	public Instance getInstance() {
-		return instance;
-	}
-	
-	public int getSlotNumber() {
-		return slotNumber;
-	}
-	
-	public Execution getExecutedVertex() {
-		return executedTask;
-	}
-	
-	public Locality getLocality() {
-		return locality;
-	}
-	
-	public void setLocality(Locality locality) {
-		this.locality = locality;
-	}
-	
-	public boolean setExecutedVertex(Execution executedVertex) {
-		if (executedVertex == null) {
-			throw new NullPointerException();
-		}
-		
-		// check that we can actually run in this slot
-		if (status != ALLOCATED_AND_ALIVE) {
-			return false;
-		}
-		
-		// atomically assign the vertex
-		if (!VERTEX_UPDATER.compareAndSet(this, null, executedVertex)) {
-			return false;
-		}
-
-		// we need to do a double check that we were not cancelled in the meantime
-		if (status != ALLOCATED_AND_ALIVE) {
-			this.executedTask = null;
-			return false;
-		}
-		
-		return true;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Status and life cycle
-	// --------------------------------------------------------------------------------------------
-	
-	public boolean isAlive() {
-		return status == ALLOCATED_AND_ALIVE;
-	}
-	
-	public boolean isCanceled() {
-		return status != ALLOCATED_AND_ALIVE;
-	}
-	
-	public boolean isReleased() {
-		return status == RELEASED;
-	}
-	
-	
-	public void cancel() {
-		if (STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, CANCELLED)) {
-			// kill all tasks currently running in this slot
-			Execution exec = this.executedTask;
-			if (exec != null && !exec.isFinished()) {
-				exec.fail(new Exception("The slot in which the task was scheduled has been killed (probably loss of TaskManager)."));
-			}
-		}
-	}
-	
-	public void releaseSlot() {
-		// cancel everything, if there is something. since this is atomically status based,
-		// it will not happen twice if another attempt happened before or concurrently
-		try {
-			cancel();
-		} finally {
-			this.instance.returnAllocatedSlot(this);
-		}
-	}
-	
-	protected boolean markReleased() {
-		return STATUS_UPDATER.compareAndSet(this, CANCELLED, RELEASED);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Utilities
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public String toString() {
-		return instance.getId() + " (" + slotNumber + ") - " + getStateName(status);
-	}
-	
-	private static final String getStateName(int state) {
-		switch (state) {
-		case ALLOCATED_AND_ALIVE:
-			return "ALLOCATED/ALIVE";
-		case CANCELLED:
-			return "CANCELLED";
-		case RELEASED:
-			return "RELEASED";
-		default:
-			return "(unknown)";
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 89453bf..19e6690 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -27,9 +27,11 @@ import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 
+import org.apache.flink.runtime.AbstractID;
 import org.apache.flink.runtime.ipc.RPC;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
 import org.eclipse.jetty.util.log.Log;
@@ -58,7 +60,7 @@ public class Instance {
 	private final Queue<Integer> availableSlots;
 	
 	/** Allocated slots on this instance */
-	private final Set<AllocatedSlot> allocatedSlots = new HashSet<AllocatedSlot>();
+	private final Set<Slot> allocatedSlots = new HashSet<Slot>();
 
 	
 	/** A listener to be notified upon new slot availability */
@@ -156,24 +158,31 @@ public class Instance {
 		}
 	}
 	public void markDead() {
-		if (isDead) {
-			return;
-		}
-		
-		isDead = true;
-		
 		synchronized (instanceLock) {
-			
+			if (isDead) {
+				return;
+			}
+
+			isDead = true;
+
 			// no more notifications for the slot releasing
 			this.slotAvailabilityListener = null;
-			
-			for (AllocatedSlot slot : allocatedSlots) {
-				slot.releaseSlot();
-			}
+		}
+
+		/*
+		 * releaseSlot must not own the instanceLock in order to avoid dead locks where a slot
+		 * owning the assignment group lock wants to give itself back to the instance which requires
+		 * the instance lock
+		 */
+		for (Slot slot : allocatedSlots) {
+			slot.releaseSlot();
+		}
+
+		synchronized (instanceLock) {
 			allocatedSlots.clear();
 			availableSlots.clear();
 		}
-		
+
 		destroyTaskManagerProxy();
 	}
 	
@@ -252,8 +261,12 @@ public class Instance {
 	// --------------------------------------------------------------------------------------------
 	// Resource allocation
 	// --------------------------------------------------------------------------------------------
+
+	public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {
+		return allocateSimpleSlot(jobID, jobID);
+	}
 	
-	public AllocatedSlot allocateSlot(JobID jobID) throws InstanceDiedException {
+	public SimpleSlot allocateSimpleSlot(JobID jobID, AbstractID groupID) throws InstanceDiedException {
 		if (jobID == null) {
 			throw new IllegalArgumentException();
 		}
@@ -267,14 +280,37 @@ public class Instance {
 			if (nextSlot == null) {
 				return null;
 			} else {
-				AllocatedSlot slot = new AllocatedSlot(jobID, this, nextSlot);
+				SimpleSlot slot = new SimpleSlot(jobID, this, nextSlot, null, groupID);
 				allocatedSlots.add(slot);
 				return slot;
 			}
 		}
 	}
-	
-	public boolean returnAllocatedSlot(AllocatedSlot slot) {
+
+	public SharedSlot allocateSharedSlot(JobID jobID, SlotSharingGroupAssignment sharingGroupAssignment, AbstractID groupID)
+			throws InstanceDiedException {
+		if (jobID == null) {
+			throw new IllegalArgumentException();
+		}
+
+		synchronized (instanceLock) {
+			if (isDead) {
+				throw new InstanceDiedException(this);
+			}
+
+			Integer nextSlot = availableSlots.poll();
+			if (nextSlot == null) {
+				return null;
+			} else {
+				SharedSlot slot = new SharedSlot(jobID, this, nextSlot,
+						sharingGroupAssignment, null, groupID);
+				allocatedSlots.add(slot);
+				return slot;
+			}
+		}
+	}
+
+	public boolean returnAllocatedSlot(Slot slot) {
 		// the slot needs to be in the returned to instance state
 		if (slot == null || slot.getInstance() != this) {
 			throw new IllegalArgumentException("Slot is null or belongs to the wrong instance.");
@@ -307,14 +343,15 @@ public class Instance {
 	}
 	
 	public void cancelAndReleaseAllSlots() {
+		List<Slot> copy;
+
 		synchronized (instanceLock) {
 			// we need to do this copy because of concurrent modification exceptions
-			List<AllocatedSlot> copy = new ArrayList<AllocatedSlot>(this.allocatedSlots);
+			copy = new ArrayList<Slot>(this.allocatedSlots);
+		}
 			
-			for (AllocatedSlot slot : copy) {
-				slot.releaseSlot();
-			}
-			allocatedSlots.clear();
+		for (Slot slot : copy) {
+			slot.releaseSlot();
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
new file mode 100644
index 0000000..2efcf6c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class represents a shared slot. A shared slot can have multiple
+ * {@link org.apache.flink.runtime.instance.SimpleSlot} instances within itself. This allows to
+ * schedule multiple tasks simultaneously, enabling Flink's streaming capabilities.
+ *
+ * IMPORTANT: This class contains no synchronization. Thus, the caller has to guarantee proper
+ * synchronization. In the current implementation, all concurrently modifying operations are
+ * passed through a {@link SlotSharingGroupAssignment} object which is responsible for
+ * synchronization.
+ *
+ */
+public class SharedSlot extends Slot {
+
+	private final SlotSharingGroupAssignment assignmentGroup;
+
+	private final Set<Slot> subSlots;
+
+	public SharedSlot(JobID jobID, Instance instance, int slotNumber,
+					SlotSharingGroupAssignment assignmentGroup, SharedSlot parent,
+					AbstractID groupID) {
+		super(jobID, instance, slotNumber, parent, groupID);
+
+		this.assignmentGroup = assignmentGroup;
+		this.subSlots = new HashSet<Slot>();
+	}
+
+	public Set<Slot> getSubSlots() {
+		return subSlots;
+	}
+
+	/**
+	 * Removes the simple slot from the {@link org.apache.flink.runtime.instance.SharedSlot}. Should
+	 * only be called through the
+	 * {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment} attribute
+	 * assignmnetGroup.
+	 *
+	 * @param slot slot to be removed from the set of sub slots.
+	 * @return Number of remaining sub slots
+	 */
+	public int freeSubSlot(Slot slot){
+		if(!subSlots.remove(slot)){
+			throw new IllegalArgumentException("Wrong shared slot for sub slot.");
+		}
+
+		return subSlots.size();
+	}
+
+	@Override
+	public int getNumberLeaves() {
+		int result = 0;
+
+		for(Slot slot: subSlots){
+			result += slot.getNumberLeaves();
+		}
+
+		return result;
+	}
+
+	@Override
+	public void cancel() {
+		// Guarantee that the operation is only executed once
+		if (markCancelled()) {
+			assignmentGroup.releaseSharedSlot(this);
+		}
+	}
+
+	/**
+	 * Release this shared slot. In order to do this:
+	 *
+	 * 1. Cancel and release all sub slots atomically with respect to the assigned assignment group.
+	 * 2. Set the state of the shared slot to be cancelled.
+	 * 3. Dispose the shared slot (returning the slot to the instance).
+	 *
+	 * After cancelAndReleaseSubSlots, the shared slot is marked to be dead. This prevents further
+	 * sub slot creation by the scheduler.
+	 */
+	@Override
+	public void releaseSlot() {
+		assignmentGroup.releaseSharedSlot(this);
+	}
+
+	/**
+	 * Creates a new sub slot if the slot is not dead, yet. This method should only be called from
+	 * the assignment group instance to guarantee synchronization.
+	 *
+	 * @param jID id to identify tasks which can be deployed in this sub slot
+	 * @return new sub slot if the shared slot is still alive, otherwise null
+	 */
+	public SimpleSlot allocateSubSlot(AbstractID jID){
+		if(isDead()){
+			return null;
+		} else {
+			SimpleSlot slot = new SimpleSlot(jobID, instance, subSlots.size(), this, jID);
+			subSlots.add(slot);
+
+			return slot;
+		}
+	}
+
+	public SharedSlot allocateSharedSlot(AbstractID jID){
+		if(isDead()){
+			return null;
+		} else {
+			SharedSlot slot = new SharedSlot(jobID, instance, subSlots.size(), assignmentGroup, this, jID);
+			subSlots.add(slot);
+
+			return slot;
+		}
+	}
+
+	/**
+	 * Disposes the given sub slot. This
+	 * is done by the means of the assignmentGroup in order to synchronize the method. If the
+	 * disposed slot was the last sub slot, then the shared slot is marked to be cancelled and is
+	 * disposed/returned to the owning instance.
+	 *
+	 * @param slot sub slot which shall be removed from the shared slot
+	 */
+	public void disposeChild(SimpleSlot slot){
+		assignmentGroup.releaseSimpleSlot(slot);
+	}
+
+	@Override
+	public String toString() {
+		return "Shared " + super.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
new file mode 100644
index 0000000..5b1af57
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * Class which represents a single slot on a machine or within a shared slot. If this slot is part
+ * of a [[SharedSlot]], then its parent attribute is set to this instance. If not, then the parent
+ * attribute is null.
+ *
+ * IMPORTANT: This class has no synchronization. Thus it has to be synchronized by the calling
+ * object.
+ */
+public class SimpleSlot extends Slot {
+
+	private static final AtomicReferenceFieldUpdater<SimpleSlot, Execution> VERTEX_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(SimpleSlot.class, Execution.class, "executedTask");
+
+	/** Task being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
+	private volatile Execution executedTask;
+
+	private Locality locality = Locality.UNCONSTRAINED;
+
+	public SimpleSlot(JobID jobID, Instance instance, int slotNumber, SharedSlot parent, AbstractID groupID){
+		super(jobID, instance, slotNumber, parent, groupID);
+	}
+
+	@Override
+	public int getNumberLeaves() {
+		return 1;
+	}
+
+
+	public Execution getExecution() {
+		return executedTask;
+	}
+
+	public Locality getLocality() {
+		return locality;
+	}
+
+	public void setLocality(Locality locality) {
+		this.locality = locality;
+	}
+
+	public boolean setExecutedVertex(Execution executedVertex) {
+		if (executedVertex == null) {
+			throw new NullPointerException();
+		}
+
+		// check that we can actually run in this slot
+		if (status != ALLOCATED_AND_ALIVE) {
+			return false;
+		}
+
+		// atomically assign the vertex
+		if (!VERTEX_UPDATER.compareAndSet(this, null, executedVertex)) {
+			return false;
+		}
+
+		// we need to do a double check that we were not cancelled in the meantime
+		if (status != ALLOCATED_AND_ALIVE) {
+			this.executedTask = null;
+			return false;
+		}
+
+		return true;
+	}
+
+	@Override
+	public void cancel() {
+		if (markCancelled()) {
+			// kill all tasks currently running in this slot
+			Execution exec = this.executedTask;
+			if (exec != null && !exec.isFinished()) {
+				exec.fail(new Exception("The slot in which the task was scheduled has been killed (probably loss of TaskManager)."));
+			}
+		}
+	}
+
+	@Override
+	public void releaseSlot() {
+		// cancel everything, if there is something. since this is atomically status based,
+		// it will not happen twice if another attempt happened before or concurrently
+		try {
+			cancel();
+		} finally {
+			if (getParent() != null) {
+				// we have to ask our parent to dispose us
+				getParent().disposeChild(this);
+			} else {
+				// we have to give back the slot to the owning instance
+				instance.returnAllocatedSlot(this);
+			}
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "SimpleSlot " + super.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
new file mode 100644
index 0000000..fb62c4c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.runtime.jobgraph.JobID;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * Base class for slots.
+ */
+public abstract class Slot {
+	protected static final AtomicIntegerFieldUpdater<Slot> STATUS_UPDATER =
+			AtomicIntegerFieldUpdater.newUpdater(Slot.class, "status");
+
+	protected static final int ALLOCATED_AND_ALIVE = 0;		// tasks may be added and might be running
+	protected static final int CANCELLED = 1;					// no more tasks may run
+	protected static final int RELEASED = 2;					// has been given back to the instance
+
+	/** The ID of the job this slot belongs to. */
+	protected final JobID jobID;
+
+	/** The instance on which the slot is allocated */
+	protected final Instance instance;
+
+	/** The number of the slot on which the task is deployed */
+	protected final int slotNumber;
+
+	/** The state of the slot, only atomically updated */
+	protected volatile int status = ALLOCATED_AND_ALIVE;
+
+	/** Indicates whether this slot was marked dead by the system */
+	private boolean dead = false;
+
+	private final AbstractID groupID;
+
+	private final SharedSlot parent;
+
+	private boolean disposed = false;
+
+
+	public Slot(JobID jobID, Instance instance, int slotNumber, SharedSlot parent, AbstractID groupID) {
+		if (jobID == null || instance == null || slotNumber < 0) {
+			throw new IllegalArgumentException();
+		}
+
+		this.jobID = jobID;
+		this.instance = instance;
+		this.slotNumber = slotNumber;
+		this.parent = parent;
+		this.groupID = groupID;
+
+	}
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the ID of the job this allocated slot belongs to.
+	 *
+	 * @return the ID of the job this allocated slot belongs to
+	 */
+	public JobID getJobID() {
+		return this.jobID;
+	}
+
+	public Instance getInstance() {
+		return instance;
+	}
+
+	public int getSlotNumber() {
+		return slotNumber;
+	}
+
+	public AbstractID getGroupID() {
+		return groupID;
+	}
+
+	public SharedSlot getParent() {
+		return parent;
+	}
+
+	public Slot getRoot() {
+		if(parent == null){
+			return this;
+		} else {
+			return parent.getRoot();
+		}
+	}
+
+	public abstract int getNumberLeaves();
+
+	// --------------------------------------------------------------------------------------------
+	//  Status and life cycle
+	// --------------------------------------------------------------------------------------------
+
+	public boolean isAlive() {
+		return status == ALLOCATED_AND_ALIVE;
+	}
+
+	public boolean isCanceled() {
+		return status != ALLOCATED_AND_ALIVE;
+	}
+
+	public boolean isReleased() {
+		return status == RELEASED;
+	}
+
+	public abstract void cancel();
+
+	public abstract void releaseSlot();
+
+	public boolean markReleased() {
+		return STATUS_UPDATER.compareAndSet(this, CANCELLED, RELEASED);
+	}
+
+	public boolean markCancelled() {
+		return STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, CANCELLED);
+	}
+
+	/**
+	 * Marks this slot as dead. Returns whether the slot was alive before. Should only
+	 * be called through the {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment} attribute assignmentGroup.
+	 *
+	 * @return {@code true} if the slot was alive before, {@code false} if it was already marked dead
+	 */
+	public boolean markDead() {
+		boolean result = !dead;
+
+		dead = true;
+
+		return result;
+	}
+
+	public boolean isDead() {
+		return dead;
+	}
+
+	public boolean markDisposed() {
+		boolean result = !disposed;
+
+		disposed = true;
+
+		return result;
+	}
+
+	public boolean isDisposed() {
+		return disposed;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Utilities
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return hierarchy() + " - " + instance.getId() + " - " + getStateName(status);
+	}
+
+	protected String hierarchy() {
+		return "(" + slotNumber + ")" + (getParent() != null ? getParent().hierarchy() : "");
+	}
+
+	private static final String getStateName(int state) {
+		switch (state) {
+			case ALLOCATED_AND_ALIVE:
+				return "ALLOCATED/ALIVE";
+			case CANCELLED:
+				return "CANCELLED";
+			case RELEASED:
+				return "RELEASED";
+			default:
+				return "(unknown)";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 13b3c3d..80607c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -63,7 +63,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Hardware;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
@@ -71,6 +70,7 @@ import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.InstanceManager;
 import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
 import org.apache.flink.runtime.io.network.channels.ChannelID;
 import org.apache.flink.runtime.ipc.RPC;
@@ -465,7 +465,6 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	public boolean updateTaskExecutionState(TaskExecutionState executionState) throws IOException {
 		Preconditions.checkNotNull(executionState);
 
-
 		final ExecutionGraph eg = this.currentJobs.get(executionState.getJobID());
 		if (eg == null) {
 			if (LOG.isDebugEnabled()) {
@@ -475,7 +474,9 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 			return false;
 		}
 
-		return eg.updateState(executionState);
+		boolean result =  eg.updateState(executionState);
+
+		return result;
 	}
 	
 	@Override
@@ -505,7 +506,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 		if(execution == null) {
 			LOG.error("Can not find Execution for attempt " + executionAttempt);
 		} else {
-			AllocatedSlot slot = execution.getAssignedResource();
+			SimpleSlot slot = execution.getAssignedResource();
 			if(slot != null) {
 				host = slot.getInstance().getInstanceConnectionInfo().getHostname();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index 36430de..902b5a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.AbstractID;
 import org.apache.flink.runtime.instance.Instance;
 
 import com.google.common.base.Preconditions;
+import org.apache.flink.runtime.instance.SharedSlot;
 
 public class CoLocationConstraint {
 	
@@ -42,7 +43,7 @@ public class CoLocationConstraint {
 	
 	public Instance getLocation() {
 		if (sharedSlot != null) {
-			return sharedSlot.getAllocatedSlot().getInstance();
+			return sharedSlot.getInstance();
 		} else {
 			throw new IllegalStateException("Not assigned");
 		}
@@ -52,7 +53,7 @@ public class CoLocationConstraint {
 		if (this.sharedSlot == sharedSlot) {
 			return;
 		}
-		else if (this.sharedSlot == null || this.sharedSlot.isDisposed()) {
+		else if (this.sharedSlot == null || this.sharedSlot.isDead()) {
 			this.sharedSlot = sharedSlot;
 		} else {
 			throw new IllegalStateException("Overriding shared slot that is still alive.");
@@ -64,12 +65,17 @@ public class CoLocationConstraint {
 	}
 	
 	public boolean isUnassignedOrDisposed() {
-		return this.sharedSlot == null || this.sharedSlot.isDisposed();
+		return this.sharedSlot == null || this.sharedSlot.isDead();
 	}
 	
 	public AbstractID getGroupId() {
 		return this.group.getId();
 	}
+
+	@Override
+	public String toString() {
+		return "CoLocation constraint id " + getGroupId() + " shared slot " + sharedSlot;
+	}
 	
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index 5df7d48..961ae96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -36,14 +36,19 @@ public class NoResourceAvailableException extends JobException {
 				+ ". You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.");
 	}
 	
-	NoResourceAvailableException(int numInstances, int numSlotsTotal) {
-		super(String.format("%s Resources available to scheduler: Number of instances=%d, total number of slots=%d", 
-				BASE_MESSAGE, numInstances, numSlotsTotal));
+	public NoResourceAvailableException(int numInstances, int numSlotsTotal, int availableSlots) {
+		super(String.format("%s Resources available to scheduler: Number of instances=%d, total number of slots=%d, available slots=%d",
+				BASE_MESSAGE, numInstances, numSlotsTotal, availableSlots));
 	}
 	
-	NoResourceAvailableException(ScheduledUnit task, int numInstances, int numSlotsTotal) {
-		super(String.format("%s Task to schedule: < %s > in sharing group < %s >. Resources available to scheduler: Number of instances=%d, total number of slots=%d", 
-				BASE_MESSAGE, task.getTaskToExecute(), task.getSlotSharingGroup(), numInstances, numSlotsTotal));
+	NoResourceAvailableException(ScheduledUnit task, int numInstances, int numSlotsTotal, int availableSlots) {
+		super(String.format("%s Task to schedule: < %s > with groupID < %s > in sharing group < %s >. Resources available to scheduler: Number of instances=%d, total number of slots=%d, available slots=%d",
+				BASE_MESSAGE, task.getTaskToExecute(),
+				task.getLocationConstraint() == null ? task.getTaskToExecute().getVertex().getJobvertexId() : task.getLocationConstraint().getGroupId(),
+				task.getSlotSharingGroup(),
+				numInstances,
+				numSlotsTotal,
+				availableSlots));
 	}
 
 	public NoResourceAvailableException(String message) {

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 7807b73..22c5405 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -28,10 +28,14 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.runtime.instance.SharedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceDiedException;
 import org.apache.flink.runtime.instance.InstanceListener;
@@ -131,10 +135,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	//  Scheduling
 	// --------------------------------------------------------------------------------------------
 	
-	public AllocatedSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException {
+	public SimpleSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException {
 		Object ret = scheduleTask(task, false);
-		if (ret instanceof AllocatedSlot) {
-			return (AllocatedSlot) ret;
+		if (ret instanceof SimpleSlot) {
+			return (SimpleSlot) ret;
 		}
 		else {
 			throw new RuntimeException();
@@ -143,8 +147,8 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	
 	public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws NoResourceAvailableException {
 		Object ret = scheduleTask(task, true);
-		if (ret instanceof AllocatedSlot) {
-			return new SlotAllocationFuture((AllocatedSlot) ret);
+		if (ret instanceof SimpleSlot) {
+			return new SlotAllocationFuture((SimpleSlot) ret);
 		}
 		if (ret instanceof SlotAllocationFuture) {
 			return (SlotAllocationFuture) ret;
@@ -155,7 +159,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	}
 	
 	/**
-	 * Returns either an {@link AllocatedSlot}, or an {@link SlotAllocationFuture}.
+	 * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link SlotAllocationFuture}.
 	 */
 	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
 		if (task == null) {
@@ -165,7 +169,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Scheduling task " + task);
 		}
-		
+
 		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 	
 		synchronized (globalLock) {
@@ -183,15 +187,15 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				final CoLocationConstraint constraint = task.getLocationConstraint();
 				
 				// get a slot from the group, if the group has one for us (and can fulfill the constraint)
-				SubSlot slotFromGroup;
+				SimpleSlot slotFromGroup;
 				if (constraint == null) {
 					slotFromGroup = assignment.getSlotForTask(vertex);
 				}
 				else {
 					slotFromGroup = assignment.getSlotForTask(vertex, constraint);
 				}
-				
-				AllocatedSlot newSlot = null;
+
+				SimpleSlot newSlot = null;
 				
 				// the following needs to make sure any allocated slot is released in case of an error
 				try {
@@ -209,15 +213,15 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 							vertex.getPreferredLocations() : Collections.singleton(constraint.getLocation());
 					
 					// get a new slot, since we could not place it into the group, or we could not place it locally
-					newSlot = getFreeSlotForTask(vertex, locations);
-					
-					SubSlot toUse;
+					newSlot = getFreeSubSlotForTask(vertex, locations, assignment, constraint);
+
+					SimpleSlot toUse;
 					
 					if (newSlot == null) {
 						if (slotFromGroup == null) {
 							// both null
 							if (constraint == null || constraint.isUnassigned()) {
-								throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(), getTotalNumberOfSlots());
+								throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
 							} else {
 								throw new NoResourceAvailableException("Could not allocate a slot on instance " + 
 											constraint.getLocation() + ", as required by the co-location constraint.");
@@ -233,11 +237,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 							slotFromGroup.releaseSlot();
 						}
 						
-						if (constraint == null) {
-							toUse = assignment.addNewSlotWithTask(newSlot, vertex);
-						} else {
-							toUse = assignment.addNewSlotWithTask(newSlot, vertex, constraint);
-						}
+						toUse = newSlot;
 					}
 					else {
 						// both are available and usable. neither is local
@@ -249,7 +249,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 					// if it was assigned before and the new one is not local, it is a fail
 					if (constraint != null) {
 						if (constraint.isUnassigned() || toUse.getLocality() == Locality.LOCAL) {
-							constraint.setSharedSlot(toUse.getSharedSlot());
+							constraint.setSharedSlot(toUse.getParent());
 						} else {
 							// the fail
 							throw new NoResourceAvailableException("Could not allocate a slot on instance " + 
@@ -277,7 +277,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		
 			// 2) === schedule without hints and sharing ===
 			
-			AllocatedSlot slot = getFreeSlotForTask(vertex, vertex.getPreferredLocations());
+			SimpleSlot slot = getFreeSlotForTask(vertex, vertex.getPreferredLocations());
 			if (slot != null) {
 				updateLocalityCounters(slot.getLocality());
 				return slot;
@@ -290,7 +290,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 					return future;
 				}
 				else {
-					throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots());
+					throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
 				}
 			}
 		}
@@ -304,69 +304,96 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	 * @param vertex The task to run. 
 	 * @return The instance to run the vertex on, it {@code null}, if no instance is available.
 	 */
-	protected AllocatedSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations) {
+	protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations) {
 		
 		// we need potentially to loop multiple times, because there may be false positives
 		// in the set-with-available-instances
 		while (true) {
-			if (this.instancesWithAvailableResources.isEmpty()) {
-				// check if the asynchronous calls did not yet return the queues
-				Instance queuedInstance = this.newlyAvailableInstances.poll();
-				if (queuedInstance == null) {
-					return null;
-				} else {
-					this.instancesWithAvailableResources.add(queuedInstance);
+			Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations);
+
+			if(instanceLocalityPair == null){
+				return null;
+			}
+
+			Instance instanceToUse = instanceLocalityPair.getLeft();
+			Locality locality = instanceLocalityPair.getRight();
+
+			if(LOG.isDebugEnabled()){
+				if(locality == Locality.LOCAL){
+					LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
+				}else if(locality == Locality.NON_LOCAL){
+					LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
+				}else if(locality == Locality.UNCONSTRAINED) {
+					LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
 				}
 			}
-			
-			Iterator<Instance> locations = requestedLocations == null ? null : requestedLocations.iterator();
-			
-			Instance instanceToUse = null;
-			Locality locality = Locality.UNCONSTRAINED;
-			
-			if (locations != null && locations.hasNext()) {
-				// we have a locality preference
+
+			try {
+				SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId(), vertex.getJobvertexId());
 				
-				while (locations.hasNext()) {
-					Instance location = locations.next();
-					
-					if (location != null && this.instancesWithAvailableResources.remove(location)) {
-						instanceToUse = location;
-						locality = Locality.LOCAL;
-						
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + location);
-						}
-						
-						break;
-					}
+				// if the instance has further available slots, re-add it to the set of available resources.
+				if (instanceToUse.hasResourcesAvailable()) {
+					this.instancesWithAvailableResources.add(instanceToUse);
 				}
 				
-				if (instanceToUse == null) {
-					instanceToUse = this.instancesWithAvailableResources.poll();
-					locality = Locality.NON_LOCAL;
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
-					}
+				if (slot != null) {
+					slot.setLocality(locality);
+					return slot;
 				}
 			}
-			else {
-				instanceToUse = this.instancesWithAvailableResources.poll();
-				if (LOG.isDebugEnabled()) {
+			catch (InstanceDiedException e) {
+				// the instance died, but this has not yet been propagated to this scheduler
+				// remove the instance from the set of available instances
+				this.allInstances.remove(instanceToUse);
+				this.instancesWithAvailableResources.remove(instanceToUse);
+			}
+			
+			// if we failed to get a slot, fall through the loop
+		}
+	}
+
+	protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex vertex,
+											Iterable<Instance> requestedLocations,
+											SlotSharingGroupAssignment groupAssignment,
+											CoLocationConstraint constraint) {
+		// we need potentially to loop multiple times, because there may be false positives
+		// in the set-with-available-instances
+		while (true) {
+			Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations);
+
+			if(instanceLocalityPair == null){
+				return null;
+			}
+
+			Instance instanceToUse = instanceLocalityPair.getLeft();
+			Locality locality = instanceLocalityPair.getRight();
+
+			if(LOG.isDebugEnabled()){
+				if(locality == Locality.LOCAL){
+					LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
+				}else if(locality == Locality.NON_LOCAL){
+					LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
+				}else if(locality == Locality.UNCONSTRAINED) {
 					LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
 				}
 			}
-			
+
 			try {
-				AllocatedSlot slot = instanceToUse.allocateSlot(vertex.getJobId());
-				
+				AbstractID groupID = constraint == null ? vertex.getJobvertexId() : constraint.getGroupId();
+
+				// root SharedSlot
+				SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment, groupID);
+
+				// If constraint != null, then slot nested in a SharedSlot nested in sharedSlot
+				// If constraint == null, then slot nested in sharedSlot
+				SimpleSlot slot = groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, groupID, constraint);
+
 				// if the instance has further available slots, re-add it to the set of available resources.
 				if (instanceToUse.hasResourcesAvailable()) {
 					this.instancesWithAvailableResources.add(instanceToUse);
 				}
-				
+
 				if (slot != null) {
-					slot.setLocality(locality);
 					return slot;
 				}
 			}
@@ -376,10 +403,61 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				this.allInstances.remove(instanceToUse);
 				this.instancesWithAvailableResources.remove(instanceToUse);
 			}
-			
+
 			// if we failed to get a slot, fall through the loop
 		}
 	}
+
+	/**
+	 * NOTE: This method is not thread-safe, it needs to be synchronized by the caller.
+	 *
+	 * Tries to find a requested instance. If no such instance is available it will return a non-
+	 * local instance. If no such instance exists (all slots occupied), then return null.
+	 *
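+	 * @param requestedLocations the preferred instances to look for first; may be {@code null} or empty
+	 * @return a pair of the chosen instance and the resulting locality, or {@code null} if no instance with available resources is known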
+	 */
+	private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations){
+		if (this.instancesWithAvailableResources.isEmpty()) {
+			// check if the asynchronous calls did not yet return the queues
+			Instance queuedInstance = this.newlyAvailableInstances.poll();
+			if (queuedInstance == null) {
+				return null;
+			} else {
+				this.instancesWithAvailableResources.add(queuedInstance);
+			}
+		}
+
+		Iterator<Instance> locations = requestedLocations == null ? null : requestedLocations.iterator();
+
+		Instance instanceToUse = null;
+		Locality locality = Locality.UNCONSTRAINED;
+
+		if (locations != null && locations.hasNext()) {
+			// we have a locality preference
+
+			while (locations.hasNext()) {
+				Instance location = locations.next();
+
+				if (location != null && this.instancesWithAvailableResources.remove(location)) {
+					instanceToUse = location;
+					locality = Locality.LOCAL;
+
+					break;
+				}
+			}
+
+			if (instanceToUse == null) {
+				instanceToUse = this.instancesWithAvailableResources.poll();
+				locality = Locality.NON_LOCAL;
+			}
+		}
+		else {
+			instanceToUse = this.instancesWithAvailableResources.poll();
+		}
+
+		return new ImmutablePair<Instance, Locality>(instanceToUse, locality);
+	}
 	
 	@Override
 	public void newSlotAvailable(final Instance instance) {
@@ -393,7 +471,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		//                             (2) scheduler (to check whether to take a new task item
 		// 
 		// that leads with a high probability to deadlocks, when scheduling fast
-		
+
 		this.newlyAvailableInstances.add(instance);
 		
 		if (this.executor != null) {
@@ -428,7 +506,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 				
 				try {
-					AllocatedSlot newSlot = instance.allocateSlot(vertex.getJobId());
+					SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId(), vertex.getJobvertexId());
 					if (newSlot != null) {
 						
 						// success, remove from the task queue and notify the future
@@ -536,7 +614,16 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	// --------------------------------------------------------------------------------------------
 
 	public int getNumberOfAvailableInstances() {
-		return allInstances.size();
+		int numberAvailableInstances = 0;
+		synchronized (this.globalLock) {
+			for(Instance instance: allInstances){
+				if(instance.isAlive()){
+					numberAvailableInstances++;
+				}
+			}
+		}
+
+		return numberAvailableInstances;
 	}
 	
 	public int getNumberOfInstancesWithAvailableSlots() {

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
deleted file mode 100644
index 3673512..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.runtime.instance.AllocatedSlot;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-/**
- * 
- * NOTE: This class does no synchronization by itself and its mutating
- *       methods may only be called from within the synchronization scope of
- *       it associated SlotSharingGroupAssignment.
- */
-class SharedSlot {
-
-	private final AllocatedSlot allocatedSlot;
-	
-	private final SlotSharingGroupAssignment assignmentGroup;
-	
-	private final Set<SubSlot> subSlots;
-	
-	private int subSlotNumber;
-	
-	private volatile boolean disposed;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public SharedSlot(AllocatedSlot allocatedSlot, SlotSharingGroupAssignment assignmentGroup) {
-		if (allocatedSlot == null || assignmentGroup == null) {
-			throw new NullPointerException();
-		}
-		
-		this.allocatedSlot = allocatedSlot;
-		this.assignmentGroup = assignmentGroup;
-		this.subSlots = new HashSet<SubSlot>();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	AllocatedSlot getAllocatedSlot() {
-		return this.allocatedSlot;
-	}
-	
-	boolean isDisposed() {
-		return disposed;
-	}
-	
-	int getNumberOfAllocatedSubSlots() {
-		return this.subSlots.size();
-	}
-	
-	SubSlot allocateSubSlot(JobVertexID jid) {
-		if (disposed) {
-			return null;
-		} else {
-			SubSlot ss = new SubSlot(this, subSlotNumber++, jid);
-			this.subSlots.add(ss);
-			return ss;
-		}
-	}
-	
-	void returnAllocatedSlot(SubSlot slot) {
-		if (!slot.isReleased()) {
-			throw new IllegalArgumentException("SubSlot is not released.");
-		}
-		
-		this.assignmentGroup.releaseSubSlot(slot, this);
-	}
-	
-	int releaseSlot(SubSlot slot) {
-		if (!this.subSlots.remove(slot)) {
-			throw new IllegalArgumentException("Wrong shared slot for subslot.");
-		}
-		return subSlots.size();
-	}
-	
-	void dispose() {
-		if (subSlots.isEmpty()) {
-			disposed = true;
-			this.allocatedSlot.releaseSlot();
-		} else {
-			throw new IllegalStateException("Cannot dispose while subslots are still alive.");
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public String toString() {
-		return "Shared " + allocatedSlot.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
index eb5f9fb..31bd341 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 
 public class SlotAllocationFuture {
 	
 	private final Object monitor = new Object();
 	
-	private volatile AllocatedSlot slot;
+	private volatile SimpleSlot slot;
 	
 	private volatile SlotAllocationFutureAction action;
 	
@@ -32,17 +32,17 @@ public class SlotAllocationFuture {
 
 	public SlotAllocationFuture() {}
 	
-	public SlotAllocationFuture(AllocatedSlot slot) {
+	public SlotAllocationFuture(SimpleSlot slot) {
 		this.slot = slot;
 	}
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public AllocatedSlot waitTillAllocated() throws InterruptedException {
+	public SimpleSlot waitTillAllocated() throws InterruptedException {
 		return waitTillAllocated(0);
 	}
 	
-	public AllocatedSlot waitTillAllocated(long timeout) throws InterruptedException {
+	public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
 		synchronized (monitor) {
 			while (slot == null) {
 				monitor.wait(timeout);
@@ -66,7 +66,7 @@ public class SlotAllocationFuture {
 		}
 	}
 	
-	public void setSlot(AllocatedSlot slot) {
+	public void setSlot(SimpleSlot slot) {
 		if (slot == null) {
 			throw new NullPointerException();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
index 11137fd..f9d032f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 
 /**
  * An action that is invoked once a {@link SlotAllocationFuture} is triggered.
@@ -30,5 +30,5 @@ public interface SlotAllocationFutureAction {
 	 * 
 	 * @param slot The slot that has been allocated.
 	 */
-	void slotAllocated(AllocatedSlot slot);
+	void slotAllocated(SimpleSlot slot);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
index 639d2b7..f75f294 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 import org.apache.flink.runtime.instance.Instance;
 
 /**
- * A SlotAvailabilityListener can be notified when new {@link org.apache.flink.runtime.instance.AllocatedSlot}s become available
+ * A SlotAvailabilityListener can be notified when new {@link org.apache.flink.runtime.instance.Slot}s become available
  * on an {@link org.apache.flink.runtime.instance.Instance}.
  */
 public interface SlotAvailabilityListener {

http://git-wip-us.apache.org/repos/asf/flink/blob/91382bb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
index a07dd6c..d847ec5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -32,93 +33,93 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.runtime.AbstractID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SharedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.slf4j.Logger;
 
 
-public class SlotSharingGroupAssignment {
+public class SlotSharingGroupAssignment implements Serializable {
 	
+	static final long serialVersionUID = 42L;
+
 	private static final Logger LOG = Scheduler.LOG;
-	
-	private final Object lock = new Object();
-	
+
+	private transient final Object lock = new Object();
+
 	/** All slots currently allocated to this sharing group */
 	private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
-	
+
 	/** The slots available per vertex type (jid), keyed by instance, to make them locatable */
 	private final Map<AbstractID, Map<Instance, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<AbstractID, Map<Instance, List<SharedSlot>>>();
-	
-	
+
 	// --------------------------------------------------------------------------------------------
-	
-	
-	public SubSlot addNewSlotWithTask(AllocatedSlot slot, ExecutionVertex vertex) {
-		JobVertexID id = vertex.getJobvertexId();
-		return addNewSlotWithTask(slot, id, id);
-	}
-	
-	public SubSlot addNewSlotWithTask(AllocatedSlot slot, ExecutionVertex vertex, CoLocationConstraint constraint) {
-		AbstractID groupId = constraint.getGroupId();
-		return addNewSlotWithTask(slot, groupId, null);
-	}
-	
-	private SubSlot addNewSlotWithTask(AllocatedSlot slot, AbstractID groupId, JobVertexID vertexId) {
-		
-		final SharedSlot sharedSlot = new SharedSlot(slot, this);
-		final Instance location = slot.getInstance();
-		
+
+	public SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot sharedSlot, Locality locality,
+													AbstractID groupId, CoLocationConstraint constraint) {
+
+		final Instance location = sharedSlot.getInstance();
+
 		synchronized (lock) {
 			// add to the total bookkeeping
 			allSlots.add(sharedSlot);
-			
-			// allocate us a sub slot to return
-			SubSlot subslot = sharedSlot.allocateSubSlot(vertexId);
-			
+
+			SimpleSlot subSlot = null;
+
+			if(constraint == null){
+				// allocate us a sub slot to return
+				subSlot = sharedSlot.allocateSubSlot(groupId);
+			} else {
+				// we need a colocation slot --> a SimpleSlot nested in a SharedSlot to host other colocated tasks
+				SharedSlot constraintGroupSlot = sharedSlot.allocateSharedSlot(groupId);
+				subSlot = constraintGroupSlot.allocateSubSlot(null);
+			}
+
 			// preserve the locality information
-			subslot.setLocality(slot.getLocality());
-			
+			subSlot.setLocality(locality);
+
 			boolean entryForNewJidExists = false;
-			
+
 			// let the other vertex types know about this one as well
 			for (Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
-				
+
 				if (entry.getKey().equals(groupId)) {
 					entryForNewJidExists = true;
 					continue;
 				}
-				
+
 				Map<Instance, List<SharedSlot>> available = entry.getValue();
 				putIntoMultiMap(available, location, sharedSlot);
 			}
-			
+
 			// make sure an empty entry exists for this group, if no other entry exists
 			if (!entryForNewJidExists) {
 				availableSlotsPerJid.put(groupId, new LinkedHashMap<Instance, List<SharedSlot>>());
 			}
-			
-			return subslot;
+
+			return subSlot;
 		}
 	}
-	
+
 	/**
 	 * Gets a slot suitable for the given task vertex. This method will prefer slots that are local
 	 * (with respect to {@link ExecutionVertex#getPreferredLocations()}), but will return non local
 	 * slots if no local slot is available. The method returns null, when no slot is available for the
 	 * given JobVertexID at all.
-	 * 
+	 *
 	 * @param vertex
-	 * 
+	 *
 	 * @return A task vertex for a task with the given JobVertexID, or null, if none is available.
 	 */
-	public SubSlot getSlotForTask(ExecutionVertex vertex) {
+	public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
 		synchronized (lock) {
 			Pair<SharedSlot, Locality> p = getSlotForTaskInternal(vertex.getJobvertexId(), vertex, vertex.getPreferredLocations(), false);
-			
+
 			if (p != null) {
 				SharedSlot ss = p.getLeft();
-				SubSlot slot = ss.allocateSubSlot(vertex.getJobvertexId());
+				SimpleSlot slot = ss.allocateSubSlot(vertex.getJobvertexId());
 				slot.setLocality(p.getRight());
 				return slot;
 			}
@@ -126,17 +127,17 @@ public class SlotSharingGroupAssignment {
 				return null;
 			}
 		}
-		
+
 	}
-	
-	public SubSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) {
-		
+
+	public SimpleSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) {
+
 		synchronized (lock) {
 			SharedSlot shared = constraint.getSharedSlot();
-			
-			if (shared != null && !shared.isDisposed()) {
+
+			if (shared != null && !shared.isDead()) {
 				// initialized and set
-				SubSlot subslot = shared.allocateSubSlot(null);
+				SimpleSlot subslot = shared.allocateSubSlot(null);
 				subslot.setLocality(Locality.LOCAL);
 				return subslot;
 			}
@@ -144,85 +145,92 @@ public class SlotSharingGroupAssignment {
 				// not initialized, grab a new slot. preferred locations are defined by the vertex
 				// we only associate the slot with the constraint, if it was a local match
 				Pair<SharedSlot, Locality> p = getSlotForTaskInternal(constraint.getGroupId(), vertex, vertex.getPreferredLocations(), false);
+
 				if (p == null) {
 					return null;
 				} else {
 					shared = p.getLeft();
 					Locality l = p.getRight();
-					
-					SubSlot sub = shared.allocateSubSlot(null);
+
+					// we need a colocation slot --> SimpleSlot nested in a SharedSlot to host other colocated tasks
+					SharedSlot constraintGroupSlot = shared.allocateSharedSlot(constraint.getGroupId());
+					// Depth=3 => groupID==null
+					SimpleSlot sub = constraintGroupSlot.allocateSubSlot(null);
 					sub.setLocality(l);
-					
+
 					if (l != Locality.NON_LOCAL) {
-						constraint.setSharedSlot(shared);
+						constraint.setSharedSlot(constraintGroupSlot);
 					}
 					return sub;
 				}
 			}
 			else {
 				// disposed. get a new slot on the same instance
-				Instance location = shared.getAllocatedSlot().getInstance();
+				Instance location = shared.getInstance();
 				Pair<SharedSlot, Locality> p = getSlotForTaskInternal(constraint.getGroupId(), vertex, Collections.singleton(location), true);
+
 				if (p == null) {
 					return null;
 				} else {
 					shared = p.getLeft();
-					constraint.setSharedSlot(shared);
-					SubSlot subslot = shared.allocateSubSlot(null);
-					subslot.setLocality(Locality.LOCAL);
-					return subslot;
+					// we need a colocation slot --> a SimpleSlot nested in a SharedSlot to host other colocated tasks
+					SharedSlot constraintGroupSlot = shared.allocateSharedSlot(constraint.getGroupId());
+					constraint.setSharedSlot(constraintGroupSlot);
+					SimpleSlot subSlot = constraintGroupSlot.allocateSubSlot(null);
+					subSlot.setLocality(Locality.LOCAL);
+					return subSlot;
 				}
 			}
 		}
 	}
-	
+
 	/**
 	 * NOTE: This method is not synchronized by itself, needs to be synchronized externally.
-	 * 
+	 *
 	 * @return An allocated sub slot, or {@code null}, if no slot is available.
 	 */
 	private Pair<SharedSlot, Locality> getSlotForTaskInternal(AbstractID groupId, ExecutionVertex vertex, Iterable<Instance> preferredLocations, boolean localOnly) {
+		Map<Instance, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
+
 		if (allSlots.isEmpty()) {
 			return null;
 		}
-		
-		Map<Instance, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
-		
+
 		// get the available slots for the group
 		if (slotsForGroup == null) {
 			// no task is yet scheduled for that group, so all slots are available
 			slotsForGroup = new LinkedHashMap<Instance, List<SharedSlot>>();
 			availableSlotsPerJid.put(groupId, slotsForGroup);
-			
+
 			for (SharedSlot availableSlot : allSlots) {
-				putIntoMultiMap(slotsForGroup, availableSlot.getAllocatedSlot().getInstance(), availableSlot);
+				putIntoMultiMap(slotsForGroup, availableSlot.getInstance(), availableSlot);
 			}
 		}
 		else if (slotsForGroup.isEmpty()) {
 			return null;
 		}
-		
+
 		// check whether we can schedule the task to a preferred location
 		boolean didNotGetPreferred = false;
-		
+
 		if (preferredLocations != null) {
 			for (Instance location : preferredLocations) {
-				
+
 				// set the flag that we failed a preferred location. If one will be found,
 				// we return early anyways and skip the flag evaluation
 				didNotGetPreferred = true;
-				
+
 				SharedSlot slot = removeFromMultiMap(slotsForGroup, location);
-				if (slot != null && !slot.isDisposed()) {
+				if (slot != null && !slot.isDead()) {
 					if (LOG.isDebugEnabled()) {
 						LOG.debug("Local assignment in shared group : " + vertex + " --> " + slot);
 					}
-					
+
 					return new ImmutablePair<SharedSlot, Locality>(slot, Locality.LOCAL);
 				}
 			}
 		}
-		
+
 		// if we want only local assignments, exit now with a "not found" result
 		if (didNotGetPreferred && localOnly) {
 			if (LOG.isDebugEnabled()) {
@@ -230,84 +238,153 @@ public class SlotSharingGroupAssignment {
 			}
 			return null;
 		}
-		
+
 		// schedule the task to any available location
 		SharedSlot slot = pollFromMultiMap(slotsForGroup);
-		if (slot != null && !slot.isDisposed()) {
+		if (slot != null && !slot.isDead()) {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug((didNotGetPreferred ? "Non-local" : "Unconstrained") + " assignment in shared group : " + vertex + " --> " + slot);
 			}
-			
+
 			return new ImmutablePair<SharedSlot, Locality>(slot, didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
 		}
 		else {
 			return null;
 		}
 	}
-	
-	
-	void releaseSubSlot(SubSlot subslot, SharedSlot sharedSlot) {
-		
-		AbstractID groupId = subslot.getGroupId();
-		
+
+	/**
+	 * Removes the shared slot from the assignment group.
+	 *
+	 * @param sharedSlot The SharedSlot to be removed
+	 */
+	private void removeSharedSlot(SharedSlot sharedSlot){
+		if (!allSlots.contains(sharedSlot)) {
+			throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
+		}
+
+		allSlots.remove(sharedSlot);
+
+		Instance location = sharedSlot.getInstance();
+
+		for(Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> mapEntry: availableSlotsPerJid.entrySet()){
+			Map<Instance, List<SharedSlot>> map = mapEntry.getValue();
+
+			List<SharedSlot> list = map.get(location);
+
+			if(list == null || !list.remove(sharedSlot)){
+				throw new IllegalStateException("Bug: SharedSlot was not available to another vertex type that it was not allocated for before.");
+			}
+
+			if(list.isEmpty()){
+				map.remove(location);
+			}
+		}
+
+		sharedSlot.markCancelled();
+
+		returnAllocatedSlot(sharedSlot);
+	}
+
+	/**
+	 * Releases the shared slot from the assignment group.
+	 * @param sharedSlot The SharedSlot to be released
+	 */
+	public void releaseSharedSlot(SharedSlot sharedSlot){
 		synchronized (lock) {
+			Set<Slot> subSlots = sharedSlot.getSubSlots();
 
-			if (!allSlots.contains(sharedSlot)) {
-				throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
+			for(Slot subSlot: subSlots) {
+
+				subSlot.markDisposed();
+
+				if(subSlot instanceof SharedSlot){
+					releaseSharedSlot((SharedSlot) subSlot);
+				}else if(subSlot instanceof SimpleSlot){
+					releaseSimpleSlot((SimpleSlot) subSlot);
+				}
 			}
-			
-			int slotsRemaining = sharedSlot.releaseSlot(subslot);
-			
-			if (slotsRemaining == 0) {
-				// this was the last sub slot. remove this from the availability list 
-				// and trigger disposal
-				try {
-					allSlots.remove(sharedSlot);
-					
-					Instance location = sharedSlot.getAllocatedSlot().getInstance();
-
-					if (groupId != null) {
-						for (Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> mapEntry : availableSlotsPerJid.entrySet()) {
-							AbstractID id = mapEntry.getKey();
-							
-							// hack: we identify co location hint entries by the fact that they are keyed
-							//       by an abstract id, rather than a job vertex id
-							if (id.getClass() == AbstractID.class || id.equals(groupId)) {
-								continue;
-							}
-							
-							Map<Instance, List<SharedSlot>> map = mapEntry.getValue();
-							List<SharedSlot> list = map.get(location);
-							if (list == null || !list.remove(sharedSlot)) {
-								throw new IllegalStateException("Bug: SharedSlot was not available to another vertex type that it was not allocated for before.");
-							}
-							if (list.isEmpty()) {
-								map.remove(location);
-							}
-						}
+
+			subSlots.clear();
+
+			returnSlot(sharedSlot);
+		}
+	}
+
+	/**
+	 * Releases the simple slot from the assignment group.
+	 * @param simpleSlot The SimpleSlot to be released
+	 */
+	public void releaseSimpleSlot(SimpleSlot simpleSlot){
+		synchronized (lock) {
+			simpleSlot.cancel();
+
+			returnSlot(simpleSlot);
+		}
+
+	}
+
+	/**
+	 * Removes the given slot from the assignment group. If the slot is a root object, then it has
+	 * to be a SharedSlot and it is removed from the availableSlotsPerJid field and the slot is
+	 * returned to the instance. If the slot is a sub slot of the root slot, then this sub slot
+	 * is marked available again for tasks of the same group. Otherwise, the slot is simply removed
+	 * from its parent if it is not already marked as disposed. If a slot is already marked to be
+	 * disposed, then the releasing was called from a parent slot which will take care of the
+	 * disposal.
+	 *
+	 * IMPORTANT: The method is not synchronized. The caller is responsible for that.
+	 *
+	 * @param slot The slot to be returned.
+	 */
+	private void returnSlot(Slot slot){
+		// each slot can only be returned once, if a slot is returned then it should no longer be used --> markDead
+		if(slot.markDead()) {
+			// slot is a root slot
+			if(slot.getParent() == null){
+				// only SharedSlots are allowed to be root slots in a SlotSharingGroupAssignment
+				if(slot instanceof SharedSlot){
+					removeSharedSlot((SharedSlot) slot);
+				} else {
+					throw new IllegalStateException("Simple slot cannot be returned from SlotSharingGroupAssignment.");
+				}
+			} else {
+				AbstractID groupID = slot.getGroupID();
+				SharedSlot parent = slot.getParent();
+
+				// Only colocation constraint slots (SimpleSlot nested in a SharedSlot nested in a SharedSlot) have a groupID==null
+				// One can also say, all nested slots deeper than 2 have a groupID==null
+				if(groupID != null){
+					if (!allSlots.contains(parent)) {
+						throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
+					}
+
+					// make the shared slot available again to tasks of the group it is available to
+					Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID);
+
+					// sanity check
+					if (slotsForJid == null) {
+						throw new IllegalStateException("Trying to return a slot for group " + groupID +
+								" when available slots indicated that all slots were available.");
 					}
-				} finally {
-					sharedSlot.dispose();
+
+					putIntoMultiMap(slotsForJid, parent.getInstance(), parent);
 				}
-			}
-			else if (groupId != null) {
-				// make the shared slot available to tasks within the group it available to
-				Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupId);
-				
-				// sanity check
-				if (slotsForJid == null) {
-					throw new IllegalStateException("Trying to return a slot for group " + groupId + 
-							" when available slots indicated that all slots were available.");
+
+				// if no one else takes care of disposal, then remove the slot from the parent
+				if(slot.markDisposed()) {
+					if (slot.getParent().freeSubSlot(slot) == 0) {
+						releaseSharedSlot(slot.getParent());
+					}
 				}
-				
-				putIntoMultiMap(slotsForJid, sharedSlot.getAllocatedSlot().getInstance(), sharedSlot);
 			}
 		}
 	}
-	
-	
-	
-	
+
+	private void returnAllocatedSlot(SharedSlot slot){
+		slot.getInstance().returnAllocatedSlot(slot);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  State
 	// --------------------------------------------------------------------------------------------


Mime
View raw message