Return-Path:
X-Original-To: apmail-flink-commits-archive@minotaur.apache.org
Delivered-To: apmail-flink-commits-archive@minotaur.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6B8BB17891 for ; Thu, 5 Feb 2015 12:25:43 +0000 (UTC)
Received: (qmail 11928 invoked by uid 500); 5 Feb 2015 12:25:43 -0000
Delivered-To: apmail-flink-commits-archive@flink.apache.org
Received: (qmail 11879 invoked by uid 500); 5 Feb 2015 12:25:43 -0000
Mailing-List: contact commits-help@flink.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@flink.apache.org
Delivered-To: mailing list commits@flink.apache.org
Received: (qmail 11728 invoked by uid 99); 5 Feb 2015 12:25:43 -0000
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Feb 2015 12:25:43 +0000
Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D9EDAE05DF; Thu, 5 Feb 2015 12:25:42 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: sewen@apache.org
To: commits@flink.apache.org
Date: Thu, 05 Feb 2015 12:25:48 -0000
Message-Id: <64a0acdb4d2244579c9b946812740504@git.apache.org>
In-Reply-To: <6539ad141b994bfab0b2fd306d6e2239@git.apache.org>
References: <6539ad141b994bfab0b2fd306d6e2239@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [7/7] flink git commit: [FLINK-1376] [runtime] Add proper shared slot release in case of a fatal TaskManager failure.

[FLINK-1376] [runtime] Add proper shared slot release in case of a fatal TaskManager failure.

Fixes concurrent modification exception of SharedSlot's subSlots field by synchronizing all state changing operations through the associated assignment group.
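
The idea of routing every state change through one synchronizing object can be sketched as follows. This is a minimal illustration under stated assumptions, not the code of this commit: the names AssignmentGroupSketch, SharedSlotSketch and GroupSketch are hypothetical stand-ins, and integer ids take the place of real sub slots.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: hypothetical classes, not Flink's SharedSlot or SlotSharingGroupAssignment. */
final class AssignmentGroupSketch {

    /** Holds the mutable set but does no locking of its own. */
    static final class SharedSlotSketch {
        private final Set<Integer> subSlots = new HashSet<>();
    }

    /** The single owner of the lock; every read or write of subSlots goes through it. */
    static final class GroupSketch {
        private final Object lock = new Object();

        int addSubSlot(SharedSlotSketch shared, int id) {
            synchronized (lock) {
                shared.subSlots.add(id);
                return shared.subSlots.size();
            }
        }

        /** Copies and clears under the lock, so no iteration races with a concurrent add. */
        List<Integer> releaseAll(SharedSlotSketch shared) {
            synchronized (lock) {
                List<Integer> released = new ArrayList<>(shared.subSlots);
                shared.subSlots.clear();
                return released;
            }
        }
    }

    /** Adds 1000 ids on one thread while another thread keeps releasing; returns the total released. */
    static int demo() {
        SharedSlotSketch shared = new SharedSlotSketch();
        GroupSketch group = new GroupSketch();
        int[] released = new int[1]; // written only by the releaser thread, read after join()

        Thread adder = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                group.addSubSlot(shared, i);
            }
        });
        Thread releaser = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                released[0] += group.releaseAll(shared).size();
            }
        });

        adder.start();
        releaser.start();
        try {
            adder.join();
            releaser.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Whatever the releaser thread did not pick up is released here.
        return released[0] + group.releaseAll(shared).size();
    }

    public static void main(String[] args) {
        System.out.println("released in total: " + demo());
    }
}
```

Because the set is only ever touched while the group's lock is held, the copy in releaseAll cannot overlap with an add, and each of the 1000 ids is released exactly once.
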

Fixes a deadlock: Instance.markDead first acquires the instance lock and then, by releasing the associated slots, the assignment group lock. This can block against a direct releaseSlot call on a SharedSlot, which first acquires the assignment group lock and then the instance lock in order to return the slot to the instance.

Fixes colocation shared slot releasing. A colocation constraint is now realized as a SharedSlot nested in a SharedSlot, in which the colocated tasks allocate sub slots.

This closes #317

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db1b8b99
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db1b8b99
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db1b8b99

Branch: refs/heads/master
Commit: db1b8b993c12f2e74b6cc9a48414265666dc0e69
Parents: 9d181a8
Author: Till Rohrmann
Authored: Mon Jan 12 10:58:45 2015 +0100
Committer: Stephan Ewen
Committed: Thu Feb 5 12:17:15 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/deployment/PartitionInfo.java | 8 +-
 .../flink/runtime/executiongraph/Execution.java | 22 +-
 .../runtime/executiongraph/ExecutionVertex.java | 10 +-
 .../flink/runtime/instance/AllocatedSlot.java | 192 -------
 .../apache/flink/runtime/instance/Instance.java | 81 ++-
 .../flink/runtime/instance/SharedSlot.java | 154 ++++++
 .../flink/runtime/instance/SimpleSlot.java | 124 +++++
 .../org/apache/flink/runtime/instance/Slot.java | 191 +++++++
 .../scheduler/CoLocationConstraint.java | 12 +-
 .../scheduler/NoResourceAvailableException.java | 17 +-
 .../runtime/jobmanager/scheduler/Scheduler.java | 227 +++++---
 .../jobmanager/scheduler/SharedSlot.java | 114 ----
 .../scheduler/SlotAllocationFuture.java | 12 +-
 .../scheduler/SlotAllocationFutureAction.java | 4 +-
 .../scheduler/SlotAvailabilityListener.java | 2 +-
 .../scheduler/SlotSharingGroupAssignment.java | 334 ++++++-----
 .../runtime/jobmanager/scheduler/SubSlot.java | 75 ---
 .../jobmanager/web/JobManagerInfoServlet.java | 553 ++++++++++++++++++
 .../jobmanager/web/JobmanagerInfoServlet.java | 554 -------------------
 .../runtime/jobmanager/web/JsonFactory.java | 4 +-
 .../runtime/jobmanager/web/WebInfoServer.java | 2 +-
 .../profiling/impl/JobProfilingData.java | 7 +-
 .../flink/runtime/jobmanager/JobManager.scala | 1 +
 .../ExecutionGraphDeploymentTest.java | 4 +-
 .../executiongraph/ExecutionGraphTestUtils.java | 4 +-
 .../ExecutionStateProgressTest.java | 4 +-
 .../ExecutionVertexCancelTest.java | 22 +-
 .../ExecutionVertexDeploymentTest.java | 18 +-
 .../ExecutionVertexSchedulingTest.java | 8 +-
 .../runtime/instance/AllocatedSlotTest.java | 26 +-
 .../flink/runtime/instance/InstanceTest.java | 22 +-
 .../ScheduleWithCoLocationHintTest.java | 136 ++---
 .../scheduler/SchedulerIsolatedTasksTest.java | 44 +-
 .../scheduler/SchedulerSlotSharingTest.java | 210 +++----
 .../jobmanager/scheduler/SharedSlotsTest.java | 73 +--
 .../scheduler/SlotAllocationFutureTest.java | 24 +-
 .../runtime/jobmanager/JobManagerITCase.scala | 2 +-
 .../jobmanager/TaskManagerFailsITCase.scala | 39 +-
 .../TaskManagerFailsWithSlotSharingITCase.scala | 46 +-
 39 files changed, 1881 insertions(+), 1501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
index cdaf289..dd2c063 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import
org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionEdge; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; -import org.apache.flink.runtime.instance.AllocatedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.io.network.RemoteAddress; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; @@ -114,7 +114,7 @@ public class PartitionInfo implements IOReadableWritable, Serializable { // ------------------------------------------------------------------------ - public static PartitionInfo fromEdge(ExecutionEdge edge, AllocatedSlot consumerSlot) { + public static PartitionInfo fromEdge(ExecutionEdge edge, SimpleSlot consumerSlot) { IntermediateResultPartition partition = edge.getSource(); IntermediateResultPartitionID partitionId = partition.getPartitionId(); @@ -125,7 +125,7 @@ public class PartitionInfo implements IOReadableWritable, Serializable { RemoteAddress producerAddress = null; PartitionLocation producerLocation = PartitionLocation.UNKNOWN; - AllocatedSlot producerSlot = producer.getAssignedResource(); + SimpleSlot producerSlot = producer.getAssignedResource(); ExecutionState producerState = producer.getState(); // The producer needs to be running, otherwise the consumer might request a partition, @@ -145,7 +145,7 @@ public class PartitionInfo implements IOReadableWritable, Serializable { return new PartitionInfo(partitionId, producerExecutionId, producerLocation, producerAddress); } - public static PartitionInfo[] fromEdges(ExecutionEdge[] edges, AllocatedSlot consumerSlot) { + public static PartitionInfo[] fromEdges(ExecutionEdge[] edges, SimpleSlot consumerSlot) { // Every edge consumes a different result partition, which might be of // local, remote, or unknown location. 
PartitionInfo[] partitions = new PartitionInfo[edges.length]; http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index a705231..e1a24c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -101,8 +101,8 @@ public class Execution implements Serializable { private volatile ExecutionState state = CREATED; - - private volatile AllocatedSlot assignedResource; // once assigned, never changes + + private volatile SimpleSlot assignedResource; // once assigned, never changes private volatile Throwable failureCause; // once assigned, never changes @@ -141,7 +141,7 @@ public class Execution implements Serializable { return state; } - public AllocatedSlot getAssignedResource() { + public SimpleSlot getAssignedResource() { return assignedResource; } @@ -185,7 +185,7 @@ public class Execution implements Serializable { // sanity check if (locationConstraint != null && sharingGroup == null) { - throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed."); + throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing not allowed."); } if (transitionState(CREATED, SCHEDULED)) { @@ -201,7 +201,7 @@ public class Execution implements Serializable { future.setFutureAction(new SlotAllocationFutureAction() { @Override - public void slotAllocated(AllocatedSlot slot) { + public void slotAllocated(SimpleSlot slot) { try { deployToSlot(slot); } @@ -216,7 +216,7 @@ public class Execution implements Serializable { }); } else { - 
AllocatedSlot slot = scheduler.scheduleImmediately(toSchedule); + SimpleSlot slot = scheduler.scheduleImmediately(toSchedule); try { deployToSlot(slot); } @@ -237,7 +237,7 @@ public class Execution implements Serializable { } } - public void deployToSlot(final AllocatedSlot slot) throws JobException { + public void deployToSlot(final SimpleSlot slot) throws JobException { // sanity checks if (slot == null) { throw new NullPointerException(); @@ -406,7 +406,7 @@ public class Execution implements Serializable { } } else if (consumerState == RUNNING) { - AllocatedSlot consumerSlot = consumerVertex.getCurrentAssignedResource(); + SimpleSlot consumerSlot = consumerVertex.getCurrentAssignedResource(); ExecutionAttemptID consumerExecutionId = consumerVertex.getCurrentExecutionAttempt().getAttemptId(); PartitionInfo partitionInfo = PartitionInfo.fromEdge(edge, consumerSlot); @@ -635,7 +635,7 @@ public class Execution implements Serializable { } private void sendCancelRpcCall() { - final AllocatedSlot slot = this.assignedResource; + final SimpleSlot slot = this.assignedResource; if (slot == null) { return; } @@ -662,7 +662,7 @@ public class Execution implements Serializable { } private void sendFailIntermediateResultPartitionsRPCCall() { - final AllocatedSlot slot = this.assignedResource; + final SimpleSlot slot = this.assignedResource; if (slot == null) { return; } @@ -680,7 +680,7 @@ public class Execution implements Serializable { } } - private boolean sendUpdateTaskRpcCall(final AllocatedSlot consumerSlot, final ExecutionAttemptID executionId, final IntermediateDataSetID resultId, final PartitionInfo partitionInfo) throws Exception { + private boolean sendUpdateTaskRpcCall(final SimpleSlot consumerSlot, final ExecutionAttemptID executionId, final IntermediateDataSetID resultId, final PartitionInfo partitionInfo) throws Exception { final Instance instance = consumerSlot.getInstance(); final TaskManagerMessages.TaskOperationResult result = AkkaUtils.ask( 
http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 8812569..d3993bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor; @@ -25,7 +26,6 @@ import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartitionInfo; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -178,7 +178,7 @@ public class ExecutionVertex implements Serializable { return currentExecution.getFailureCause(); } - public AllocatedSlot getCurrentAssignedResource() { + public SimpleSlot getCurrentAssignedResource() { return currentExecution.getAssignedResource(); } @@ -304,7 +304,7 @@ public class ExecutionVertex implements Serializable { ExecutionEdge[] sources = inputEdges[i]; if (sources != null) { for (int k = 0; k < sources.length; k++) { - AllocatedSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource(); + 
SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource(); if (sourceSlot != null) { locations.add(sourceSlot.getInstance()); if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { @@ -346,7 +346,7 @@ public class ExecutionVertex implements Serializable { return this.currentExecution.scheduleForExecution(scheduler, queued); } - public void deployToSlot(AllocatedSlot slot) throws JobException { + public void deployToSlot(SimpleSlot slot) throws JobException { this.currentExecution.deployToSlot(slot); } @@ -397,7 +397,7 @@ public class ExecutionVertex implements Serializable { getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error); } - TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID executionId, AllocatedSlot slot) { + TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID executionId, SimpleSlot slot) { // Produced intermediate results List producedPartitions = new ArrayList(resultPartitions.length); http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java deleted file mode 100644 index f1481f3..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import java.io.Serializable; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.jobgraph.JobID; -import org.apache.flink.runtime.jobmanager.scheduler.Locality; - -/** - * An allocated slot is the unit in which resources are allocated on instances. - */ -public class AllocatedSlot implements Serializable { - - static final long serialVersionUID = 42L; - - private static final AtomicIntegerFieldUpdater STATUS_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(AllocatedSlot.class, "status"); - - private static final AtomicReferenceFieldUpdater VERTEX_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(AllocatedSlot.class, Execution.class, "executedTask"); - - private static final int ALLOCATED_AND_ALIVE = 0; // tasks may be added and might be running - private static final int CANCELLED = 1; // no more tasks may run - private static final int RELEASED = 2; // has been given back to the instance - - - /** The ID of the job this slice belongs to. */ - private final JobID jobID; - - /** The instance on which the slot is allocated */ - private final Instance instance; - - /** The number of the slot on which the task is deployed */ - private final int slotNumber; - - /** Task being executed in the slot. 
Volatile to force a memory barrier and allow for correct double-checking */ - private volatile Execution executedTask; - - /** The state of the vertex, only atomically updated */ - private volatile int status = ALLOCATED_AND_ALIVE; - - private Locality locality = Locality.UNCONSTRAINED; - - - public AllocatedSlot(JobID jobID, Instance instance, int slotNumber) { - if (jobID == null || instance == null || slotNumber < 0) { - throw new IllegalArgumentException(); - } - - this.jobID = jobID; - this.instance = instance; - this.slotNumber = slotNumber; - } - - // -------------------------------------------------------------------------------------------- - - /** - * Returns the ID of the job this allocated slot belongs to. - * - * @return the ID of the job this allocated slot belongs to - */ - public JobID getJobID() { - return this.jobID; - } - - public Instance getInstance() { - return instance; - } - - public int getSlotNumber() { - return slotNumber; - } - - public Execution getExecutedVertex() { - return executedTask; - } - - public Locality getLocality() { - return locality; - } - - public void setLocality(Locality locality) { - this.locality = locality; - } - - public boolean setExecutedVertex(Execution executedVertex) { - if (executedVertex == null) { - throw new NullPointerException(); - } - - // check that we can actually run in this slot - if (status != ALLOCATED_AND_ALIVE) { - return false; - } - - // atomically assign the vertex - if (!VERTEX_UPDATER.compareAndSet(this, null, executedVertex)) { - return false; - } - - // we need to do a double check that we were not cancelled in the meantime - if (status != ALLOCATED_AND_ALIVE) { - this.executedTask = null; - return false; - } - - return true; - } - - // -------------------------------------------------------------------------------------------- - // Status and life cycle - // -------------------------------------------------------------------------------------------- - - public boolean isAlive() { - return 
status == ALLOCATED_AND_ALIVE; - } - - public boolean isCanceled() { - return status != ALLOCATED_AND_ALIVE; - } - - public boolean isReleased() { - return status == RELEASED; - } - - - public void cancel() { - if (STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, CANCELLED)) { - // kill all tasks currently running in this slot - Execution exec = this.executedTask; - if (exec != null && !exec.isFinished()) { - exec.fail(new Exception("The slot in which the task was scheduled has been killed (probably loss of TaskManager).")); - } - } - } - - public void releaseSlot() { - // cancel everything, if there is something. since this is atomically status based, - // it will not happen twice if another attempt happened before or concurrently - try { - cancel(); - } finally { - this.instance.returnAllocatedSlot(this); - } - } - - protected boolean markReleased() { - return STATUS_UPDATER.compareAndSet(this, CANCELLED, RELEASED); - } - - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - - @Override - public String toString() { - return instance.getId() + " (" + slotNumber + ") - " + getStateName(status); - } - - private static final String getStateName(int state) { - switch (state) { - case ALLOCATED_AND_ALIVE: - return "ALLOCATED/ALIVE"; - case CANCELLED: - return "CANCELLED"; - case RELEASED: - return "RELEASED"; - default: - return "(unknown)"; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java index abbbc34..4f9dc7f 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java @@ -27,8 +27,10 @@ import java.util.Queue; import java.util.Set; import akka.actor.ActorRef; +import org.apache.flink.runtime.AbstractID; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment; /** * An taskManager represents a resource a {@link org.apache.flink.runtime.taskmanager.TaskManager} runs on. @@ -59,7 +61,7 @@ public class Instance implements Serializable { private transient final Queue availableSlots; /** Allocated slots on this taskManager */ - private final Set allocatedSlots = new HashSet(); + private final Set allocatedSlots = new HashSet(); /** A listener to be notified upon new slot availability */ @@ -121,20 +123,27 @@ public class Instance implements Serializable { } public void markDead() { - if (isDead) { - return; - } - - isDead = true; - synchronized (instanceLock) { - + if (isDead) { + return; + } + + isDead = true; + // no more notifications for the slot releasing this.slotAvailabilityListener = null; - - for (AllocatedSlot slot : allocatedSlots) { - slot.releaseSlot(); - } + } + + /* + * releaseSlot must not own the instanceLock in order to avoid dead locks where a slot + * owning the assignment group lock wants to give itself back to the instance which requires + * the instance lock + */ + for (Slot slot : allocatedSlots) { + slot.releaseSlot(); + } + + synchronized (instanceLock) { allocatedSlots.clear(); availableSlots.clear(); } @@ -176,8 +185,12 @@ public class Instance implements Serializable { // -------------------------------------------------------------------------------------------- // Resource allocation // -------------------------------------------------------------------------------------------- + + public 
SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException { + return allocateSimpleSlot(jobID, jobID); + } - public AllocatedSlot allocateSlot(JobID jobID) throws InstanceDiedException { + public SimpleSlot allocateSimpleSlot(JobID jobID, AbstractID groupID) throws InstanceDiedException { if (jobID == null) { throw new IllegalArgumentException(); } @@ -191,15 +204,38 @@ public class Instance implements Serializable { if (nextSlot == null) { return null; } else { - AllocatedSlot slot = new AllocatedSlot(jobID, this, nextSlot); + SimpleSlot slot = new SimpleSlot(jobID, this, nextSlot, null, groupID); allocatedSlots.add(slot); return slot; } } } - - public boolean returnAllocatedSlot(AllocatedSlot slot) { + + public SharedSlot allocateSharedSlot(JobID jobID, SlotSharingGroupAssignment sharingGroupAssignment, AbstractID groupID) throws + InstanceDiedException { // the slot needs to be in the returned to taskManager state + if (jobID == null) { + throw new IllegalArgumentException(); + } + + synchronized (instanceLock) { + if (isDead) { + throw new InstanceDiedException(this); + } + + Integer nextSlot = availableSlots.poll(); + if (nextSlot == null) { + return null; + } else { + SharedSlot slot = new SharedSlot(jobID, this, nextSlot, + sharingGroupAssignment, null, groupID); + allocatedSlots.add(slot); + return slot; + } + } + } + + public boolean returnAllocatedSlot(Slot slot) { if (slot == null || slot.getInstance() != this) { throw new IllegalArgumentException("Slot is null or belongs to the wrong taskManager."); } @@ -231,14 +267,15 @@ public class Instance implements Serializable { } public void cancelAndReleaseAllSlots() { + List copy = null; + synchronized (instanceLock) { // we need to do this copy because of concurrent modification exceptions - List copy = new ArrayList(this.allocatedSlots); + copy = new ArrayList(this.allocatedSlots); + } - for (AllocatedSlot slot : copy) { - slot.releaseSlot(); - } - allocatedSlots.clear(); + for (Slot slot : 
copy) { + slot.releaseSlot(); } } @@ -293,6 +330,6 @@ public class Instance implements Serializable { @Override public String toString() { return instanceId + " @" + (taskManager != null ? taskManager.path() : "ActorRef.noSender") + " " + - numberOfSlots + " slots"; + numberOfSlots + " slots" + " - " + hashCode(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java new file mode 100644 index 0000000..2efcf6c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import org.apache.flink.runtime.AbstractID; +import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment; + +import java.util.HashSet; +import java.util.Set; + +/** + * This class represents a shared slot. 
A shared slot can have multiple + * {@link org.apache.flink.runtime.instance.SimpleSlot} instances within itself. This allows to + * schedule multiple tasks simultaneously, enabling Flink's streaming capabilities. + * + * IMPORTANT: This class contains no synchronization. Thus, the caller has to guarantee proper + * synchronization. In the current implementation, all concurrently modifying operations are + * passed through a {@link SlotSharingGroupAssignment} object which is responsible for + * synchronization. + * + */ +public class SharedSlot extends Slot { + + private final SlotSharingGroupAssignment assignmentGroup; + + private final Set subSlots; + + public SharedSlot(JobID jobID, Instance instance, int slotNumber, + SlotSharingGroupAssignment assignmentGroup, SharedSlot parent, + AbstractID groupID) { + super(jobID, instance, slotNumber, parent, groupID); + + this.assignmentGroup = assignmentGroup; + this.subSlots = new HashSet(); + } + + public Set getSubSlots() { + return subSlots; + } + + /** + * Removes the simple slot from the {@link org.apache.flink.runtime.instance.SharedSlot}. Should + * only be called through the + * {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment} attribute + * assignmnetGroup. + * + * @param slot slot to be removed from the set of sub slots. + * @return Number of remaining sub slots + */ + public int freeSubSlot(Slot slot){ + if(!subSlots.remove(slot)){ + throw new IllegalArgumentException("Wrong shared slot for sub slot."); + } + + return subSlots.size(); + } + + @Override + public int getNumberLeaves() { + int result = 0; + + for(Slot slot: subSlots){ + result += slot.getNumberLeaves(); + } + + return result; + } + + @Override + public void cancel() { + // Guarantee that the operation is only executed once + if (markCancelled()) { + assignmentGroup.releaseSharedSlot(this); + } + } + + /** + * Release this shared slot. In order to do this: + * + * 1. 
Cancel and release all sub slots atomically with respect to the assigned assignment group. + * 2. Set the state of the shared slot to be cancelled. + * 3. Dispose the shared slot (returning the slot to the instance). + * + * After cancelAndReleaseSubSlots, the shared slot is marked to be dead. This prevents further + * sub slot creation by the scheduler. + */ + @Override + public void releaseSlot() { + assignmentGroup.releaseSharedSlot(this); + } + + /** + * Creates a new sub slot if the slot is not dead, yet. This method should only be called from + * the assignment group instance to guarantee synchronization. + * + * @param jID id to identify tasks which can be deployed in this sub slot + * @return new sub slot if the shared slot is still alive, otherwise null + */ + public SimpleSlot allocateSubSlot(AbstractID jID){ + if(isDead()){ + return null; + } else { + SimpleSlot slot = new SimpleSlot(jobID, instance, subSlots.size(), this, jID); + subSlots.add(slot); + + return slot; + } + } + + public SharedSlot allocateSharedSlot(AbstractID jID){ + if(isDead()){ + return null; + } else { + SharedSlot slot = new SharedSlot(jobID, instance, subSlots.size(), assignmentGroup, this, jID); + subSlots.add(slot); + + return slot; + } + } + + /** + * Disposes the given sub slot. This + * is done by the means of the assignmentGroup in order to synchronize the method. If the + * disposed slot was the last sub slot, then the shared slot is marked to be cancelled and is + * disposed/returned to the owning instance. 
+ * + * @param slot sub slot which shall be removed from the shared slot + */ + public void disposeChild(SimpleSlot slot){ + assignmentGroup.releaseSimpleSlot(slot); + } + + @Override + public String toString() { + return "Shared " + super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java new file mode 100644 index 0000000..5b1af57 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.instance; + +import org.apache.flink.runtime.AbstractID; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +/** + * Class which represents a single slot on a machine or within a shared slot. If this slot is part + * of a [[SharedSlot]], then its parent attribute is set to this instance. If not, then the parent + * attribute is null. + * + * IMPORTANT: This class has no synchronization. Thus it has to be synchronized by the calling + * object. + */ +public class SimpleSlot extends Slot { + + private static final AtomicReferenceFieldUpdater<SimpleSlot, Execution> VERTEX_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(SimpleSlot.class, Execution.class, "executedTask"); + + /** Task being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */ + private volatile Execution executedTask; + + private Locality locality = Locality.UNCONSTRAINED; + + public SimpleSlot(JobID jobID, Instance instance, int slotNumber, SharedSlot parent, AbstractID groupID){ + super(jobID, instance, slotNumber, parent, groupID); + } + + @Override + public int getNumberLeaves() { + return 1; + } + + + public Execution getExecution() { + return executedTask; + } + + public Locality getLocality() { + return locality; + } + + public void setLocality(Locality locality) { + this.locality = locality; + } + + public boolean setExecutedVertex(Execution executedVertex) { + if (executedVertex == null) { + throw new NullPointerException(); + } + + // check that we can actually run in this slot + if (status != ALLOCATED_AND_ALIVE) { + return false; + } + + // atomically assign the vertex + if (!VERTEX_UPDATER.compareAndSet(this, null, executedVertex)) { + return false; + } + + // we need to do a double check that we were not cancelled in the meantime + if 
(status != ALLOCATED_AND_ALIVE) { + this.executedTask = null; + return false; + } + + return true; + } + + @Override + public void cancel() { + if (markCancelled()) { + // kill all tasks currently running in this slot + Execution exec = this.executedTask; + if (exec != null && !exec.isFinished()) { + exec.fail(new Exception("The slot in which the task was scheduled has been killed (probably loss of TaskManager).")); + } + } + } + + @Override + public void releaseSlot() { + // cancel everything, if there is something. since this is atomically status based, + // it will not happen twice if another attempt happened before or concurrently + try { + cancel(); + } finally { + if (getParent() != null) { + // we have to ask our parent to dispose us + getParent().disposeChild(this); + } else { + // we have to give back the slot to the owning instance + instance.returnAllocatedSlot(this); + } + } + } + + @Override + public String toString() { + return "SimpleSlot " + super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java new file mode 100644 index 0000000..fb62c4c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import org.apache.flink.runtime.AbstractID; +import org.apache.flink.runtime.jobgraph.JobID; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +/** + * Base class for slots. + */ +public abstract class Slot { + protected static final AtomicIntegerFieldUpdater<Slot> STATUS_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(Slot.class, "status"); + + protected static final int ALLOCATED_AND_ALIVE = 0; // tasks may be added and might be running + protected static final int CANCELLED = 1; // no more tasks may run + protected static final int RELEASED = 2; // has been given back to the instance + + /** The ID of the job this slice belongs to. 
*/ + protected final JobID jobID; + + /** The instance on which the slot is allocated */ + protected final Instance instance; + + /** The number of the slot on which the task is deployed */ + protected final int slotNumber; + + /** The state of the vertex, only atomically updated */ + protected volatile int status = ALLOCATED_AND_ALIVE; + + /** Indicates whether this slot was marked dead by the system */ + private boolean dead = false; + + private final AbstractID groupID; + + private final SharedSlot parent; + + private boolean disposed = false; + + + public Slot(JobID jobID, Instance instance, int slotNumber, SharedSlot parent, AbstractID groupID) { + if (jobID == null || instance == null || slotNumber < 0) { + throw new IllegalArgumentException(); + } + + this.jobID = jobID; + this.instance = instance; + this.slotNumber = slotNumber; + this.parent = parent; + this.groupID = groupID; + + } + // -------------------------------------------------------------------------------------------- + + /** + * Returns the ID of the job this allocated slot belongs to. 
+ * + * @return the ID of the job this allocated slot belongs to + */ + public JobID getJobID() { + return this.jobID; + } + + public Instance getInstance() { + return instance; + } + + public int getSlotNumber() { + return slotNumber; + } + + public AbstractID getGroupID() { + return groupID; + } + + public SharedSlot getParent() { + return parent; + } + + public Slot getRoot() { + if(parent == null){ + return this; + } else { + return parent.getRoot(); + } + } + + public abstract int getNumberLeaves(); + + // -------------------------------------------------------------------------------------------- + // Status and life cycle + // -------------------------------------------------------------------------------------------- + + public boolean isAlive() { + return status == ALLOCATED_AND_ALIVE; + } + + public boolean isCanceled() { + return status != ALLOCATED_AND_ALIVE; + } + + public boolean isReleased() { + return status == RELEASED; + } + + public abstract void cancel(); + + public abstract void releaseSlot(); + + public boolean markReleased() { + return STATUS_UPDATER.compareAndSet(this, CANCELLED, RELEASED); + } + + public boolean markCancelled() { + return STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, CANCELLED); + } + + /** + * Marks this shared slot to be dead. Returns if the slot was alive before. Should only + * be called through the {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment} attribute assignmentGroup. 
+ * + * @return if the slot was alive before + */ + public boolean markDead() { + boolean result = !dead; + + dead = true; + + return result; + } + + public boolean isDead() { + return dead; + } + + public boolean markDisposed() { + boolean result = !disposed; + + disposed = true; + + return result; + } + + public boolean isDisposed() { + return disposed; + } + + // -------------------------------------------------------------------------------------------- + // Utilities + // -------------------------------------------------------------------------------------------- + + @Override + public String toString() { + return hierarchy() + " - " + instance.getId() + " - " + getStateName(status); + } + + protected String hierarchy() { + return "(" + slotNumber + ")" + (getParent() != null ? getParent().hierarchy() : ""); + } + + private static final String getStateName(int state) { + switch (state) { + case ALLOCATED_AND_ALIVE: + return "ALLOCATED/ALIVE"; + case CANCELLED: + return "CANCELLED"; + case RELEASED: + return "RELEASED"; + default: + return "(unknown)"; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java index 739ec09..8ef61b9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.AbstractID; import org.apache.flink.runtime.instance.Instance; import com.google.common.base.Preconditions; +import org.apache.flink.runtime.instance.SharedSlot; import 
java.io.Serializable; @@ -46,7 +47,7 @@ public class CoLocationConstraint implements Serializable { public Instance getLocation() { if (sharedSlot != null) { - return sharedSlot.getAllocatedSlot().getInstance(); + return sharedSlot.getInstance(); } else { throw new IllegalStateException("Not assigned"); } @@ -56,7 +57,7 @@ public class CoLocationConstraint implements Serializable { if (this.sharedSlot == sharedSlot) { return; } - else if (this.sharedSlot == null || this.sharedSlot.isDisposed()) { + else if (this.sharedSlot == null || this.sharedSlot.isDead()) { this.sharedSlot = sharedSlot; } else { throw new IllegalStateException("Overriding shared slot that is still alive."); @@ -68,12 +69,17 @@ public class CoLocationConstraint implements Serializable { } public boolean isUnassignedOrDisposed() { - return this.sharedSlot == null || this.sharedSlot.isDisposed(); + return this.sharedSlot == null || this.sharedSlot.isDead(); } public AbstractID getGroupId() { return this.group.getId(); } + + @Override + public String toString() { + return "CoLocation constraint id " + getGroupId() + " shared slot " + sharedSlot; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java index 730952b..93b4541 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java @@ -36,14 +36,19 @@ public class NoResourceAvailableException extends JobException { + ". 
You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration."); } - public NoResourceAvailableException(int numInstances, int numSlotsTotal) { - super(String.format("%s Resources available to scheduler: Number of instances=%d, total number of slots=%d", - BASE_MESSAGE, numInstances, numSlotsTotal)); + public NoResourceAvailableException(int numInstances, int numSlotsTotal, int availableSlots) { + super(String.format("%s Resources available to scheduler: Number of instances=%d, total number of slots=%d, available slots=%d", + BASE_MESSAGE, numInstances, numSlotsTotal, availableSlots)); } - NoResourceAvailableException(ScheduledUnit task, int numInstances, int numSlotsTotal) { - super(String.format("%s Task to schedule: < %s > in sharing group < %s >. Resources available to scheduler: Number of instances=%d, total number of slots=%d", - BASE_MESSAGE, task.getTaskToExecute(), task.getSlotSharingGroup(), numInstances, numSlotsTotal)); + NoResourceAvailableException(ScheduledUnit task, int numInstances, int numSlotsTotal, int availableSlots) { + super(String.format("%s Task to schedule: < %s > with groupID < %s > in sharing group < %s >. Resources available to scheduler: Number of instances=%d, total number of slots=%d, available slots=%d", + BASE_MESSAGE, task.getTaskToExecute(), + task.getLocationConstraint() == null ? 
task.getTaskToExecute().getVertex().getJobvertexId() : task.getLocationConstraint().getGroupId(), + task.getSlotSharingGroup(), + numInstances, + numSlotsTotal, + availableSlots)); } public NoResourceAvailableException(String message) { http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index 6495aba..c237aa5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -29,11 +29,15 @@ import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import akka.dispatch.Futures; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.AbstractID; +import org.apache.flink.runtime.instance.SharedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceDiedException; import org.apache.flink.runtime.instance.InstanceListener; @@ -124,10 +128,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { // Scheduling // -------------------------------------------------------------------------------------------- - public AllocatedSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException { + public SimpleSlot 
scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException { Object ret = scheduleTask(task, false); - if (ret instanceof AllocatedSlot) { - return (AllocatedSlot) ret; + if (ret instanceof SimpleSlot) { + return (SimpleSlot) ret; } else { throw new RuntimeException(); @@ -136,8 +140,8 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws NoResourceAvailableException { Object ret = scheduleTask(task, true); - if (ret instanceof AllocatedSlot) { - return new SlotAllocationFuture((AllocatedSlot) ret); + if (ret instanceof SimpleSlot) { + return new SlotAllocationFuture((SimpleSlot) ret); } if (ret instanceof SlotAllocationFuture) { return (SlotAllocationFuture) ret; @@ -148,7 +152,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { } /** - * Returns either an {@link AllocatedSlot}, or an {@link SlotAllocationFuture}. + * Returns either an {@link org.apache.flink.runtime.instance.SimpleSlot}, or an {@link SlotAllocationFuture}. 
*/ private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException { if (task == null) { @@ -158,7 +162,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { if (LOG.isDebugEnabled()) { LOG.debug("Scheduling task " + task); } - + final ExecutionVertex vertex = task.getTaskToExecute().getVertex(); synchronized (globalLock) { @@ -176,15 +180,15 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { final CoLocationConstraint constraint = task.getLocationConstraint(); // get a slot from the group, if the group has one for us (and can fulfill the constraint) - SubSlot slotFromGroup; + SimpleSlot slotFromGroup; if (constraint == null) { slotFromGroup = assignment.getSlotForTask(vertex); } else { slotFromGroup = assignment.getSlotForTask(vertex, constraint); } - - AllocatedSlot newSlot = null; + + SimpleSlot newSlot = null; // the following needs to make sure any allocated slot is released in case of an error try { @@ -202,15 +206,15 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { vertex.getPreferredLocations() : Collections.singleton(constraint.getLocation()); // get a new slot, since we could not place it into the group, or we could not place it locally - newSlot = getFreeSlotForTask(vertex, locations); - - SubSlot toUse; + newSlot = getFreeSubSlotForTask(vertex, locations, assignment, constraint); + + SimpleSlot toUse; if (newSlot == null) { if (slotFromGroup == null) { // both null if (constraint == null || constraint.isUnassigned()) { - throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(), getTotalNumberOfSlots()); + throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots()); } else { throw new NoResourceAvailableException("Could not allocate a slot on instance " + constraint.getLocation() + ", as required by the co-location 
constraint."); @@ -226,11 +230,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { slotFromGroup.releaseSlot(); } - if (constraint == null) { - toUse = assignment.addNewSlotWithTask(newSlot, vertex); - } else { - toUse = assignment.addNewSlotWithTask(newSlot, vertex, constraint); - } + toUse = newSlot; } else { // both are available and usable. neither is local @@ -242,7 +242,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { // if it was assigned before and the new one is not local, it is a fail if (constraint != null) { if (constraint.isUnassigned() || toUse.getLocality() == Locality.LOCAL) { - constraint.setSharedSlot(toUse.getSharedSlot()); + constraint.setSharedSlot(toUse.getParent()); } else { // the fail throw new NoResourceAvailableException("Could not allocate a slot on instance " + @@ -270,7 +270,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { // 2) === schedule without hints and sharing === - AllocatedSlot slot = getFreeSlotForTask(vertex, vertex.getPreferredLocations()); + SimpleSlot slot = getFreeSlotForTask(vertex, vertex.getPreferredLocations()); if (slot != null) { updateLocalityCounters(slot.getLocality()); return slot; @@ -283,7 +283,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { return future; } else { - throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots()); + throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots()); } } } @@ -297,69 +297,96 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { * @param vertex The task to run. * @return The instance to run the vertex on, it {@code null}, if no instance is available. 
*/ - protected AllocatedSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations) { + protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations) { // we need potentially to loop multiple times, because there may be false positives // in the set-with-available-instances while (true) { - if (this.instancesWithAvailableResources.isEmpty()) { - // check if the asynchronous calls did not yet return the queues - Instance queuedInstance = this.newlyAvailableInstances.poll(); - if (queuedInstance == null) { - return null; - } else { - this.instancesWithAvailableResources.add(queuedInstance); + Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations); + + if(instanceLocalityPair == null){ + return null; + } + + Instance instanceToUse = instanceLocalityPair.getLeft(); + Locality locality = instanceLocalityPair.getRight(); + + if(LOG.isDebugEnabled()){ + if(locality == Locality.LOCAL){ + LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); + }else if(locality == Locality.NON_LOCAL){ + LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); + }else if(locality == Locality.UNCONSTRAINED) { + LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); } } - - Iterator<Instance> locations = requestedLocations == null ? 
null : requestedLocations.iterator(); - - Instance instanceToUse = null; - Locality locality = Locality.UNCONSTRAINED; - - if (locations != null && locations.hasNext()) { - // we have a locality preference + + try { + SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId(), vertex.getJobvertexId()); - while (locations.hasNext()) { - Instance location = locations.next(); - - if (location != null && this.instancesWithAvailableResources.remove(location)) { - instanceToUse = location; - locality = Locality.LOCAL; - - if (LOG.isDebugEnabled()) { - LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + location); - } - - break; - } + // if the instance has further available slots, re-add it to the set of available resources. + if (instanceToUse.hasResourcesAvailable()) { + this.instancesWithAvailableResources.add(instanceToUse); } - if (instanceToUse == null) { - instanceToUse = this.instancesWithAvailableResources.poll(); - locality = Locality.NON_LOCAL; - if (LOG.isDebugEnabled()) { - LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); - } + if (slot != null) { + slot.setLocality(locality); + return slot; } } - else { - instanceToUse = this.instancesWithAvailableResources.poll(); - if (LOG.isDebugEnabled()) { + catch (InstanceDiedException e) { + // the instance died it has not yet been propagated to this scheduler + // remove the instance from the set of available instances + this.allInstances.remove(instanceToUse); + this.instancesWithAvailableResources.remove(instanceToUse); + } + + // if we failed to get a slot, fall through the loop + } + } + + protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex vertex, + Iterable<Instance> requestedLocations, + SlotSharingGroupAssignment groupAssignment, + CoLocationConstraint constraint) { + // we need potentially to loop multiple times, because there may be false positives + // in the set-with-available-instances + while (true) { + Pair<Instance, Locality> instanceLocalityPair = 
findInstance(requestedLocations); + + if(instanceLocalityPair == null){ + return null; + } + + Instance instanceToUse = instanceLocalityPair.getLeft(); + Locality locality = instanceLocalityPair.getRight(); + + if(LOG.isDebugEnabled()){ + if(locality == Locality.LOCAL){ + LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); + }else if(locality == Locality.NON_LOCAL){ + LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); + }else if(locality == Locality.UNCONSTRAINED) { LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); } } - + try { - AllocatedSlot slot = instanceToUse.allocateSlot(vertex.getJobId()); - + AbstractID groupID = constraint == null ? vertex.getJobvertexId() : constraint.getGroupId(); + + // root SharedSlot + SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment, groupID); + + // If constraint != null, then slot nested in a SharedSlot nested in sharedSlot + // If constraint == null, then slot nested in sharedSlot + SimpleSlot slot = groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, groupID, constraint); + // if the instance has further available slots, re-add it to the set of available resources. if (instanceToUse.hasResourcesAvailable()) { this.instancesWithAvailableResources.add(instanceToUse); } - + if (slot != null) { - slot.setLocality(locality); return slot; } } @@ -369,10 +396,61 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { this.allInstances.remove(instanceToUse); this.instancesWithAvailableResources.remove(instanceToUse); } - + // if we failed to get a slot, fall through the loop } } + + /** + * NOTE: This method is not thread-safe, it needs to be synchronized by the caller. + * + * Tries to find a requested instance. If no such instance is available it will return a non- + * local instance. 
If no such instance exists (all slots occupied), then return null. + * + * @param requestedLocations + * @return + */ + private Pair findInstance(Iterable requestedLocations){ + if (this.instancesWithAvailableResources.isEmpty()) { + // check if the asynchronous calls did not yet return the queues + Instance queuedInstance = this.newlyAvailableInstances.poll(); + if (queuedInstance == null) { + return null; + } else { + this.instancesWithAvailableResources.add(queuedInstance); + } + } + + Iterator locations = requestedLocations == null ? null : requestedLocations.iterator(); + + Instance instanceToUse = null; + Locality locality = Locality.UNCONSTRAINED; + + if (locations != null && locations.hasNext()) { + // we have a locality preference + + while (locations.hasNext()) { + Instance location = locations.next(); + + if (location != null && this.instancesWithAvailableResources.remove(location)) { + instanceToUse = location; + locality = Locality.LOCAL; + + break; + } + } + + if (instanceToUse == null) { + instanceToUse = this.instancesWithAvailableResources.poll(); + locality = Locality.NON_LOCAL; + } + } + else { + instanceToUse = this.instancesWithAvailableResources.poll(); + } + + return new ImmutablePair(instanceToUse, locality); + } @Override public void newSlotAvailable(final Instance instance) { @@ -386,7 +464,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { // (2) scheduler (to check whether to take a new task item // // that leads with a high probability to deadlocks, when scheduling fast - + this.newlyAvailableInstances.add(instance); Futures.future(new Callable() { @@ -416,7 +494,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { ExecutionVertex vertex = task.getTaskToExecute().getVertex(); try { - AllocatedSlot newSlot = instance.allocateSlot(vertex.getJobId()); + SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId(), vertex.getJobvertexId()); if (newSlot != null) { // 
success, remove from the task queue and notify the future @@ -524,7 +602,16 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { // -------------------------------------------------------------------------------------------- public int getNumberOfAvailableInstances() { - return allInstances.size(); + int numberAvailableInstances = 0; + synchronized (this.globalLock) { + for(Instance instance: allInstances){ + if(instance.isAlive()){ + numberAvailableInstances++; + } + } + } + + return numberAvailableInstances; } public int getNumberOfInstancesWithAvailableSlots() { http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java deleted file mode 100644 index 1ce8465..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.scheduler; - -import java.io.Serializable; -import java.util.HashSet; -import java.util.Set; - -import org.apache.flink.runtime.instance.AllocatedSlot; -import org.apache.flink.runtime.jobgraph.JobVertexID; - -/** - * - * NOTE: This class does no synchronization by itself and its mutating - * methods may only be called from within the synchronization scope of - * it associated SlotSharingGroupAssignment. - */ -class SharedSlot implements Serializable { - - static final long serialVersionUID = 42L; - - private final AllocatedSlot allocatedSlot; - - private final SlotSharingGroupAssignment assignmentGroup; - - private final Set subSlots; - - private int subSlotNumber; - - private volatile boolean disposed; - - // -------------------------------------------------------------------------------------------- - - public SharedSlot(AllocatedSlot allocatedSlot, SlotSharingGroupAssignment assignmentGroup) { - if (allocatedSlot == null || assignmentGroup == null) { - throw new NullPointerException(); - } - - this.allocatedSlot = allocatedSlot; - this.assignmentGroup = assignmentGroup; - this.subSlots = new HashSet(); - } - - // -------------------------------------------------------------------------------------------- - - AllocatedSlot getAllocatedSlot() { - return this.allocatedSlot; - } - - boolean isDisposed() { - return disposed; - } - - int getNumberOfAllocatedSubSlots() { - return this.subSlots.size(); - } - - SubSlot allocateSubSlot(JobVertexID jid) { - if (disposed) { - return null; - } else { - SubSlot ss = new SubSlot(this, subSlotNumber++, jid); - this.subSlots.add(ss); - return ss; - } - } - - void returnAllocatedSlot(SubSlot slot) { - if (!slot.isReleased()) { - throw new IllegalArgumentException("SubSlot is not released."); - } - - this.assignmentGroup.releaseSubSlot(slot, this); - } - - int 
releaseSlot(SubSlot slot) { - if (!this.subSlots.remove(slot)) { - throw new IllegalArgumentException("Wrong shared slot for subslot."); - } - return subSlots.size(); - } - - void dispose() { - if (subSlots.isEmpty()) { - disposed = true; - this.allocatedSlot.releaseSlot(); - } else { - throw new IllegalStateException("Cannot dispose while subslots are still alive."); - } - } - - // -------------------------------------------------------------------------------------------- - - @Override - public String toString() { - return "Shared " + allocatedSlot.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java index eb5f9fb..31bd341 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java @@ -18,13 +18,13 @@ package org.apache.flink.runtime.jobmanager.scheduler; -import org.apache.flink.runtime.instance.AllocatedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; public class SlotAllocationFuture { private final Object monitor = new Object(); - private volatile AllocatedSlot slot; + private volatile SimpleSlot slot; private volatile SlotAllocationFutureAction action; @@ -32,17 +32,17 @@ public class SlotAllocationFuture { public SlotAllocationFuture() {} - public SlotAllocationFuture(AllocatedSlot slot) { + public SlotAllocationFuture(SimpleSlot slot) { this.slot = slot; } // -------------------------------------------------------------------------------------------- - public AllocatedSlot 
waitTillAllocated() throws InterruptedException { + public SimpleSlot waitTillAllocated() throws InterruptedException { return waitTillAllocated(0); } - public AllocatedSlot waitTillAllocated(long timeout) throws InterruptedException { + public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException { synchronized (monitor) { while (slot == null) { monitor.wait(timeout); @@ -66,7 +66,7 @@ public class SlotAllocationFuture { } } - public void setSlot(AllocatedSlot slot) { + public void setSlot(SimpleSlot slot) { if (slot == null) { throw new NullPointerException(); } http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java index 11137fd..f9d032f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureAction.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.jobmanager.scheduler; -import org.apache.flink.runtime.instance.AllocatedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; /** * An action that is invoked once a {@link SlotAllocationFuture} is triggered. @@ -30,5 +30,5 @@ public interface SlotAllocationFutureAction { * * @param slot The slot that has been allocated. 
*/ - void slotAllocated(AllocatedSlot slot); + void slotAllocated(SimpleSlot slot); } http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java index 639d2b7..f75f294 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager.scheduler; import org.apache.flink.runtime.instance.Instance; /** - * A SlotAvailabilityListener can be notified when new {@link org.apache.flink.runtime.instance.AllocatedSlot}s become available + * A SlotAvailabilityListener can be notified when new {@link org.apache.flink.runtime.instance.AllocatedSlot2}s become available * on an {@link org.apache.flink.runtime.instance.Instance}. 
*/ public interface SlotAvailabilityListener { http://git-wip-us.apache.org/repos/asf/flink/blob/db1b8b99/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java index 7a0546f..70d4510 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java @@ -33,8 +33,10 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.flink.runtime.AbstractID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SharedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.Slot; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.slf4j.Logger; @@ -42,86 +44,82 @@ import org.slf4j.Logger; public class SlotSharingGroupAssignment implements Serializable { static final long serialVersionUID = 42L; - + private static final Logger LOG = Scheduler.LOG; - + private transient final Object lock = new Object(); - + /** All slots currently allocated to this sharing group */ private final Set allSlots = new LinkedHashSet(); - + /** The slots available per vertex type (jid), keyed by instance, to make them locatable */ private final Map>> availableSlotsPerJid = new LinkedHashMap>>(); - - + // -------------------------------------------------------------------------------------------- - - - public SubSlot 
addNewSlotWithTask(AllocatedSlot slot, ExecutionVertex vertex) { - JobVertexID id = vertex.getJobvertexId(); - return addNewSlotWithTask(slot, id, id); - } - - public SubSlot addNewSlotWithTask(AllocatedSlot slot, ExecutionVertex vertex, CoLocationConstraint constraint) { - AbstractID groupId = constraint.getGroupId(); - return addNewSlotWithTask(slot, groupId, null); - } - - private SubSlot addNewSlotWithTask(AllocatedSlot slot, AbstractID groupId, JobVertexID vertexId) { - - final SharedSlot sharedSlot = new SharedSlot(slot, this); - final Instance location = slot.getInstance(); - + + public SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot sharedSlot, Locality locality, + AbstractID groupId, CoLocationConstraint constraint) { + + final Instance location = sharedSlot.getInstance(); + synchronized (lock) { // add to the total bookkeeping allSlots.add(sharedSlot); - - // allocate us a sub slot to return - SubSlot subslot = sharedSlot.allocateSubSlot(vertexId); - + + SimpleSlot subSlot = null; + + if(constraint == null){ + // allocate us a sub slot to return + subSlot = sharedSlot.allocateSubSlot(groupId); + } else { + // we need a colocation slot --> a SimpleSlot nested in a SharedSlot to host other colocated tasks + SharedSlot constraintGroupSlot = sharedSlot.allocateSharedSlot(groupId); + subSlot = constraintGroupSlot.allocateSubSlot(null); + } + // preserve the locality information - subslot.setLocality(slot.getLocality()); - + subSlot.setLocality(locality); + boolean entryForNewJidExists = false; - + // let the other vertex types know about this one as well for (Map.Entry>> entry : availableSlotsPerJid.entrySet()) { - + if (entry.getKey().equals(groupId)) { entryForNewJidExists = true; continue; } - + Map> available = entry.getValue(); putIntoMultiMap(available, location, sharedSlot); } - + // make sure an empty entry exists for this group, if no other entry exists if (!entryForNewJidExists) { availableSlotsPerJid.put(groupId, new LinkedHashMap>()); } - - 
return subslot; + + return subSlot; } } - + /** * Gets a slot suitable for the given task vertex. This method will prefer slots that are local * (with respect to {@link ExecutionVertex#getPreferredLocations()}), but will return non local * slots if no local slot is available. The method returns null, when no slot is available for the * given JobVertexID at all. - * + * * @param vertex - * + * * @return A task vertex for a task with the given JobVertexID, or null, if none is available. */ - public SubSlot getSlotForTask(ExecutionVertex vertex) { + public SimpleSlot getSlotForTask(ExecutionVertex vertex) { synchronized (lock) { Pair p = getSlotForTaskInternal(vertex.getJobvertexId(), vertex, vertex.getPreferredLocations(), false); - + if (p != null) { SharedSlot ss = p.getLeft(); - SubSlot slot = ss.allocateSubSlot(vertex.getJobvertexId()); + SimpleSlot slot = ss.allocateSubSlot(vertex.getJobvertexId()); slot.setLocality(p.getRight()); return slot; } @@ -129,17 +127,17 @@ public class SlotSharingGroupAssignment implements Serializable { return null; } } - + } - - public SubSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) { - + + public SimpleSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) { + synchronized (lock) { SharedSlot shared = constraint.getSharedSlot(); - - if (shared != null && !shared.isDisposed()) { + + if (shared != null && !shared.isDead()) { // initialized and set - SubSlot subslot = shared.allocateSubSlot(null); + SimpleSlot subslot = shared.allocateSubSlot(null); subslot.setLocality(Locality.LOCAL); return subslot; } @@ -147,85 +145,92 @@ public class SlotSharingGroupAssignment implements Serializable { // not initialized, grab a new slot. 
preferred locations are defined by the vertex // we only associate the slot with the constraint, if it was a local match Pair p = getSlotForTaskInternal(constraint.getGroupId(), vertex, vertex.getPreferredLocations(), false); + if (p == null) { return null; } else { shared = p.getLeft(); Locality l = p.getRight(); - - SubSlot sub = shared.allocateSubSlot(null); + + // we need a colocation slot --> SimpleSlot nested in a SharedSlot to host other colocated tasks + SharedSlot constraintGroupSlot = shared.allocateSharedSlot(constraint.getGroupId()); + // Depth=3 => groupID==null + SimpleSlot sub = constraintGroupSlot.allocateSubSlot(null); sub.setLocality(l); - + if (l != Locality.NON_LOCAL) { - constraint.setSharedSlot(shared); + constraint.setSharedSlot(constraintGroupSlot); } return sub; } } else { // disposed. get a new slot on the same instance - Instance location = shared.getAllocatedSlot().getInstance(); + Instance location = shared.getInstance(); Pair p = getSlotForTaskInternal(constraint.getGroupId(), vertex, Collections.singleton(location), true); + if (p == null) { return null; } else { shared = p.getLeft(); - constraint.setSharedSlot(shared); - SubSlot subslot = shared.allocateSubSlot(null); - subslot.setLocality(Locality.LOCAL); - return subslot; + // we need colocation slot --> SimpleSlot nested in a SharedSlot to host other colocated tasks + SharedSlot constraintGroupSlot = shared.allocateSharedSlot(constraint.getGroupId()); + constraint.setSharedSlot(constraintGroupSlot); + SimpleSlot subSlot = constraintGroupSlot.allocateSubSlot(null); + subSlot.setLocality(Locality.LOCAL); + return subSlot; } } } } - + /** * NOTE: This method is not synchronized by itself, needs to be synchronized externally. - * + * * @return An allocated sub slot, or {@code null}, if no slot is available. 
*/ private Pair getSlotForTaskInternal(AbstractID groupId, ExecutionVertex vertex, Iterable preferredLocations, boolean localOnly) { + Map> slotsForGroup = availableSlotsPerJid.get(groupId); + if (allSlots.isEmpty()) { return null; } - - Map> slotsForGroup = availableSlotsPerJid.get(groupId); - + // get the available slots for the group if (slotsForGroup == null) { // no task is yet scheduled for that group, so all slots are available slotsForGroup = new LinkedHashMap>(); availableSlotsPerJid.put(groupId, slotsForGroup); - + for (SharedSlot availableSlot : allSlots) { - putIntoMultiMap(slotsForGroup, availableSlot.getAllocatedSlot().getInstance(), availableSlot); + putIntoMultiMap(slotsForGroup, availableSlot.getInstance(), availableSlot); } } else if (slotsForGroup.isEmpty()) { return null; } - + // check whether we can schedule the task to a preferred location boolean didNotGetPreferred = false; - + if (preferredLocations != null) { for (Instance location : preferredLocations) { - + // set the flag that we failed a preferred location. If one will be found, // we return early anyways and skip the flag evaluation didNotGetPreferred = true; - + SharedSlot slot = removeFromMultiMap(slotsForGroup, location); - if (slot != null && !slot.isDisposed()) { + if (slot != null && !slot.isDead()) { if (LOG.isDebugEnabled()) { LOG.debug("Local assignment in shared group : " + vertex + " --> " + slot); } - + return new ImmutablePair(slot, Locality.LOCAL); } } } - + // if we want only local assignments, exit now with a "not found" result if (didNotGetPreferred && localOnly) { if (LOG.isDebugEnabled()) { @@ -233,84 +238,153 @@ public class SlotSharingGroupAssignment implements Serializable { } return null; } - + // schedule the task to any available location SharedSlot slot = pollFromMultiMap(slotsForGroup); - if (slot != null && !slot.isDisposed()) { + if (slot != null && !slot.isDead()) { if (LOG.isDebugEnabled()) { LOG.debug((didNotGetPreferred ? 
"Non-local" : "Unconstrained") + " assignment in shared group : " + vertex + " --> " + slot); } - + return new ImmutablePair(slot, didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED); } else { return null; } } - - - void releaseSubSlot(SubSlot subslot, SharedSlot sharedSlot) { - - AbstractID groupId = subslot.getGroupId(); - + + /** + * Removes the shared slot from the assignment group. + * + * @param sharedSlot + */ + private void removeSharedSlot(SharedSlot sharedSlot){ + if (!allSlots.contains(sharedSlot)) { + throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before."); + } + + allSlots.remove(sharedSlot); + + Instance location = sharedSlot.getInstance(); + + for(Map.Entry>> mapEntry: availableSlotsPerJid.entrySet()){ + Map> map = mapEntry.getValue(); + + List list = map.get(location); + + if(list == null || !list.remove(sharedSlot)){ + throw new IllegalStateException("Bug: SharedSlot was not available to another vertex type that it was not allocated for before."); + } + + if(list.isEmpty()){ + map.remove(location); + } + } + + sharedSlot.markCancelled(); + + returnAllocatedSlot(sharedSlot); + } + + /** + * Releases the shared slot from the assignment group. + * @param sharedSlot The SharedSlot to be released + */ + public void releaseSharedSlot(SharedSlot sharedSlot){ synchronized (lock) { + Set subSlots = sharedSlot.getSubSlots(); - if (!allSlots.contains(sharedSlot)) { - throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before."); + for(Slot subSlot: subSlots) { + + subSlot.markDisposed(); + + if(subSlot instanceof SharedSlot){ + releaseSharedSlot((SharedSlot) subSlot); + }else if(subSlot instanceof SimpleSlot){ + releaseSimpleSlot((SimpleSlot) subSlot); + } } - - int slotsRemaining = sharedSlot.releaseSlot(subslot); - - if (slotsRemaining == 0) { - // this was the last sub slot. 
remove this from the availability list - // and trigger disposal - try { - allSlots.remove(sharedSlot); - - Instance location = sharedSlot.getAllocatedSlot().getInstance(); - - if (groupId != null) { - for (Map.Entry>> mapEntry : availableSlotsPerJid.entrySet()) { - AbstractID id = mapEntry.getKey(); - - // hack: we identify co location hint entries by the fact that they are keyed - // by an abstract id, rather than a job vertex id - if (id.getClass() == AbstractID.class || id.equals(groupId)) { - continue; - } - - Map> map = mapEntry.getValue(); - List list = map.get(location); - if (list == null || !list.remove(sharedSlot)) { - throw new IllegalStateException("Bug: SharedSlot was not available to another vertex type that it was not allocated for before."); - } - if (list.isEmpty()) { - map.remove(location); - } - } + + subSlots.clear(); + + returnSlot(sharedSlot); + } + } + + /** + * Releases the simple slot from the assignment group. + * @param simpleSlot The SimpleSlot to be released + */ + public void releaseSimpleSlot(SimpleSlot simpleSlot){ + synchronized (lock) { + simpleSlot.cancel(); + + returnSlot(simpleSlot); + } + + } + + /** + * Removes the given slot from the assignment group. If the slot is a root object, then it has + * to be a SharedSlot and it is removed from the availableSlotsPerJid field and the slot is + * returned to the instance. If the slot is a sub slot of the root slot, then this sub slot + * is marked available again for tasks of the same group. Otherwise, the slot is simply removed + * from its parent if it is not already marked as disposed. If a slot is already marked to be + * disposed, then the releasing was called from a parent slot which will take care of the + * disposal. + * + * IMPORTANT: The method is not synchronized. The caller is responsible for that. + * + * @param slot The slot to be returned. 
+ */ + private void returnSlot(Slot slot){ + // each slot can only be returned once, if a slot is returned then it should no longer be used --> markDead + if(slot.markDead()) { + // slot is a root slot + if(slot.getParent() == null){ + // only SharedSlots are allowed to be root slots in a SlotSharingGroupAssignment + if(slot instanceof SharedSlot){ + removeSharedSlot((SharedSlot) slot); + } else { + throw new IllegalStateException("Simple slot cannot be returned from SlotSharingGroupAssignment."); + } + } else { + AbstractID groupID = slot.getGroupID(); + SharedSlot parent = slot.getParent(); + + // Only colocation constraint slots (SimpleSlot nested in a SharedSlot nested in a SharedSlot) have a groupID==null + // One can also say, all nested slots deeper than 2 have a groupID==null + if(groupID != null){ + if (!allSlots.contains(parent)) { + throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before."); + } + + // make the shared slot available to tasks within the group it available to + Map> slotsForJid = availableSlotsPerJid.get(groupID); + + // sanity check + if (slotsForJid == null) { + throw new IllegalStateException("Trying to return a slot for group " + groupID + + " when available slots indicated that all slots were available."); } - } finally { - sharedSlot.dispose(); + + putIntoMultiMap(slotsForJid, parent.getInstance(), parent); } - } - else if (groupId != null) { - // make the shared slot available to tasks within the group it available to - Map> slotsForJid = availableSlotsPerJid.get(groupId); - - // sanity check - if (slotsForJid == null) { - throw new IllegalStateException("Trying to return a slot for group " + groupId + - " when available slots indicated that all slots were available."); + + // if no one else takes care of disposal, then remove the slot from the parent + if(slot.markDisposed()) { + if (slot.getParent().freeSubSlot(slot) == 0) { + releaseSharedSlot(slot.getParent()); + } } - - 
putIntoMultiMap(slotsForJid, sharedSlot.getAllocatedSlot().getInstance(), sharedSlot); } } } - - - - + + private void returnAllocatedSlot(SharedSlot slot){ + slot.getInstance().returnAllocatedSlot(slot); + } + // -------------------------------------------------------------------------------------------- // State // --------------------------------------------------------------------------------------------