flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [5/7] flink git commit: [FLINK-1376] [runtime] Add proper shared slot release in case of a fatal TaskManager failure.
Date Thu, 05 Feb 2015 12:25:46 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index b22ccd0..272a911 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -26,9 +26,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.flink.runtime.instance.SimpleSlot;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -77,18 +77,18 @@ public class ScheduleWithCoLocationHintTest {
 			CoLocationConstraint c6 = new CoLocationConstraint(ccg);
 			
 			// schedule 4 tasks from the first vertex group
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2));
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3));
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4));
-			AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1));
-			AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2));
-			AllocatedSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3));
-			AllocatedSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5));
-			AllocatedSlot s9 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6));
-			AllocatedSlot s10 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4));
-			AllocatedSlot s11 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5));
-			AllocatedSlot s12 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2));
+			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3));
+			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4));
+			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1));
+			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2));
+			SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3));
+			SimpleSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5));
+			SimpleSlot s9 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6));
+			SimpleSlot s10 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4));
+			SimpleSlot s11 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5));
+			SimpleSlot s12 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6));
 
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -104,18 +104,18 @@ public class ScheduleWithCoLocationHintTest {
 			assertNotNull(s12);
 			
 			// check that each slot got exactly two tasks
-			assertEquals(2, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(2, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(2, ((SubSlot) s3).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(2, ((SubSlot) s4).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(2, ((SubSlot) s5).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(2, ((SubSlot) s6).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(2, ((SubSlot) s7).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(2, ((SubSlot) s8).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(2, ((SubSlot) s9).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(2, ((SubSlot) s10).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(2, ((SubSlot) s11).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(2, ((SubSlot) s12).getSharedSlot().getNumberOfAllocatedSubSlots());
+			assertEquals(2, s1.getRoot().getNumberLeaves());
+			assertEquals(2, s2.getRoot().getNumberLeaves());
+			assertEquals(2, s3.getRoot().getNumberLeaves());
+			assertEquals(2, s4.getRoot().getNumberLeaves());
+			assertEquals(2, s5.getRoot().getNumberLeaves());
+			assertEquals(2, s6.getRoot().getNumberLeaves());
+			assertEquals(2, s7.getRoot().getNumberLeaves());
+			assertEquals(2, s8.getRoot().getNumberLeaves());
+			assertEquals(2, s9.getRoot().getNumberLeaves());
+			assertEquals(2, s10.getRoot().getNumberLeaves());
+			assertEquals(2, s11.getRoot().getNumberLeaves());
+			assertEquals(2, s12.getRoot().getNumberLeaves());
 			
 			assertEquals(s1.getInstance(), s5.getInstance());
 			assertEquals(s2.getInstance(), s6.getInstance());
@@ -150,7 +150,7 @@ public class ScheduleWithCoLocationHintTest {
 			s12.releaseSlot();
 			assertTrue(scheduler.getNumberOfAvailableSlots() >= 1);
 			
-			AllocatedSlot single = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)));
+			SimpleSlot single = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)));
 			assertNotNull(single);
 			
 			s1.releaseSlot();
@@ -197,10 +197,10 @@ public class ScheduleWithCoLocationHintTest {
 			SlotSharingGroup sharingGroup = new SlotSharingGroup();
 			CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
 			
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1));
 			
-			AllocatedSlot sSolo = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 1)));
+			SimpleSlot sSolo = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 1)));
 			
 			Instance loc = s1.getInstance();
 			
@@ -208,7 +208,7 @@ public class ScheduleWithCoLocationHintTest {
 			s2.releaseSlot();
 			sSolo.releaseSlot();
 			
-			AllocatedSlot sNew = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1));
+			SimpleSlot sNew = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1));
 			assertEquals(loc, sNew.getInstance());
 			
 			assertEquals(2, scheduler.getNumberOfLocalizedAssignments());
@@ -241,7 +241,7 @@ public class ScheduleWithCoLocationHintTest {
 			SlotSharingGroup sharingGroup = new SlotSharingGroup();
 			CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
 			
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
 			s1.releaseSlot();
 			
 			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1)));
@@ -296,16 +296,16 @@ public class ScheduleWithCoLocationHintTest {
 			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup));
 			
 			// second wave
-			AllocatedSlot s21 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1));
-			AllocatedSlot s22 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2));
-			AllocatedSlot s23 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3));
-			AllocatedSlot s24 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4));
+			SimpleSlot s21 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1));
+			SimpleSlot s22 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2));
+			SimpleSlot s23 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3));
+			SimpleSlot s24 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4));
 			
 			// third wave
