flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [10/50] [abbrv] flink git commit: [FLINK-5810] [flip-6] Make slot registration static
Date Wed, 03 May 2017 12:10:17 GMT
[FLINK-5810] [flip-6] Make slot registration static


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d16a5a29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d16a5a29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d16a5a29

Branch: refs/heads/table-retraction
Commit: d16a5a297008fc9665889dd56e24ff81472db2c6
Parents: 759f46e
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Apr 27 15:53:36 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Apr 28 15:28:26 2017 +0200

----------------------------------------------------------------------
 .../slotmanager/SlotManager.java                | 76 +++++++++++---------
 .../slotmanager/TaskManagerRegistration.java    | 22 +++---
 .../flink/runtime/taskexecutor/SlotReport.java  |  4 --
 .../slotmanager/SlotManagerTest.java            | 30 ++++----
 4 files changed, 70 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d16a5a29/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 31edbf3..f09b73a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -250,12 +250,33 @@ public class SlotManager implements AutoCloseable {
 		checkInit();
 
 		// we identify task managers by their instance id
-		if (!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
-			TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection);
+		if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
+			reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
+		} else {
+			// first register the TaskManager
+			ArrayList<SlotID> reportedSlots = new ArrayList<>();
+
+			for (SlotStatus slotStatus : initialSlotReport) {
+				reportedSlots.add(slotStatus.getSlotID());
+			}
+
+			TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection,
reportedSlots);
 			taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
+
+			// next register the new slots
+			for (SlotStatus slotStatus : initialSlotReport) {
+				registerSlot(
+					slotStatus.getSlotID(),
+					slotStatus.getAllocationID(),
+					slotStatus.getResourceProfile(),
+					taskExecutorConnection);
+			}
+
+			if (!anySlotUsed(taskManagerRegistration.getSlots())) {
+				registerTaskManagerTimeout(taskManagerRegistration);
+			}
 		}
 
