Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A879D200C8E for ; Wed, 3 May 2017 14:10:10 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A73F6160BDB; Wed, 3 May 2017 12:10:10 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9FAE1160BCD for ; Wed, 3 May 2017 14:10:09 +0200 (CEST) Received: (qmail 58639 invoked by uid 500); 3 May 2017 12:10:08 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 58070 invoked by uid 99); 3 May 2017 12:10:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 May 2017 12:10:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2DD75E3823; Wed, 3 May 2017 12:10:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: fhueske@apache.org To: commits@flink.apache.org Date: Wed, 03 May 2017 12:10:17 -0000 Message-Id: In-Reply-To: <2de511ffe12d45ca85e8d3de85af107f@git.apache.org> References: <2de511ffe12d45ca85e8d3de85af107f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [10/50] [abbrv] flink git commit: [FLINK-5810] [flip-6] Make slot registration static archived-at: Wed, 03 May 2017 12:10:10 -0000 [FLINK-5810] [flip-6] Make slot registration static Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d16a5a29 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d16a5a29 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d16a5a29 Branch: refs/heads/table-retraction Commit: d16a5a297008fc9665889dd56e24ff81472db2c6 Parents: 759f46e Author: Till Rohrmann Authored: Thu Apr 27 15:53:36 2017 +0200 Committer: Till Rohrmann Committed: Fri Apr 28 15:28:26 2017 +0200 ---------------------------------------------------------------------- .../slotmanager/SlotManager.java | 76 +++++++++++--------- .../slotmanager/TaskManagerRegistration.java | 22 +++--- .../flink/runtime/taskexecutor/SlotReport.java | 4 -- .../slotmanager/SlotManagerTest.java | 30 ++++---- 4 files changed, 70 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d16a5a29/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index 31edbf3..f09b73a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -250,12 +250,33 @@ public class SlotManager implements AutoCloseable { checkInit(); // we identify task managers by their instance id - if (!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) { - TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection); + if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) { + reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport); + } else { + // first register the TaskManager + ArrayList reportedSlots = new ArrayList<>(); + + for (SlotStatus slotStatus : initialSlotReport) { + reportedSlots.add(slotStatus.getSlotID()); + } + + TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection, reportedSlots); taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration); + + // next register the new slots + for (SlotStatus slotStatus : initialSlotReport) { + registerSlot( + slotStatus.getSlotID(), + slotStatus.getAllocationID(), + slotStatus.getResourceProfile(), + taskExecutorConnection); + } + + if (!anySlotUsed(taskManagerRegistration.getSlots())) { + registerTaskManagerTimeout(taskManagerRegistration); + } } - reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport); } /** @@ -296,31 +317,22 @@ public class SlotManager implements AutoCloseable { TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); if (null != taskManagerRegistration) { - ArrayList slotsToRemove = new ArrayList<>(taskManagerRegistration.getSlots()); boolean idle = true; for (SlotStatus slotStatus : slotReport) { - if (slotsToRemove.remove(slotStatus.getSlotID())) { - // slot which was already registered - updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID()); + + // We assume that the slots of a TaskManager don't change over its lifetime and they are registered + // once when the TaskManager is registered + if (taskManagerRegistration.containsSlot(slotStatus.getSlotID()) && updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID())) { + TaskManagerSlot slot = slots.get(slotStatus.getSlotID()); + idle &= slot.isFree(); } else { - // new slot - registerSlot( - taskManagerRegistration, - slotStatus.getSlotID(), - slotStatus.getAllocationID(), - slotStatus.getResourceProfile(), - taskManagerRegistration.getTaskManagerConnection()); + // sanity check to guarantee that slots of a TaskManager don't change + throw new IllegalStateException("Reported a slot status for slot " + slotStatus.getSlotID() + + " which has not been registered."); } - - TaskManagerSlot slot = slots.get(slotStatus.getSlotID()); - - idle &= slot.isFree(); } - // remove the slots for which we haven't received a slot status message - removeSlots(slotsToRemove); - if (idle) { // no slot of this task manager is being used --> register timer to free this resource registerTaskManagerTimeout(taskManagerRegistration); @@ -439,20 +451,21 @@ public class SlotManager implements AutoCloseable { * the given slot id. The given resource profile defines the available resources for the slot. * The task manager connection can be used to communicate with the task manager. * - * @param taskManagerRegistration Task manager for which to register the given slot * @param slotId identifying the slot on the task manager * @param allocationId which is currently deployed in the slot * @param resourceProfile of the slot * @param taskManagerConnection to communicate with the remote task manager */ private void registerSlot( - TaskManagerRegistration taskManagerRegistration, SlotID slotId, AllocationID allocationId, ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) { - Preconditions.checkNotNull(taskManagerRegistration); + if (slots.containsKey(slotId)) { + // remove the old slot first + removeSlot(slotId); + } TaskManagerSlot slot = new TaskManagerSlot( slotId, @@ -462,8 +475,6 @@ public class SlotManager implements AutoCloseable { slots.put(slotId, slot); - taskManagerRegistration.addSlot(slotId); - if (slot.isFree()) { handleFreeSlot(slot); } @@ -478,8 +489,9 @@ public class SlotManager implements AutoCloseable { * * @param slotId to update * @param allocationId specifying the current allocation of the slot + * @return True if the slot could be updated; otherwise false */ - private void updateSlot(SlotID slotId, AllocationID allocationId) { + private boolean updateSlot(SlotID slotId, AllocationID allocationId) { TaskManagerSlot slot = slots.get(slotId); if (null != slot) { @@ -516,8 +528,12 @@ public class SlotManager implements AutoCloseable { taskManagerRegistration.cancelTimeout(); } } + + return true; } else { LOG.debug("Trying to update unknown slot with slot id {}.", slotId); + + return false; } } @@ -670,12 +686,6 @@ public class SlotManager implements AutoCloseable { AllocationID oldAllocationId = slot.getAllocationId(); fulfilledSlotRequests.remove(oldAllocationId); - - TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId()); - - if (null != taskManagerRegistration) { - taskManagerRegistration.removeSlot(slotId); - } } else { LOG.debug("There was no slot registered with slot id {}.", slotId); } http://git-wip-us.apache.org/repos/asf/flink/blob/d16a5a29/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java index 8e23dbb..3a15cb3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java @@ -23,8 +23,8 @@ import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; import org.apache.flink.util.Preconditions; +import java.util.Collection; import java.util.HashSet; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ScheduledFuture; @@ -38,10 +38,14 @@ public class TaskManagerRegistration { private ScheduledFuture timeoutFuture; - public TaskManagerRegistration(TaskExecutorConnection taskManagerConnection) { - this.taskManagerConnection = Preconditions.checkNotNull(taskManagerConnection); + public TaskManagerRegistration( + TaskExecutorConnection taskManagerConnection, + Collection slots) { - slots = new HashSet<>(4); + this.taskManagerConnection = Preconditions.checkNotNull(taskManagerConnection, "taskManagerConnection"); + Preconditions.checkNotNull(slots, "slots"); + + this.slots = new HashSet<>(slots); timeoutIdentifier = null; timeoutFuture = null; @@ -59,16 +63,12 @@ public class TaskManagerRegistration { return timeoutIdentifier; } - public Set getSlots() { + public Iterable getSlots() { return slots; } - public boolean removeSlot(SlotID slotId) { - return slots.remove(slotId); - } - - public void addSlot(SlotID slotId) { - slots.add(slotId); + public boolean containsSlot(SlotID slotId) { + return slots.contains(slotId); } public void cancelTimeout() { http://git-wip-us.apache.org/repos/asf/flink/blob/d16a5a29/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java index 7a9da28..94ecaff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java @@ -49,10 +49,6 @@ public class SlotReport implements Serializable, Iterable { this.slotsStatus = checkNotNull(slotsStatus); } - public Collection getSlotsStatus() { - return slotsStatus; - } - @Override public Iterator iterator() { return slotsStatus.iterator(); http://git-wip-us.apache.org/repos/asf/flink/blob/d16a5a29/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index b0b5d32..fff2829 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -587,8 +587,7 @@ public class SlotManagerTest extends TestLogger { /** * Tests that slots are updated with respect to the latest incoming slot report. This means that - * slot for which not report has been received will be removed and those for which a report was - * received are updated accordingly. + * slots for which a report was received are updated accordingly. */ @Test public void testUpdateSlotReport() throws Exception { @@ -601,7 +600,6 @@ public class SlotManagerTest extends TestLogger { final ResourceID resourceId = ResourceID.generate(); final SlotID slotId1 = new SlotID(resourceId, 0); final SlotID slotId2 = new SlotID(resourceId, 1); - final SlotID slotId3 = new SlotID(resourceId, 2); final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); @@ -609,10 +607,9 @@ public class SlotManagerTest extends TestLogger { final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile); final SlotStatus newSlotStatus2 = new SlotStatus(slotId2, resourceProfile, jobId, allocationId); - final SlotStatus slotStatus3 = new SlotStatus(slotId3, resourceProfile); final SlotReport slotReport1 = new SlotReport(Arrays.asList(slotStatus1, slotStatus2)); - final SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus3)); + final SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus1)); final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway); @@ -623,20 +620,21 @@ public class SlotManagerTest extends TestLogger { slotManager.registerTaskManager(taskManagerConnection, slotReport1); - TaskManagerSlot slot = slotManager.getSlot(slotId2); + TaskManagerSlot slot1 = slotManager.getSlot(slotId1); + TaskManagerSlot slot2 = slotManager.getSlot(slotId2); assertTrue(2 == slotManager.getNumberRegisteredSlots()); - assertTrue(slot.isFree()); + assertTrue(slot1.isFree()); + assertTrue(slot2.isFree()); assertTrue(slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), slotReport2)); assertTrue(2 == slotManager.getNumberRegisteredSlots()); // the slot manager should have removed slotId1 - assertNull(slotManager.getSlot(slotId1)); - - assertNotNull(slotManager.getSlot(slotId3)); + assertNotNull(slotManager.getSlot(slotId1)); + assertNotNull(slotManager.getSlot(slotId2)); // slotId2 should have been allocated for allocationId assertEquals(allocationId, slotManager.getSlot(slotId2).getAllocationId()); @@ -840,7 +838,6 @@ public class SlotManagerTest extends TestLogger { final ResourceID resourceId = ResourceID.generate(); final SlotID slotId1 = new SlotID(resourceId, 0); final SlotID slotId2 = new SlotID(resourceId, 1); - final SlotID slotId3 = new SlotID(resourceId, 2); final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile); final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile); final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2)); @@ -870,11 +867,16 @@ public class SlotManagerTest extends TestLogger { eq(leaderId), any(Time.class)); + final SlotID requestedSlotdId = slotIdCaptor.getValue(); + final SlotID freeSlotId = requestedSlotdId.equals(slotId1) ? slotId2 : slotId1; + + assertTrue(slotManager.getSlot(freeSlotId).isFree()); + final SlotStatus newSlotStatus1 = new SlotStatus(slotIdCaptor.getValue(), resourceProfile, new JobID(), new AllocationID()); - final SlotStatus newSlotStatus2 = new SlotStatus(slotId3, resourceProfile); + final SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile); final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2)); - // this should remove the unused slot, replacing it with slotId3 and retry the pending slot request + // this should update the slot with the pending slot request triggering the reassignment of it slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport); ArgumentCaptor runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); @@ -899,7 +901,7 @@ public class SlotManagerTest extends TestLogger { final SlotID requestedSlotId = slotIdCaptor.getValue(); - assertEquals(slotId3, requestedSlotId); + assertEquals(slotId2, requestedSlotId); TaskManagerSlot slot = slotManager.getSlot(requestedSlotId);