-			AllocatedSlot s31 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2));
-			AllocatedSlot s32 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3));
-			AllocatedSlot s33 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4));
-			AllocatedSlot s34 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1));
+			SimpleSlot s31 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2));
+			SimpleSlot s32 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3));
+			SimpleSlot s33 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4));
+			SimpleSlot s34 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1));
 			
 			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup));
 			scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup));
@@ -352,24 +352,24 @@ public class ScheduleWithCoLocationHintTest {
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
 			// schedule something into the shared group so that both instances are in the sharing group
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup));
 			
 			// schedule one locally to instance 1
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc1));
+			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc1));
 
 			// schedule with co location constraint (yet unassigned) and a preference for
 			// instance 1, but it can only get instance 2
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
+			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
 			
 			// schedule something into the assigned co-location constraints and check that they override the
 			// other preferences
-			AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, i2), sharingGroup, cc1));
-			AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, i1), sharingGroup, cc2));
+			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, i2), sharingGroup, cc1));
+			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, i1), sharingGroup, cc2));
 			
 			// check that each slot got three
-			assertEquals(3, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(3, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots());
+			assertEquals(3, s1.getRoot().getNumberLeaves());
+			assertEquals(3, s2.getRoot().getNumberLeaves());
 			
 			assertEquals(s1.getInstance(), s3.getInstance());
 			assertEquals(s2.getInstance(), s4.getInstance());
@@ -420,8 +420,8 @@ public class ScheduleWithCoLocationHintTest {
 			CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
 			
 			s1.releaseSlot();
 			s2.releaseSlot();
@@ -429,8 +429,8 @@ public class ScheduleWithCoLocationHintTest {
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
 
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1));
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
+			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1));
+			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
 			
 			// still preserves the previous instance mapping
 			assertEquals(i1, s3.getInstance());
@@ -474,8 +474,8 @@ public class ScheduleWithCoLocationHintTest {
 			CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
 			
 			s1.releaseSlot();
 			s2.releaseSlot();
@@ -483,8 +483,8 @@ public class ScheduleWithCoLocationHintTest {
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
 
-			AllocatedSlot sa = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)));
-			AllocatedSlot sb = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)));
+			SimpleSlot sa = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)));
+			SimpleSlot sb = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)));
 			
 			try {
 				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1));
@@ -535,15 +535,15 @@ public class ScheduleWithCoLocationHintTest {
 			// schedule something from the second job vertex id before the first is filled,
 			// and give locality preferences that hint at using the same shared slot for both
 			// co location constraints (which we seek to prevent)
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc2));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc2));
 
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc1));
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup, cc2));
+			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc1));
+			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup, cc2));
 			
 			// check that each slot got two
-			assertEquals(2, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(2, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots());
+			assertEquals(2, s1.getRoot().getNumberLeaves());
+			assertEquals(2, s2.getRoot().getNumberLeaves());
 			
 			assertEquals(s1.getInstance(), s3.getInstance());
 			assertEquals(s2.getInstance(), s4.getInstance());
@@ -594,15 +594,15 @@ public class ScheduleWithCoLocationHintTest {
 			CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
 			CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
 
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
 
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup));
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup));
+			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup));
+			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup));
 			
-			// check that each slot got three
-			assertEquals(2, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots());
-			assertEquals(2, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots());
+			// check that each slot got two
+			assertEquals(2, s1.getRoot().getNumberLeaves());
+			assertEquals(2, s2.getRoot().getNumberLeaves());
 			
 			s1.releaseSlot();
 			s2.releaseSlot();

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 7209842..a7f0f04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -24,6 +24,7 @@ import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.g
 import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
 import static org.junit.Assert.*;
 