-		reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
 	}
 
 	/**
@@ -296,31 +317,22 @@ public class SlotManager implements AutoCloseable {
 		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
 
 		if (null != taskManagerRegistration) {
-			ArrayList<SlotID> slotsToRemove = new ArrayList<>(taskManagerRegistration.getSlots());
 			boolean idle = true;
 
 			for (SlotStatus slotStatus : slotReport) {
-				if (slotsToRemove.remove(slotStatus.getSlotID())) {
-					// slot which was already registered
-					updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID());
+
+				// We assume that the slots of a TaskManager don't change over its lifetime and they
are registered
+				// once when the TaskManager is registered
+				if (taskManagerRegistration.containsSlot(slotStatus.getSlotID()) && updateSlot(slotStatus.getSlotID(),
slotStatus.getAllocationID())) {
+					TaskManagerSlot slot = slots.get(slotStatus.getSlotID());
+					idle &= slot.isFree();
 				} else {
-					// new slot
-					registerSlot(
-						taskManagerRegistration,
-						slotStatus.getSlotID(),
-						slotStatus.getAllocationID(),
-						slotStatus.getResourceProfile(),
-						taskManagerRegistration.getTaskManagerConnection());
+					// sanity check to guarantee that slots of a TaskManager don't change
+					throw new IllegalStateException("Reported a slot status for slot " +  slotStatus.getSlotID()
+
+						" which has not been registered.");
 				}
-
-				TaskManagerSlot slot = slots.get(slotStatus.getSlotID());
-
-				idle &= slot.isFree();
 			}
 
-			// remove the slots for which we haven't received a slot status message
-			removeSlots(slotsToRemove);
-
 			if (idle) {
 				// no slot of this task manager is being used --> register timer to free this resource
 				registerTaskManagerTimeout(taskManagerRegistration);
@@ -439,20 +451,21 @@ public class SlotManager implements AutoCloseable {
 	 * the given slot id. The given resource profile defines the available resources for the
slot.
 	 * The task manager connection can be used to communicate with the task manager.
 	 *
-	 * @param taskManagerRegistration Task manager for which to register the given slot
 	 * @param slotId identifying the slot on the task manager
 	 * @param allocationId which is currently deployed in the slot
 	 * @param resourceProfile of the slot
 	 * @param taskManagerConnection to communicate with the remote task manager
 	 */
 	private void registerSlot(
-			TaskManagerRegistration taskManagerRegistration,
 			SlotID slotId,
 			AllocationID allocationId,
 			ResourceProfile resourceProfile,
 			TaskExecutorConnection taskManagerConnection) {
 
-		Preconditions.checkNotNull(taskManagerRegistration);
+		if (slots.containsKey(slotId)) {
+			// remove the old slot first
+			removeSlot(slotId);
+		}
 
 		TaskManagerSlot slot = new TaskManagerSlot(
 			slotId,
@@ -462,8 +475,6 @@ public class SlotManager implements AutoCloseable {
 
 		slots.put(slotId, slot);
 
-		taskManagerRegistration.addSlot(slotId);
-
 		if (slot.isFree()) {
 			handleFreeSlot(slot);
 		}
@@ -478,8 +489,9 @@ public class SlotManager implements AutoCloseable {
 	 *
 	 * @param slotId to update
 	 * @param allocationId specifying the current allocation of the slot
+	 * @return True if the slot could be updated; otherwise false
 	 */
-	private void updateSlot(SlotID slotId, AllocationID allocationId) {
+	private boolean updateSlot(SlotID slotId, AllocationID allocationId) {
 		TaskManagerSlot slot = slots.get(slotId);
 
 		if (null != slot) {
@@ -516,8 +528,12 @@ public class SlotManager implements AutoCloseable {
 					taskManagerRegistration.cancelTimeout();
 				}
 			}
+
+			return true;
 		} else {
 			LOG.debug("Trying to update unknown slot with slot id {}.", slotId);
+
+			return false;
 		}
 	}
 
@@ -670,12 +686,6 @@ public class SlotManager implements AutoCloseable {
 			AllocationID oldAllocationId = slot.getAllocationId();
 
 			fulfilledSlotRequests.remove(oldAllocationId);
-
-			TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
-
-			if (null != taskManagerRegistration) {
-				taskManagerRegistration.removeSlot(slotId);
-			}
 		} else {
 			LOG.debug("There was no slot registered with slot id {}.", slotId);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d16a5a29/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
index 8e23dbb..3a15cb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
@@ -23,8 +23,8 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collection;
 import java.util.HashSet;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledFuture;
 
@@ -38,10 +38,14 @@ public class TaskManagerRegistration {
 
 	private ScheduledFuture<?> timeoutFuture;
 
-	public TaskManagerRegistration(TaskExecutorConnection taskManagerConnection) {
-		this.taskManagerConnection = Preconditions.checkNotNull(taskManagerConnection);
+	public TaskManagerRegistration(
+		TaskExecutorConnection taskManagerConnection,
+		Collection<SlotID> slots) {
 
-		slots = new HashSet<>(4);
+		this.taskManagerConnection = Preconditions.checkNotNull(taskManagerConnection, "taskManagerConnection");
+		Preconditions.checkNotNull(slots, "slots");
+
+		this.slots = new HashSet<>(slots);
 
 		timeoutIdentifier = null;
 		timeoutFuture = null;
@@ -59,16 +63,12 @@ public class TaskManagerRegistration {
 		return timeoutIdentifier;
 	}
 
-	public Set<SlotID> getSlots() {
+	public Iterable<SlotID> getSlots() {
 		return slots;
 	}
 
-	public boolean removeSlot(SlotID slotId) {
-		return slots.remove(slotId);
-	}
-
-	public void addSlot(SlotID slotId) {
-		slots.add(slotId);
+	public boolean containsSlot(SlotID slotId) {
+		return slots.contains(slotId);
 	}
 
 	public void cancelTimeout() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d16a5a29/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
index 7a9da28..94ecaff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
@@ -49,10 +49,6 @@ public class SlotReport implements Serializable, Iterable<SlotStatus>
{
 		this.slotsStatus = checkNotNull(slotsStatus);
 	}
 
-	public Collection<SlotStatus> getSlotsStatus() {
-		return slotsStatus;
-	}
-
 	@Override
 	public Iterator<SlotStatus> iterator() {
 		return slotsStatus.iterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/d16a5a29/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index b0b5d32..fff2829 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -587,8 +587,7 @@ public class SlotManagerTest extends TestLogger {
 
 	/**
 	 * Tests that slots are updated with respect to the latest incoming slot report. This means
that
-	 * slot for which not report has been received will be removed and those for which a report
was
-	 * received are updated accordingly.
+	 * slots for which a report was received are updated accordingly.
 	 */
 	@Test
 	public void testUpdateSlotReport() throws Exception {
@@ -601,7 +600,6 @@ public class SlotManagerTest extends TestLogger {
 		final ResourceID resourceId = ResourceID.generate();
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
-		final SlotID slotId3 = new SlotID(resourceId, 2);
 
 
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
@@ -609,10 +607,9 @@ public class SlotManagerTest extends TestLogger {
 		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
 
 		final SlotStatus newSlotStatus2 = new SlotStatus(slotId2, resourceProfile, jobId, allocationId);
-		final SlotStatus slotStatus3 = new SlotStatus(slotId3, resourceProfile);
 
 		final SlotReport slotReport1 = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
-		final SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus3));
+		final SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus1));
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -623,20 +620,21 @@ public class SlotManagerTest extends TestLogger {
 
 			slotManager.registerTaskManager(taskManagerConnection, slotReport1);
 
-			TaskManagerSlot slot = slotManager.getSlot(slotId2);
+			TaskManagerSlot slot1 = slotManager.getSlot(slotId1);
+			TaskManagerSlot slot2 = slotManager.getSlot(slotId2);
 
 			assertTrue(2 == slotManager.getNumberRegisteredSlots());
 
-			assertTrue(slot.isFree());
+			assertTrue(slot1.isFree());
+			assertTrue(slot2.isFree());
 
 			assertTrue(slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), slotReport2));
 
 			assertTrue(2 == slotManager.getNumberRegisteredSlots());
 
 			// the slot manager should have removed slotId1
-			assertNull(slotManager.getSlot(slotId1));
-
-			assertNotNull(slotManager.getSlot(slotId3));
+			assertNotNull(slotManager.getSlot(slotId1));
+			assertNotNull(slotManager.getSlot(slotId2));
 
 			// slotId2 should have been allocated for allocationId
 			assertEquals(allocationId, slotManager.getSlot(slotId2).getAllocationId());
@@ -840,7 +838,6 @@ public class SlotManagerTest extends TestLogger {
 		final ResourceID resourceId = ResourceID.generate();
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
-		final SlotID slotId3 = new SlotID(resourceId, 2);
 		final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
 		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
 		final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
@@ -870,11 +867,16 @@ public class SlotManagerTest extends TestLogger {
 				eq(leaderId),
 				any(Time.class));
 
+			final SlotID requestedSlotdId = slotIdCaptor.getValue();
+			final SlotID freeSlotId = requestedSlotdId.equals(slotId1) ? slotId2 : slotId1;
+
+			assertTrue(slotManager.getSlot(freeSlotId).isFree());
+
 			final SlotStatus newSlotStatus1 = new SlotStatus(slotIdCaptor.getValue(), resourceProfile,
new JobID(), new AllocationID());
-			final SlotStatus newSlotStatus2 = new SlotStatus(slotId3, resourceProfile);
+			final SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile);
 			final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
 
-			// this should remove the unused slot, replacing it with slotId3 and retry the pending
slot request
+			// this should update the slot with the pending slot request triggering the reassignment
of it
 			slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport);
 
 			ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -899,7 +901,7 @@ public class SlotManagerTest extends TestLogger {
 
 			final SlotID requestedSlotId = slotIdCaptor.getValue();
 
-			assertEquals(slotId3, requestedSlotId);
+			assertEquals(slotId2, requestedSlotId);
 
 			TaskManagerSlot slot = slotManager.getSlot(requestedSlotId);
 


Mime
View raw message