flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject flink git commit: [FLINK-2183][runtime] fix deadlock for concurrent slot release
Date Thu, 11 Jun 2015 14:56:47 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.9 87988ae3b -> 77def9f51


[FLINK-2183][runtime] fix deadlock for concurrent slot release


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77def9f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77def9f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77def9f5

Branch: refs/heads/release-0.9
Commit: 77def9f518640ea10e0b4070541bea2b1f22ba5b
Parents: 87988ae
Author: Maximilian Michels <mxm@apache.org>
Authored: Thu Jun 11 13:27:23 2015 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Thu Jun 11 16:56:26 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/instance/SimpleSlot.java      | 15 ++--
 .../instance/SlotSharingGroupAssignment.java    | 76 ++++++++++----------
 .../TaskManagerFailsWithSlotSharingITCase.scala |  2 +-
 3 files changed, 48 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/77def9f5/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index 9bc977d..dbe961a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -142,11 +142,9 @@ public class SimpleSlot extends Slot {
 
 	@Override
 	public void releaseSlot() {
-		
-		// try to transition to the CANCELED state. That state marks
-		// that the releasing is in progress
-		if (markCancelled()) {
-			
+
+		if (!isCanceled()) {
+
 			// kill all tasks currently running in this slot
 			Execution exec = this.executedTask;
 			if (exec != null && !exec.isFinished()) {
@@ -159,9 +157,10 @@ public class SimpleSlot extends Slot {
 			// otherwise release through the parent shared slot
 			if (getParent() == null) {
 				// we have to give back the slot to the owning instance
-				getInstance().returnAllocatedSlot(this);
-			}
-			else {
+				if (markCancelled()) {
+					getInstance().returnAllocatedSlot(this);
+				}
+			} else {
 				// we have to ask our parent to dispose us
 				getParent().releaseChild(this);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/77def9f5/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index f2b7dba..801e9ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -474,48 +474,52 @@ public class SlotSharingGroupAssignment {
 	 */
 	void releaseSimpleSlot(SimpleSlot simpleSlot) {
 		synchronized (lock) {
-			// sanity checks
-			if (simpleSlot.isAlive()) {
-				throw new IllegalStateException("slot is still alive");
-			}
-			
-			// check whether the slot is already released
-			if (simpleSlot.markReleased()) {
-				
-				AbstractID groupID = simpleSlot.getGroupID();
-				SharedSlot parent = simpleSlot.getParent();
+			// try to transition to the CANCELED state. That state marks
+			// that the releasing is in progress
+			if (simpleSlot.markCancelled()) {
 
-				// if we have a group ID, then our parent slot is tracked here
-				if (groupID != null && !allSlots.contains(parent)) {
-					throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup
before.");
+				// sanity checks
+				if (simpleSlot.isAlive()) {
+					throw new IllegalStateException("slot is still alive");
 				}
 
-				int parentRemaining = parent.removeDisposedChildSlot(simpleSlot);
-				
-				if (parentRemaining > 0) {
-					// the parent shared slot is still alive. make sure we make it
-					// available again to the group of the just released slot
-					
-					if (groupID != null) {
-						// if we have a group ID, then our parent becomes available
-						// for that group again. otherwise, the slot is part of a
-						// co-location group and nothing becomes immediately available
-						
-						Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID);
+				// check whether the slot is already released
+				if (simpleSlot.markReleased()) {
 
-						// sanity check
-						if (slotsForJid == null) {
-							throw new IllegalStateException("Trying to return a slot for group " + groupID +
-									" when available slots indicated that all slots were available.");
-						}
+					AbstractID groupID = simpleSlot.getGroupID();
+					SharedSlot parent = simpleSlot.getParent();
 
-						putIntoMultiMap(slotsForJid, parent.getInstance(), parent);
+					// if we have a group ID, then our parent slot is tracked here
+					if (groupID != null && !allSlots.contains(parent)) {
+						throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup
before.");
+					}
+
+					int parentRemaining = parent.removeDisposedChildSlot(simpleSlot);
+
+					if (parentRemaining > 0) {
+						// the parent shared slot is still alive. make sure we make it
+						// available again to the group of the just released slot
+
+						if (groupID != null) {
+							// if we have a group ID, then our parent becomes available
+							// for that group again. otherwise, the slot is part of a
+							// co-location group and nothing becomes immediately available
+
+							Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID);
+
+							// sanity check
+							if (slotsForJid == null) {
+								throw new IllegalStateException("Trying to return a slot for group " + groupID +
+										" when available slots indicated that all slots were available.");
+							}
+
+							putIntoMultiMap(slotsForJid, parent.getInstance(), parent);
+						}
+					} else {
+						// the parent shared slot is now empty and can be released
+						parent.markCancelled();
+						internalDisposeEmptySharedSlot(parent);
 					}
-				}
-				else {
-					// the parent shared slot is now empty and can be released
-					parent.markCancelled();
-					internalDisposeEmptySharedSlot(parent);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/77def9f5/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index 39543f7..e98fd98 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -44,7 +44,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
 
   "The JobManager" should {
     "handle gracefully failing task manager with slot sharing" in {
-      val num_tasks = 20
+      val num_tasks = 100
 
       val sender = new AbstractJobVertex("Sender")
       val receiver = new AbstractJobVertex("Receiver")


Mime
View raw message