Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DC2BF200B75 for ; Sun, 4 Sep 2016 23:10:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id DA9ED160AC0; Sun, 4 Sep 2016 21:10:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5DE98160AB9 for ; Sun, 4 Sep 2016 23:10:49 +0200 (CEST) Received: (qmail 50612 invoked by uid 500); 4 Sep 2016 21:10:48 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 50583 invoked by uid 99); 4 Sep 2016 21:10:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Sep 2016 21:10:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2D9A8E058E; Sun, 4 Sep 2016 21:10:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Sun, 04 Sep 2016 21:10:48 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] flink git commit: [FLINK-4459] [distributed runtime] Introduce SlotProvider for Scheduler archived-at: Sun, 04 Sep 2016 21:10:51 -0000 Repository: flink Updated Branches: refs/heads/master 502a79d39 -> 6e40f5901 http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java index fd0523b..c4121f6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java @@ -62,10 +62,10 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(i2); // schedule 4 tasks from the first vertex group - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup)); - SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup)); - SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup)); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false).get(); assertNotNull(s1); assertNotNull(s2); @@ -76,7 +76,7 @@ public class SchedulerSlotSharingTest { // we cannot schedule another task from the first vertex group try { - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup)); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (NoResourceAvailableException e) { @@ -90,7 +90,7 @@ public class SchedulerSlotSharingTest { s3.releaseSlot(); // allocate another slot from that group - SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup)); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get(); assertNotNull(s5); // release all old slots @@ -98,9 +98,9 @@ public class SchedulerSlotSharingTest { s2.releaseSlot(); s4.releaseSlot(); - SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup)); - SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup)); - SimpleSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup)); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false).get(); + SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false).get(); + SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false).get(); assertNotNull(s6); assertNotNull(s7); @@ -135,7 +135,7 @@ public class SchedulerSlotSharingTest { } @Test - public void scheduleImmediatelyWithSharing() { + public void allocateSlotWithSharing() { try { JobVertexID jid1 = new JobVertexID(); JobVertexID jid2 = new JobVertexID(); @@ -147,10 +147,10 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 4 tasks from the first vertex group - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup)); - SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup)); - SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup)); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false).get(); assertNotNull(s1); assertNotNull(s2); @@ -161,7 +161,7 @@ public class SchedulerSlotSharingTest { // we cannot schedule another task from the first vertex group try { - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup)); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (NoResourceAvailableException e) { @@ -172,10 +172,10 @@ public class SchedulerSlotSharingTest { } // schedule some tasks from the second ID group - SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup)); - SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup)); - SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup)); - SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup)); + SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false).get(); + SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false).get(); + SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false).get(); + SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false).get(); assertNotNull(s1_2); assertNotNull(s2_2); @@ -184,7 +184,7 @@ public class SchedulerSlotSharingTest { // we cannot schedule another task from the second vertex group try { - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup)); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (NoResourceAvailableException e) { @@ -205,7 +205,7 @@ public class SchedulerSlotSharingTest { // we can still not schedule anything from the second group of vertices try { - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup)); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (NoResourceAvailableException e) { @@ -216,7 +216,7 @@ public class SchedulerSlotSharingTest { } // we can schedule something from the first vertex group - SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup)); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get(); assertNotNull(s5); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots()); @@ -226,7 +226,7 @@ public class SchedulerSlotSharingTest { // now we release a slot from the second vertex group and schedule another task from that group s2_2.releaseSlot(); - SimpleSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup)); + SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get(); assertNotNull(s5_2); // release all slots @@ -255,7 +255,7 @@ public class SchedulerSlotSharingTest { } @Test - public void scheduleImmediatelyWithIntermediateTotallyEmptySharingGroup() { + public void allocateSlotWithIntermediateTotallyEmptySharingGroup() { try { JobVertexID jid1 = new JobVertexID(); JobVertexID jid2 = new JobVertexID(); @@ -267,10 +267,10 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 4 tasks from the first vertex group - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup)); - SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup)); - SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup)); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get(); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots()); assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1)); @@ -286,10 +286,10 @@ public class SchedulerSlotSharingTest { assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2)); // schedule some tasks from the second ID group - SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup)); - SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup)); - SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup)); - SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup)); + SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get(); + SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get(); + SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get(); + SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get(); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots()); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1)); @@ -319,7 +319,7 @@ public class SchedulerSlotSharingTest { } @Test - public void scheduleImmediatelyWithTemprarilyEmptyVertexGroup() { + public void allocateSlotWithTemprarilyEmptyVertexGroup() { try { JobVertexID jid1 = new JobVertexID(); JobVertexID jid2 = new JobVertexID(); @@ -332,10 +332,10 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 4 tasks from the first vertex group - SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup)); - SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup)); - SimpleSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup)); - SimpleSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup)); + SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get(); + SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get(); + SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get(); + SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get(); assertNotNull(s1_1); assertNotNull(s2_1); @@ -345,10 +345,10 @@ public class SchedulerSlotSharingTest { assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1)); // schedule 4 tasks from the second vertex group - SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup)); - SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup)); - SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup)); - SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup)); + SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false).get(); + SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false).get(); + SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false).get(); + SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false).get(); assertNotNull(s1_2); assertNotNull(s2_2); @@ -358,10 +358,10 @@ public class SchedulerSlotSharingTest { assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2)); // schedule 4 tasks from the third vertex group - SimpleSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup)); - SimpleSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup)); - SimpleSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup)); - SimpleSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup)); + SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false).get(); + SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false).get(); + SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false).get(); + SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false).get(); assertNotNull(s1_3); assertNotNull(s2_3); @@ -373,7 +373,7 @@ public class SchedulerSlotSharingTest { // we cannot schedule another task from the second vertex group try { - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup)); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (NoResourceAvailableException e) { @@ -389,9 +389,9 @@ public class SchedulerSlotSharingTest { s3_2.releaseSlot(); s4_2.releaseSlot(); - SimpleSlot s5_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup)); - SimpleSlot s6_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup)); - SimpleSlot s7_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup)); + SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false).get(); + SimpleSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false).get(); + SimpleSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false).get(); assertNotNull(s5_2); assertNotNull(s6_2); @@ -430,7 +430,7 @@ public class SchedulerSlotSharingTest { } @Test - public void scheduleImmediatelyWithTemprarilyEmptyVertexGroup2() { + public void allocateSlotWithTemporarilyEmptyVertexGroup2() { try { JobVertexID jid1 = new JobVertexID(); JobVertexID jid2 = new JobVertexID(); @@ -442,9 +442,9 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 1 tasks from the first vertex group and 2 from the second - SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup)); - SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup)); - SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup)); + SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false).get(); + SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false).get(); + SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false).get(); assertNotNull(s1_1); assertNotNull(s2_1); @@ -460,7 +460,7 @@ public class SchedulerSlotSharingTest { // this should free one slot so we can allocate one non-shared - SimpleSlot sx = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1))); + SimpleSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false).get(); assertNotNull(sx); assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfSlots()); @@ -495,28 +495,28 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule some individual vertices - SimpleSlot sA1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 0, 2))); - SimpleSlot sA2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidA, 1, 2))); + SimpleSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false).get(); + SimpleSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false).get(); assertNotNull(sA1); assertNotNull(sA2); // schedule some vertices in the sharing group - SimpleSlot s1_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup)); - SimpleSlot s1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup)); - SimpleSlot s2_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup)); - SimpleSlot s2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup)); + SimpleSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get(); + SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get(); + SimpleSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get(); + SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get(); assertNotNull(s1_0); assertNotNull(s1_1); assertNotNull(s2_0); assertNotNull(s2_1); // schedule another isolated vertex - SimpleSlot sB1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 1, 3))); + SimpleSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false).get(); assertNotNull(sB1); // should not be able to schedule more vertices try { - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup)); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (NoResourceAvailableException e) { @@ -527,7 +527,7 @@ public class SchedulerSlotSharingTest { } try { - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup)); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (NoResourceAvailableException e) { @@ -538,7 +538,7 @@ public class SchedulerSlotSharingTest { } try { - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 0, 3))); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false); fail("Scheduler accepted too many tasks at the same time"); } catch (NoResourceAvailableException e) { @@ -549,7 +549,7 @@ public class SchedulerSlotSharingTest { } try { - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 0, 1))); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false); fail("Scheduler accepted too many tasks at the same time"); } catch (NoResourceAvailableException e) { @@ -562,8 +562,8 @@ public class SchedulerSlotSharingTest { // release some isolated task and check that the sharing group may grow sA1.releaseSlot(); - SimpleSlot s1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup)); - SimpleSlot s2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup)); + SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get(); + SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get(); assertNotNull(s1_2); assertNotNull(s2_2); @@ -575,19 +575,19 @@ public class SchedulerSlotSharingTest { assertEquals(1, scheduler.getNumberOfAvailableSlots()); // schedule one more no-shared task - SimpleSlot sB0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 0, 3))); + SimpleSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get(); assertNotNull(sB0); // release the last of the original shared slots and allocate one more non-shared slot s2_1.releaseSlot(); - SimpleSlot sB2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidB, 2, 3))); + SimpleSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false).get(); assertNotNull(sB2); // release on non-shared and add some shared slots sA2.releaseSlot(); - SimpleSlot s1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup)); - SimpleSlot s2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup)); + SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get(); + SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get(); assertNotNull(s1_3); assertNotNull(s2_3); @@ -597,8 +597,8 @@ public class SchedulerSlotSharingTest { s1_3.releaseSlot(); s2_3.releaseSlot(); - SimpleSlot sC0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 1, 2))); - SimpleSlot sC1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jidC, 0, 2))); + SimpleSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false).get(); + SimpleSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false).get(); assertNotNull(sC0); assertNotNull(sC1); @@ -646,8 +646,8 @@ public class SchedulerSlotSharingTest { // schedule one to each instance - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup)); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get(); assertNotNull(s1); assertNotNull(s2); @@ -656,8 +656,8 @@ public class SchedulerSlotSharingTest { assertEquals(1, i2.getNumberOfAvailableSlots()); // schedule one from the other group to each instance - SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup)); - SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup)); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get(); assertNotNull(s3); assertNotNull(s4); @@ -699,8 +699,8 @@ public class SchedulerSlotSharingTest { // schedule one to each instance - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup)); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get(); assertNotNull(s1); assertNotNull(s2); @@ -709,8 +709,8 @@ public class SchedulerSlotSharingTest { assertEquals(2, i2.getNumberOfAvailableSlots()); // schedule one from the other group to each instance - SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup)); - SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup)); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get(); assertNotNull(s3); assertNotNull(s4); @@ -750,14 +750,14 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(i2); // schedule until the one instance is full - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup)); - SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup)); - SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup)); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false).get(); // schedule two more with preference of same instance --> need to go to other instance - SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup)); - SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup)); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false).get(); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false).get(); assertNotNull(s1); assertNotNull(s2); @@ -803,19 +803,19 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(4)); // allocate something from group 1 and 2 interleaved with schedule for group 3 - SimpleSlot slot_1_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup)); - SimpleSlot slot_1_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup)); + SimpleSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get(); + SimpleSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get(); - SimpleSlot slot_2_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup)); - SimpleSlot slot_2_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup)); + SimpleSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get(); + SimpleSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get(); - SimpleSlot slot_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup)); + SimpleSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get(); - SimpleSlot slot_1_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup)); - SimpleSlot slot_1_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup)); + SimpleSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get(); + SimpleSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get(); - SimpleSlot slot_2_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup)); - SimpleSlot slot_2_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup)); + SimpleSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get(); + SimpleSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get(); // release groups 1 and 2 @@ -831,10 +831,10 @@ public class SchedulerSlotSharingTest { // allocate group 4 - SimpleSlot slot_4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup)); - SimpleSlot slot_4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup)); - SimpleSlot slot_4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup)); - SimpleSlot slot_4_4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup)); + SimpleSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get(); + SimpleSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get(); + SimpleSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get(); + SimpleSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get(); // release groups 3 and 4 @@ -885,7 +885,7 @@ public class SchedulerSlotSharingTest { @Override public void run() { try { - SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup)); + SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false).get(); sleepUninterruptibly(rnd.nextInt(5)); slot.releaseSlot(); @@ -908,7 +908,7 @@ public class SchedulerSlotSharingTest { public void run() { try { if (flag3.compareAndSet(false, true)) { - SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup)); + SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get(); sleepUninterruptibly(5); @@ -937,7 +937,7 @@ public class SchedulerSlotSharingTest { @Override public void run() { try { - SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup)); + SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false).get(); // wait a bit till scheduling the successor sleepUninterruptibly(rnd.nextInt(5)); @@ -964,7 +964,7 @@ public class SchedulerSlotSharingTest { @Override public void run() { try { - SimpleSlot slot = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup)); + SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false).get(); // wait a bit till scheduling the successor sleepUninterruptibly(rnd.nextInt(5)); @@ -1039,27 +1039,27 @@ public class SchedulerSlotSharingTest { scheduler.newInstanceAvailable(getRandomInstance(4)); // schedule one task for the first and second vertex - SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup)); - SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup)); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false).get(); assertTrue( s1.getParent() == s2.getParent() ); assertEquals(3, scheduler.getNumberOfAvailableSlots()); - SimpleSlot s3_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup)); - SimpleSlot s3_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup)); - SimpleSlot s4_0 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup)); - SimpleSlot s4_1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup)); + SimpleSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false).get(); + SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false).get(); + SimpleSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get(); + SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get(); s1.releaseSlot(); s2.releaseSlot(); - SimpleSlot s3_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup)); - SimpleSlot s3_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup)); - SimpleSlot s4_2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup)); - SimpleSlot s4_3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup)); + SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false).get(); + SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false).get(); + SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get(); + SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get(); try { - scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup)); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false); fail("should throw an exception"); } catch (NoResourceAvailableException e) {