Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CF80C200D52 for ; Sat, 2 Dec 2017 17:04:58 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id CE0FC160C19; Sat, 2 Dec 2017 16:04:58 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EDF42160BF7 for ; Sat, 2 Dec 2017 17:04:57 +0100 (CET) Received: (qmail 49820 invoked by uid 500); 2 Dec 2017 16:04:57 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 49811 invoked by uid 99); 2 Dec 2017 16:04:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 02 Dec 2017 16:04:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E5F40E2F03; Sat, 2 Dec 2017 16:04:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Sat, 02 Dec 2017 16:04:56 -0000 Message-Id: <15292fb164c54165810b192d95899657@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] flink git commit: [FLINK-7851] [scheduling] Improve scheduling balance by round robin distribution archived-at: Sat, 02 Dec 2017 16:04:59 -0000 Repository: flink Updated Branches: refs/heads/master ee9027e49 -> 49f690986 [FLINK-7851] [scheduling] Improve scheduling balance by round robin distribution Make sure that the value maps are of type LinkedHashMap in SlotSharingGroupAssignment#availableSlotsPerJid This closes #4839. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9c669d4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9c669d4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9c669d4 Branch: refs/heads/master Commit: d9c669d4781f095806013651c1a579eae0ca2650 Parents: ee9027e Author: Till Authored: Mon Oct 16 16:18:23 2017 +0200 Committer: Till Rohrmann Committed: Sat Dec 2 16:39:16 2017 +0100 ---------------------------------------------------------------------- .../instance/SlotSharingGroupAssignment.java | 46 ++++++----- .../SlotSharingGroupAssignmentTest.java | 82 ++++++++++++++++++++ 2 files changed, 108 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d9c669d4/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java index 7618b18..4371290 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java @@ -96,7 +96,7 @@ public class SlotSharingGroupAssignment { private final Set allSlots = new LinkedHashSet(); /** The slots available per vertex type (JobVertexId), keyed by TaskManager, to make them locatable */ - private final Map>> availableSlotsPerJid = new LinkedHashMap<>(); + private final Map>> availableSlotsPerJid = new LinkedHashMap<>(); // -------------------------------------------------------------------------------------------- @@ -233,7 +233,7 @@ public class SlotSharingGroupAssignment { // can place a task into this slot. boolean entryForNewJidExists = false; - for (Map.Entry>> entry : availableSlotsPerJid.entrySet()) { + for (Map.Entry>> entry : availableSlotsPerJid.entrySet()) { // there is already an entry for this groupID if (entry.getKey().equals(groupIdForMap)) { entryForNewJidExists = true; @@ -246,7 +246,7 @@ public class SlotSharingGroupAssignment { // make sure an empty entry exists for this group, if no other entry exists if (!entryForNewJidExists) { - availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap>()); + availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<>()); } return subSlot; @@ -391,7 +391,7 @@ public class SlotSharingGroupAssignment { } // get the available slots for the group - Map> slotsForGroup = availableSlotsPerJid.get(groupId); + LinkedHashMap> slotsForGroup = availableSlotsPerJid.get(groupId); if (slotsForGroup == null) { // we have a new group, so all slots are available @@ -621,20 +621,26 @@ public class SlotSharingGroupAssignment { private static SharedSlot pollFromMultiMap(Map> map) { Iterator>> iter = map.entrySet().iterator(); - + while (iter.hasNext()) { - List slots = iter.next().getValue(); - - if (slots.isEmpty()) { - iter.remove(); - } - else if (slots.size() == 1) { - SharedSlot slot = slots.remove(0); - iter.remove(); - return slot; - } - else { - return slots.remove(slots.size() - 1); + Map.Entry> slotEntry = iter.next(); + + // remove first entry to add it at the back if there are still slots left + iter.remove(); + + List slots = slotEntry.getValue(); + + if (!slots.isEmpty()) { + + SharedSlot result = slots.remove(slots.size() - 1); + + if (!slots.isEmpty()) { + // reinserts the entry; since it is a LinkedHashMap, we will iterate over this entry + // only after having polled from all other entries + map.put(slotEntry.getKey(), slots); + } + + return result; } } @@ -642,11 +648,11 @@ public class SlotSharingGroupAssignment { } private static void removeSlotFromAllEntries( - Map>> availableSlots, SharedSlot slot) - { + Map>> availableSlots, + SharedSlot slot) { final ResourceID taskManagerId = slot.getTaskManagerID(); - for (Map.Entry>> entry : availableSlots.entrySet()) { + for (Map.Entry>> entry : availableSlots.entrySet()) { Map> map = entry.getValue(); List list = map.get(taskManagerId); http://git-wip-us.apache.org/repos/asf/flink/blob/d9c669d4/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java new file mode 100644 index 0000000..dca47d3 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.slots.SlotOwner; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; + +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; + +public class SlotSharingGroupAssignmentTest extends TestLogger { + + /** + * Tests that slots are allocated in a round robin fashion from the set of available resources. + */ + @Test + public void testRoundRobinPolling() throws UnknownHostException { + final SlotSharingGroupAssignment slotSharingGroupAssignment = new SlotSharingGroupAssignment(); + final int numberTaskManagers = 2; + final int numberSlots = 2; + final JobVertexID sourceId = new JobVertexID(); + final JobVertexID sinkId = new JobVertexID(); + final JobID jobId = new JobID(); + + for (int i = 0; i < numberTaskManagers; i++) { + final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(ResourceID.generate(), InetAddress.getLocalHost(), i + 1000); + + for (int j = 0; j < numberSlots; j++) { + final SharedSlot slot = new SharedSlot( + jobId, + mock(SlotOwner.class), + taskManagerLocation, + j, + mock(TaskManagerGateway.class), + slotSharingGroupAssignment); + + slotSharingGroupAssignment.addSharedSlotAndAllocateSubSlot(slot, Locality.UNKNOWN, sourceId); + } + } + + SimpleSlot allocatedSlot1 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList()); + SimpleSlot allocatedSlot2 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList()); + + assertNotEquals(allocatedSlot1.getTaskManagerLocation(), allocatedSlot2.getTaskManagerLocation()); + + // let's check that we can still allocate all 4 slots + SimpleSlot allocatedSlot3 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList()); + assertNotNull(allocatedSlot3); + + SimpleSlot allocatedSlot4 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList()); + assertNotNull(allocatedSlot4); + } +}