flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-7851] [scheduling] Improve scheduling balance by round robin distribution
Date Sat, 02 Dec 2017 16:04:56 GMT
Repository: flink
Updated Branches:
  refs/heads/master ee9027e49 -> 49f690986


[FLINK-7851] [scheduling] Improve scheduling balance by round robin distribution

Make sure that the value maps are of type LinkedHashMap in SlotSharingGroupAssignment#availableSlotsPerJid

This closes #4839.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9c669d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9c669d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9c669d4

Branch: refs/heads/master
Commit: d9c669d4781f095806013651c1a579eae0ca2650
Parents: ee9027e
Author: Till <till.rohrmann@gmail.com>
Authored: Mon Oct 16 16:18:23 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Sat Dec 2 16:39:16 2017 +0100

----------------------------------------------------------------------
 .../instance/SlotSharingGroupAssignment.java    | 46 ++++++-----
 .../SlotSharingGroupAssignmentTest.java         | 82 ++++++++++++++++++++
 2 files changed, 108 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9c669d4/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 7618b18..4371290 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -96,7 +96,7 @@ public class SlotSharingGroupAssignment {
 	private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
 
 	/** The slots available per vertex type (JobVertexId), keyed by TaskManager, to make them
locatable */
-	private final Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlotsPerJid
= new LinkedHashMap<>();
+	private final Map<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>>
availableSlotsPerJid = new LinkedHashMap<>();
 
 
 	// --------------------------------------------------------------------------------------------
@@ -233,7 +233,7 @@ public class SlotSharingGroupAssignment {
 				// can place a task into this slot.
 				boolean entryForNewJidExists = false;
 				
-				for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry
: availableSlotsPerJid.entrySet()) {
+				for (Map.Entry<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>>
entry : availableSlotsPerJid.entrySet()) {
 					// there is already an entry for this groupID
 					if (entry.getKey().equals(groupIdForMap)) {
 						entryForNewJidExists = true;
@@ -246,7 +246,7 @@ public class SlotSharingGroupAssignment {
 
 				// make sure an empty entry exists for this group, if no other entry exists
 				if (!entryForNewJidExists) {
-					availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<ResourceID, List<SharedSlot>>());
+					availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<>());
 				}
 
 				return subSlot;
@@ -391,7 +391,7 @@ public class SlotSharingGroupAssignment {
 		}
 
 		// get the available slots for the group
-		Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
+		LinkedHashMap<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
 		
 		if (slotsForGroup == null) {
 			// we have a new group, so all slots are available
@@ -621,20 +621,26 @@ public class SlotSharingGroupAssignment {
 	
 	private static SharedSlot pollFromMultiMap(Map<ResourceID, List<SharedSlot>>
map) {
 		Iterator<Map.Entry<ResourceID, List<SharedSlot>>> iter = map.entrySet().iterator();
-		
+
 		while (iter.hasNext()) {
-			List<SharedSlot> slots = iter.next().getValue();
-			
-			if (slots.isEmpty()) {
-				iter.remove();
-			}
-			else if (slots.size() == 1) {
-				SharedSlot slot = slots.remove(0);
-				iter.remove();
-				return slot;
-			}
-			else {
-				return slots.remove(slots.size() - 1);
+			Map.Entry<ResourceID, List<SharedSlot>> slotEntry = iter.next();
+
+			// remove first entry to add it at the back if there are still slots left
+			iter.remove();
+
+			List<SharedSlot> slots = slotEntry.getValue();
+
+			if (!slots.isEmpty()) {
+
+				SharedSlot result = slots.remove(slots.size() - 1);
+
+				if (!slots.isEmpty()) {
+					// reinserts the entry; since it is a LinkedHashMap, we will iterate over this entry
+					// only after having polled from all other entries
+					map.put(slotEntry.getKey(), slots);
+				}
+
+				return result;
 			}
 		}
 		
@@ -642,11 +648,11 @@ public class SlotSharingGroupAssignment {
 	}
 	
 	private static void removeSlotFromAllEntries(
-			Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlots, SharedSlot
slot)
-	{
+			Map<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>> availableSlots,
+			SharedSlot slot) {
 		final ResourceID taskManagerId = slot.getTaskManagerID();
 		
-		for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry :
availableSlots.entrySet()) {
+		for (Map.Entry<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>>
entry : availableSlots.entrySet()) {
 			Map<ResourceID, List<SharedSlot>> map = entry.getValue();
 
 			List<SharedSlot> list = map.get(taskManagerId);

http://git-wip-us.apache.org/repos/asf/flink/blob/d9c669d4/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
new file mode 100644
index 0000000..dca47d3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+public class SlotSharingGroupAssignmentTest extends TestLogger {
+
+	/**
+	 * Tests that slots are allocated in a round robin fashion from the set of available resources.
+	 */
+	@Test
+	public void testRoundRobinPolling() throws UnknownHostException {
+		final SlotSharingGroupAssignment slotSharingGroupAssignment = new SlotSharingGroupAssignment();
+		final int numberTaskManagers = 2;
+		final int numberSlots = 2;
+		final JobVertexID sourceId = new JobVertexID();
+		final JobVertexID sinkId = new JobVertexID();
+		final JobID jobId = new JobID();
+
+		for (int i = 0; i < numberTaskManagers; i++) {
+			final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(ResourceID.generate(),
InetAddress.getLocalHost(), i + 1000);
+
+			for (int j = 0; j < numberSlots; j++) {
+				final SharedSlot slot = new SharedSlot(
+					jobId,
+					mock(SlotOwner.class),
+					taskManagerLocation,
+					j,
+					mock(TaskManagerGateway.class),
+					slotSharingGroupAssignment);
+
+				slotSharingGroupAssignment.addSharedSlotAndAllocateSubSlot(slot, Locality.UNKNOWN, sourceId);
+			}
+		}
+
+		SimpleSlot allocatedSlot1 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
+		SimpleSlot allocatedSlot2 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
+
+		assertNotEquals(allocatedSlot1.getTaskManagerLocation(), allocatedSlot2.getTaskManagerLocation());
+
+		// let's check that we can still allocate all 4 slots
+		SimpleSlot allocatedSlot3 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
+		assertNotNull(allocatedSlot3);
+
+		SimpleSlot allocatedSlot4 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
+		assertNotNull(allocatedSlot4);
+	}
+}


Mime
View raw message