+import org.apache.flink.runtime.instance.SimpleSlot;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -40,7 +41,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 
 /**
@@ -137,11 +137,11 @@ public class SchedulerIsolatedTasksTest {
 			assertEquals(5, scheduler.getNumberOfAvailableSlots());
 			
 			// schedule something into all slots
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
-			AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
 			
 			// the slots should all be different
 			assertTrue(areAllDistinct(s1, s2, s3, s4, s5));
@@ -160,8 +160,8 @@ public class SchedulerIsolatedTasksTest {
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
 			
 			// now we can schedule some more slots
-			AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
-			AllocatedSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
 			
 			assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7));
 			
@@ -215,7 +215,7 @@ public class SchedulerIsolatedTasksTest {
 			List<SlotAllocationFuture> allAllocatedSlots = new ArrayList<SlotAllocationFuture>();
 			
 			// slots that need to be released
-			final Set<AllocatedSlot> toRelease = new HashSet<AllocatedSlot>();
+			final Set<SimpleSlot> toRelease = new HashSet<SimpleSlot>();
 			
 			// flag to track errors in the concurrent thread
 			final AtomicBoolean errored = new AtomicBoolean(false);
@@ -223,7 +223,7 @@ public class SchedulerIsolatedTasksTest {
 			
 			SlotAllocationFutureAction action = new SlotAllocationFutureAction() {
 				@Override
-				public void slotAllocated(AllocatedSlot slot) {
+				public void slotAllocated(SimpleSlot slot) {
 					synchronized (toRelease) {
 						toRelease.add(slot);
 						toRelease.notifyAll();
@@ -244,8 +244,8 @@ public class SchedulerIsolatedTasksTest {
 									toRelease.wait();
 								}
 								
-								Iterator<AllocatedSlot> iter = toRelease.iterator();
-								AllocatedSlot next = iter.next();
+								Iterator<SimpleSlot> iter = toRelease.iterator();
+								SimpleSlot next = iter.next();
 								iter.remove();
 								
 								next.releaseSlot();
@@ -272,7 +272,7 @@ public class SchedulerIsolatedTasksTest {
 			
 			assertFalse("The slot releasing thread caused an error.", errored.get());
 			
-			List<AllocatedSlot> slotsAfter = new ArrayList<AllocatedSlot>();
+			List<SimpleSlot> slotsAfter = new ArrayList<SimpleSlot>();
 			for (SlotAllocationFuture future : allAllocatedSlots) {
 				slotsAfter.add(future.waitTillAllocated());
 			}
@@ -303,7 +303,7 @@ public class SchedulerIsolatedTasksTest {
 			scheduler.newInstanceAvailable(i2);
 			scheduler.newInstanceAvailable(i3);
 			
-			List<AllocatedSlot> slots = new ArrayList<AllocatedSlot>();
+			List<SimpleSlot> slots = new ArrayList<SimpleSlot>();
 			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
 			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
 			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
@@ -312,7 +312,7 @@ public class SchedulerIsolatedTasksTest {
 			
 			i2.markDead();
 			
-			for (AllocatedSlot slot : slots) {
+			for (SimpleSlot slot : slots) {
 				if (slot.getInstance() == i2) {
 					assertTrue(slot.isCanceled());
 				} else {
@@ -364,7 +364,7 @@ public class SchedulerIsolatedTasksTest {
 			scheduler.newInstanceAvailable(i3);
 			
 			// schedule something on an arbitrary instance
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Collections.<Instance>emptyList())));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Collections.<Instance>emptyList())));
 			
 			// figure out how we use the location hints
 			Instance first = s1.getInstance();
@@ -372,28 +372,28 @@ public class SchedulerIsolatedTasksTest {
 			Instance third = first == i3 ? i2 : i3;
 			
 			// something that needs to go to the first instance again
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Collections.singletonList(s1.getInstance()))));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Collections.singletonList(s1.getInstance()))));
 			assertEquals(first, s2.getInstance());
 
 			// first or second --> second, because first is full
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, second))));
+			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, second))));
 			assertEquals(second, s3.getInstance());
 			
 			// first or third --> third (because first is full)
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
-			AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
+			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
+			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
 			assertEquals(third, s4.getInstance());
 			assertEquals(third, s5.getInstance());
 			
 			// first or third --> second, because all others are full
-			AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
+			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
 			assertEquals(second, s6.getInstance());
 			
 			// release something on the first and second instance
 			s2.releaseSlot();
 			s6.releaseSlot();
 			
-			AllocatedSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
+			SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(Arrays.asList(first, third))));
 			assertEquals(first, s7.getInstance());
 			
 			assertEquals(1, scheduler.getNumberOfUnconstrainedAssignments());

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index de90701..d1c6938 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -31,13 +31,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.flink.runtime.instance.SimpleSlot;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -74,10 +74,10 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(i2);
 			
 			// schedule 4 tasks from the first vertex group
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup));
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup));
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup));
+			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup));
+			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup));
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -102,7 +102,7 @@ public class SchedulerSlotSharingTest {
 			s3.releaseSlot();
 			
 			// allocate another slot from that group
-			AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup));
+			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup));
 			assertNotNull(s5);
 			
 			// release all old slots
@@ -110,9 +110,9 @@ public class SchedulerSlotSharingTest {
 			s2.releaseSlot();
 			s4.releaseSlot();
 			
-			AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup));
-			AllocatedSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup));
-			AllocatedSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup));
+			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup));
+			SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup));
+			SimpleSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup));
 			
 			assertNotNull(s6);
 			assertNotNull(s7);
@@ -159,10 +159,10 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup));
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup));
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup));
+			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup));
+			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup));
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -184,10 +184,10 @@ public class SchedulerSlotSharingTest {
 			}
 			
 			// schedule some tasks from the second ID group
-			AllocatedSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup));
-			AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup));
-			AllocatedSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup));
-			AllocatedSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup));
+			SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup));
+			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup));
+			SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup));
+			SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup));
 			
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
@@ -228,7 +228,7 @@ public class SchedulerSlotSharingTest {
 			}
 			
 			// we can schedule something from the first vertex group
-			AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup));
+			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup));
 			assertNotNull(s5);
 			
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -238,7 +238,7 @@ public class SchedulerSlotSharingTest {
 			
 			// now we release a slot from the second vertex group and schedule another task from that group
 			s2_2.releaseSlot();
-			AllocatedSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup));
+			SimpleSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup));
 			assertNotNull(s5_2);
 			
 			// release all slots
@@ -279,10 +279,10 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
+			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
+			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
 			
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
@@ -298,10 +298,10 @@ public class SchedulerSlotSharingTest {
 			assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid2));
 			
 			// schedule some tasks from the second ID group
-			AllocatedSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
-			AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
-			AllocatedSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
-			AllocatedSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
+			SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
+			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
+			SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
+			SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
 
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots());
 			assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
@@ -344,10 +344,10 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 4 tasks from the first vertex group
-			AllocatedSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-			AllocatedSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
-			AllocatedSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-			AllocatedSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+			SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
+			SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
+			SimpleSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
+			SimpleSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
 			
 			assertNotNull(s1_1);
 			assertNotNull(s2_1);
@@ -357,10 +357,10 @@ public class SchedulerSlotSharingTest {
 			assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1));
 			
 			// schedule 4 tasks from the second vertex group
-			AllocatedSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup));
-			AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup));
-			AllocatedSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup));
-			AllocatedSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup));
+			SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup));
+			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup));
+			SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup));
+			SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup));
 			
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
@@ -370,10 +370,10 @@ public class SchedulerSlotSharingTest {
 			assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2));
 			
 			// schedule 4 tasks from the third vertex group
-			AllocatedSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup));
-			AllocatedSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup));
-			AllocatedSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup));
-			AllocatedSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup));
+			SimpleSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup));
+			SimpleSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup));
+			SimpleSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup));
+			SimpleSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup));
 			
 			assertNotNull(s1_3);
 			assertNotNull(s2_3);
@@ -401,9 +401,9 @@ public class SchedulerSlotSharingTest {
 			s3_2.releaseSlot();
 			s4_2.releaseSlot();
 			
-			AllocatedSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup));
-			AllocatedSlot s6_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup));
-			AllocatedSlot s7_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup));
+			SimpleSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup));
+			SimpleSlot s6_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup));
+			SimpleSlot s7_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup));
 			
 			assertNotNull(s5_2);
 			assertNotNull(s6_2);
@@ -454,9 +454,9 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule 1 tasks from the first vertex group and 2 from the second
-			AllocatedSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup));
-			AllocatedSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup));
-			AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup));
+			SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup));
+			SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup));
+			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup));
 			
 			assertNotNull(s1_1);
 			assertNotNull(s2_1);
@@ -472,7 +472,7 @@ public class SchedulerSlotSharingTest {
 			
 			
 			// this should free one slot so we can allocate one non-shared
-			AllocatedSlot sx = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1)));
+			SimpleSlot sx = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1)));
 			assertNotNull(sx);
 			
 			assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -507,23 +507,23 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(2));
 			
 			// schedule some individual vertices
-			AllocatedSlot sA1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 0, 2)));
-			AllocatedSlot sA2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 1, 2)));
+			SimpleSlot sA1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 0, 2)));
+			SimpleSlot sA2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 1, 2)));
 			assertNotNull(sA1);
 			assertNotNull(sA2);
 			
 			// schedule some vertices in the sharing group
-			AllocatedSlot s1_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-			AllocatedSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
-			AllocatedSlot s2_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
-			AllocatedSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
+			SimpleSlot s1_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
+			SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
+			SimpleSlot s2_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
+			SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
 			assertNotNull(s1_0);
 			assertNotNull(s1_1);
 			assertNotNull(s2_0);
 			assertNotNull(s2_1);
 			
 			// schedule another isolated vertex
-			AllocatedSlot sB1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 1, 3)));
+			SimpleSlot sB1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 1, 3)));
 			assertNotNull(sB1);
 			
 			// should not be able to schedule more vertices
@@ -574,8 +574,8 @@ public class SchedulerSlotSharingTest {
 			// release some isolated task and check that the sharing group may grow
 			sA1.releaseSlot();
 			
-			AllocatedSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-			AllocatedSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
+			SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
+			SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
 			assertNotNull(s1_2);
 			assertNotNull(s2_2);
 			
@@ -587,19 +587,19 @@ public class SchedulerSlotSharingTest {
 			assertEquals(1, scheduler.getNumberOfAvailableSlots());
 			
 			// schedule one more no-shared task
-			AllocatedSlot sB0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 0, 3)));
+			SimpleSlot sB0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 0, 3)));
 			assertNotNull(sB0);
 			
 			// release the last of the original shared slots and allocate one more non-shared slot
 			s2_1.releaseSlot();
-			AllocatedSlot sB2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 2, 3)));
+			SimpleSlot sB2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 2, 3)));
 			assertNotNull(sB2);
 			
 			
 			// release on non-shared and add some shared slots
 			sA2.releaseSlot();
-			AllocatedSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
-			AllocatedSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
+			SimpleSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+			SimpleSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
 			assertNotNull(s1_3);
 			assertNotNull(s2_3);
 			
@@ -609,8 +609,8 @@ public class SchedulerSlotSharingTest {
 			s1_3.releaseSlot();
 			s2_3.releaseSlot();
 			
-			AllocatedSlot sC0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 1, 2)));
-			AllocatedSlot sC1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 0, 2)));
+			SimpleSlot sC0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 1, 2)));
+			SimpleSlot sC1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 0, 2)));
 			assertNotNull(sC0);
 			assertNotNull(sC1);
 			
@@ -655,8 +655,8 @@ public class SchedulerSlotSharingTest {
 			
 			
 			// schedule one to each instance
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup));
 			assertNotNull(s1);
 			assertNotNull(s2);
 			
@@ -665,8 +665,8 @@ public class SchedulerSlotSharingTest {
 			assertEquals(1, i2.getNumberOfAvailableSlots());
 			
 			// schedule one from the other group to each instance
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup));
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup));
+			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup));
+			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup));
 			assertNotNull(s3);
 			assertNotNull(s4);
 			
@@ -705,8 +705,8 @@ public class SchedulerSlotSharingTest {
 			
 			
 			// schedule one to each instance
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup));
 			assertNotNull(s1);
 			assertNotNull(s2);
 			
@@ -715,8 +715,8 @@ public class SchedulerSlotSharingTest {
 			assertEquals(2, i2.getNumberOfAvailableSlots());
 			
 			// schedule one from the other group to each instance
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup));
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup));
+			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup));
+			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i2), sharingGroup));
 			assertNotNull(s3);
 			assertNotNull(s4);
 			
@@ -754,14 +754,14 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(i2);
 			
 			// schedule until the one instance is full
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup));
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, i1), sharingGroup));
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, i1), sharingGroup));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup));
+			SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, i1), sharingGroup));
+			SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, i1), sharingGroup));
 
 			// schedule two more with preference of same instance --> need to go to other instance
-			AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, i1), sharingGroup));
-			AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, i1), sharingGroup));
+			SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, i1), sharingGroup));
+			SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, i1), sharingGroup));
 			
 			assertNotNull(s1);
 			assertNotNull(s2);
@@ -808,19 +808,19 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// allocate something from group 1 and 2 interleaved with schedule for group 3
-			AllocatedSlot slot_1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
-			AllocatedSlot slot_1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
+			SimpleSlot slot_1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup));
+			SimpleSlot slot_1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup));
 
-			AllocatedSlot slot_2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
-			AllocatedSlot slot_2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
+			SimpleSlot slot_2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup));
+			SimpleSlot slot_2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup));
 			
-			AllocatedSlot slot_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup));
+			SimpleSlot slot_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup));
 			
-			AllocatedSlot slot_1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
-			AllocatedSlot slot_1_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
+			SimpleSlot slot_1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup));
+			SimpleSlot slot_1_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup));
 			
-			AllocatedSlot slot_2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
-			AllocatedSlot slot_2_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
+			SimpleSlot slot_2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup));
+			SimpleSlot slot_2_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup));
 			
 			// release groups 1 and 2
 			
@@ -836,10 +836,10 @@ public class SchedulerSlotSharingTest {
 			
 			// allocate group 4
 			
-			AllocatedSlot slot_4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup));
-			AllocatedSlot slot_4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup));
-			AllocatedSlot slot_4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup));
-			AllocatedSlot slot_4_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup));
+			SimpleSlot slot_4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup));
+			SimpleSlot slot_4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup));
+			SimpleSlot slot_4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup));
+			SimpleSlot slot_4_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup));
 			
 			// release groups 3 and 4
 			
@@ -892,11 +892,11 @@ public class SchedulerSlotSharingTest {
 					@Override
 					public void run() {
 						try {
-							AllocatedSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup));
-							
+							SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup));
+
 							sleepUninterruptibly(rnd.nextInt(5));
 							slot.releaseSlot();
-							
+
 							if (completed.incrementAndGet() == 13) {
 								synchronized (completed) {
 									completed.notifyAll();
@@ -915,7 +915,7 @@ public class SchedulerSlotSharingTest {
 					public void run() {
 						try {
 							if (flag3.compareAndSet(false, true)) {
-								AllocatedSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup));
+								SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup));
 								
 								sleepUninterruptibly(5);
 								
@@ -944,7 +944,7 @@ public class SchedulerSlotSharingTest {
 					@Override
 					public void run() {
 						try {
-							AllocatedSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup));
+							SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup));
 							
 							// wait a bit till scheduling the successor
 							sleepUninterruptibly(rnd.nextInt(5));
@@ -971,7 +971,7 @@ public class SchedulerSlotSharingTest {
 					@Override
 					public void run() {
 						try {
-							AllocatedSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup));
+							SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup));
 							
 							// wait a bit till scheduling the successor
 							sleepUninterruptibly(rnd.nextInt(5));
@@ -1049,24 +1049,24 @@ public class SchedulerSlotSharingTest {
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// schedule one task for the first and second vertex
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup));
+			SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup));
+			SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup));
 			
-			assertTrue( ((SubSlot) s1).getSharedSlot() == ((SubSlot) s2).getSharedSlot() );
+			assertTrue(  s1.getParent() == s2.getParent() );
 			assertEquals(3, scheduler.getNumberOfAvailableSlots());
 			
-			AllocatedSlot s3_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup));
-			AllocatedSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup));
-			AllocatedSlot s4_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup));
-			AllocatedSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup));
+			SimpleSlot s3_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup));
+			SimpleSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup));
+			SimpleSlot s4_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup));
+			SimpleSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup));
 			
 			s1.releaseSlot();
 			s2.releaseSlot();
 			
-			AllocatedSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup));
-			AllocatedSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup));
-			AllocatedSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup));
-			AllocatedSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup));
+			SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup));
+			SimpleSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup));
+			SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup));
+			SimpleSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup));
 			
 			try {
 				scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup));

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
index feb3005..fcb638f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Matchers.any;
 import static org.junit.Assert.*;
 
+import org.apache.flink.runtime.instance.SharedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -39,41 +41,45 @@ public class SharedSlotsTest {
 			doAnswer(new Answer<Void>() {
 				@Override
 				public Void answer(InvocationOnMock invocation) throws Throwable {
-					final SubSlot sub = (SubSlot) invocation.getArguments()[0];
-					final SharedSlot shared = (SharedSlot) invocation.getArguments()[1];
-					shared.releaseSlot(sub);
+					final SimpleSlot simpleSlot = (SimpleSlot) invocation.getArguments()[0];
+					final SharedSlot sharedSlot = simpleSlot.getParent();
+
+					sharedSlot.freeSubSlot(simpleSlot);
+
 					return null;
 				}
 				
-			}).when(assignment).releaseSubSlot(any(SubSlot.class), any(SharedSlot.class));
+			}).when(assignment).releaseSimpleSlot(any(SimpleSlot.class));
+
+			JobVertexID id1 = new JobVertexID();
 			
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 			
-			SharedSlot slot = new SharedSlot(instance.allocateSlot(new JobID()), assignment);
-			assertFalse(slot.isDisposed());
+			SharedSlot slot = instance.allocateSharedSlot(new JobID(), assignment, id1);
+			assertFalse(slot.isDead());
 			
-			SubSlot ss1 = slot.allocateSubSlot(new JobVertexID());
+			SimpleSlot ss1 = slot.allocateSubSlot(id1);
 			assertNotNull(ss1);
 			
 			// verify resources
 			assertEquals(instance, ss1.getInstance());
 			assertEquals(0, ss1.getSlotNumber());
-			assertEquals(slot.getAllocatedSlot().getJobID(), ss1.getJobID());
+			assertEquals(slot.getJobID(), ss1.getJobID());
 			
-			SubSlot ss2 = slot.allocateSubSlot(new JobVertexID());
+			SimpleSlot ss2 = slot.allocateSubSlot(new JobVertexID());
 			assertNotNull(ss2);
 			
-			assertEquals(2, slot.getNumberOfAllocatedSubSlots());
+			assertEquals(2, slot.getNumberLeaves());
 			
 			// release first slot, should not trigger release
 			ss1.releaseSlot();
-			assertFalse(slot.isDisposed());
+			assertFalse(slot.isDead());
 			
 			ss2.releaseSlot();
-			assertFalse(slot.isDisposed());
+			assertFalse(slot.isDead());
 			
 			// the shared slot should now dispose itself
-			assertEquals(0, slot.getNumberOfAllocatedSubSlots());
+			assertEquals(0, slot.getNumberLeaves());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -85,46 +91,49 @@ public class SharedSlotsTest {
 	public void createAndRelease() {
 		try {
 			SlotSharingGroupAssignment assignment = mock(SlotSharingGroupAssignment.class);
-			doAnswer(new Answer<Void>() {
+			doAnswer(new Answer<Boolean>() {
 				@Override
-				public Void answer(InvocationOnMock invocation) throws Throwable {
-					final SubSlot sub = (SubSlot) invocation.getArguments()[0];
-					final SharedSlot shared = (SharedSlot) invocation.getArguments()[1];
-					if (shared.releaseSlot(sub) == 0) {
-						shared.dispose();
+				public Boolean answer(InvocationOnMock invocation) throws Throwable {
+					final SimpleSlot slot = (SimpleSlot) invocation.getArguments()[0];
+					final SharedSlot shared = slot.getParent();
+					if (shared.freeSubSlot(slot) == 0) {
+						shared.markDead();
+						return true;
 					}
-					return null;
+					return false;
 				}
-				
-			}).when(assignment).releaseSubSlot(any(SubSlot.class), any(SharedSlot.class));
-			
+
+			}).when(assignment).releaseSimpleSlot(any(SimpleSlot.class));
+
+			JobVertexID id1 = new JobVertexID();
+
 			Instance instance = SchedulerTestUtils.getRandomInstance(1);
 			
-			SharedSlot slot = new SharedSlot(instance.allocateSlot(new JobID()), assignment);
-			assertFalse(slot.isDisposed());
+			SharedSlot slot = instance.allocateSharedSlot(new JobID(), assignment, id1);
+			assertFalse(slot.isDead());
 			
-			SubSlot ss1 = slot.allocateSubSlot(new JobVertexID());
+			SimpleSlot ss1 = slot.allocateSubSlot(id1);
 			assertNotNull(ss1);
 			
 			// verify resources
 			assertEquals(instance, ss1.getInstance());
 			assertEquals(0, ss1.getSlotNumber());
-			assertEquals(slot.getAllocatedSlot().getJobID(), ss1.getJobID());
+			assertEquals(slot.getJobID(), ss1.getJobID());
 			
-			SubSlot ss2 = slot.allocateSubSlot(new JobVertexID());
+			SimpleSlot ss2 = slot.allocateSubSlot(new JobVertexID());
 			assertNotNull(ss2);
 			
-			assertEquals(2, slot.getNumberOfAllocatedSubSlots());
+			assertEquals(2, slot.getNumberLeaves());
 			
 			// release first slot, should not trigger release
 			ss1.releaseSlot();
-			assertFalse(slot.isDisposed());
+			assertFalse(slot.isDead());
 			
 			ss2.releaseSlot();
-			assertTrue(slot.isDisposed());
+			assertTrue(slot.isDead());
 			
 			// the shared slot should now dispose itself
-			assertEquals(0, slot.getNumberOfAllocatedSubSlots());
+			assertEquals(0, slot.getNumberLeaves());
 			
 			assertNull(slot.allocateSubSlot(new JobVertexID()));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
index 47afacb..23a5c94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.junit.Test;
 
@@ -36,7 +36,7 @@ public class SlotAllocationFutureTest {
 			
 			SlotAllocationFutureAction action = new SlotAllocationFutureAction() {
 				@Override
-				public void slotAllocated(AllocatedSlot slot) {}
+				public void slotAllocated(SimpleSlot slot) {}
 			};
 			
 			future.setFutureAction(action);
@@ -47,8 +47,8 @@ public class SlotAllocationFutureTest {
 				// expected
 			}
 			
-			final AllocatedSlot slot1 = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
-			final AllocatedSlot slot2 = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+			final SimpleSlot slot1 = new SimpleSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null);
+			final SimpleSlot slot2 = new SimpleSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null);
 			
 			future.setSlot(slot1);
 			try {
@@ -71,13 +71,13 @@ public class SlotAllocationFutureTest {
 			// action before the slot
 			{
 				final AtomicInteger invocations = new AtomicInteger();
-				final AllocatedSlot thisSlot = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+				final SimpleSlot thisSlot = new SimpleSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null);
 				
 				SlotAllocationFuture future = new SlotAllocationFuture();
 				
 				future.setFutureAction(new SlotAllocationFutureAction() {
 					@Override
-					public void slotAllocated(AllocatedSlot slot) {
+					public void slotAllocated(SimpleSlot slot) {
 						assertEquals(thisSlot, slot);
 						invocations.incrementAndGet();
 					}
@@ -91,14 +91,14 @@ public class SlotAllocationFutureTest {
 			// slot before action
 			{
 				final AtomicInteger invocations = new AtomicInteger();
-				final AllocatedSlot thisSlot = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+				final SimpleSlot thisSlot = new SimpleSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null);
 				
 				SlotAllocationFuture future = new SlotAllocationFuture();
 				future.setSlot(thisSlot);
 				
 				future.setFutureAction(new SlotAllocationFutureAction() {
 					@Override
-					public void slotAllocated(AllocatedSlot slot) {
+					public void slotAllocated(SimpleSlot slot) {
 						assertEquals(thisSlot, slot);
 						invocations.incrementAndGet();
 					}
@@ -121,7 +121,7 @@ public class SlotAllocationFutureTest {
 				final AtomicInteger invocations = new AtomicInteger();
 				final AtomicBoolean error = new AtomicBoolean();
 				
-				final AllocatedSlot thisSlot = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+				final SimpleSlot thisSlot = new SimpleSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null);
 				
 				final SlotAllocationFuture future = new SlotAllocationFuture();
 				
@@ -130,7 +130,7 @@ public class SlotAllocationFutureTest {
 					@Override
 					public void run() {
 						try {
-							AllocatedSlot syncSlot = future.waitTillAllocated();
+							SimpleSlot syncSlot = future.waitTillAllocated();
 							if (syncSlot == null || syncSlot != thisSlot) {
 								error.set(true);
 								return;
@@ -158,12 +158,12 @@ public class SlotAllocationFutureTest {
 			
 			// setting slot before syncing
 			{
-				final AllocatedSlot thisSlot = new AllocatedSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0);
+				final SimpleSlot thisSlot = new SimpleSlot(new JobID(), SchedulerTestUtils.getRandomInstance(1), 0, null, null);
 				final SlotAllocationFuture future = new SlotAllocationFuture();
 
 				future.setSlot(thisSlot);
 				
-				AllocatedSlot retrieved = future.waitTillAllocated();
+				SimpleSlot retrieved = future.waitTillAllocated();
 				
 				assertNotNull(retrieved);
 				assertEquals(thisSlot, retrieved);

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 36f0f92..9bcd163 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -62,7 +62,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         within(1 second) {
           jm ! SubmitJob(jobGraph)
 
-          expectMsg(SubmissionFailure(jobGraph.getJobID, new NoResourceAvailableException(1,1)))
+          expectMsg(SubmissionFailure(jobGraph.getJobID, new NoResourceAvailableException(1,1,0)))
 
           expectNoMsg()
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
index 0f6eeca..22ee2c1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import akka.actor.{ActorSystem, PoisonPill}
+import akka.actor.{Kill, ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
@@ -41,7 +41,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
   }
 
   "The JobManager" should {
-    "handle failing task manager" in {
+    "handle gracefully failing task manager" in {
       val num_tasks = 31
       val sender = new AbstractJobVertex("Sender")
       val receiver = new AbstractJobVertex("Receiver")
@@ -78,6 +78,41 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
         cluster.stop()
       }
     }
+
+    "handle hard failing task manager" in {
+      val num_tasks = 31
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
+      sender.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[BlockingReceiver])
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobID = jobGraph.getJobID
+
+      val cluster = TestingUtils.startTestingCluster(num_tasks, 2)
+
+      val taskManagers = cluster.getTaskManagers
+      val jm = cluster.getJobManager
+
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+
+          jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
+          expectMsg(AllVerticesRunning(jobID))
+
+          // kill one task manager
+          taskManagers(0) ! Kill
+          expectMsgType[JobResultFailed]
+        }
+      }finally{
+        cluster.stop()
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index fba7c76..626c518 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import akka.actor.{ActorSystem, PoisonPill}
+import akka.actor.{Kill, ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
@@ -41,7 +41,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
   }
 
   "The JobManager" should {
-    "handle task manager failures with slot sharing" in {
+    "handle gracefully failing task manager with slot sharing" in {
       val num_tasks = 20
 
       val sender = new AbstractJobVertex("Sender")
@@ -83,6 +83,48 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
         cluster.stop()
       }
     }
+
+    "handle hard failing task manager with slot sharing" in {
+      val num_tasks = 20
+
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
+
+      sender.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[BlockingReceiver])
+
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+
+      val sharingGroup = new SlotSharingGroup()
+      sender.setSlotSharingGroup(sharingGroup)
+      receiver.setSlotSharingGroup(sharingGroup)
+
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      val jobID = jobGraph.getJobID
+
+      val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)
+      val jm = cluster.getJobManager
+      val taskManagers = cluster.getTaskManagers
+
+      try{
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+
+          jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
+          expectMsg(AllVerticesRunning(jobID))
+
+          //kill task manager
+          taskManagers(0) ! Kill
+
+          expectMsgType[JobResultFailed]
+        }
+      }finally{
+        cluster.stop()
+      }
+    }
   }
 
 }


Mime
View raw message