flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-2040] [runtime] Tolerate out-of-order CANCELING and CANCELED messages.
Date Tue, 19 May 2015 09:19:44 GMT
Repository: flink
Updated Branches:
  refs/heads/master ea23f28c4 -> 8aad7affb


[FLINK-2040] [runtime] Tolerate out-of-order CANCELING and CANCELED messages.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8aad7aff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8aad7aff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8aad7aff

Branch: refs/heads/master
Commit: 8aad7affb9c4de0ba6b23c9bac721eaf855be15c
Parents: ea23f28
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue May 19 10:33:19 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue May 19 10:33:19 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/taskmanager/Task.java  | 22 ++++-----
 .../flink/runtime/taskmanager/TaskTest.java     | 49 +++++++++++++++++---
 2 files changed, 54 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8aad7aff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 8630edf..f3c40ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -745,9 +745,7 @@ public class Task implements Runnable {
 	 */
 	public void cancelExecution() {
 		LOG.info("Attempting to cancel task " + taskNameWithSubtask);
-		if (cancelOrFailAndCancelInvokable(ExecutionState.CANCELING)) {
-			notifyObservers(ExecutionState.CANCELING, null);
-		}
+		cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null);
 	}
 
 	/**
@@ -761,27 +759,27 @@ public class Task implements Runnable {
 	 */
 	public void failExternally(Throwable cause) {
 		LOG.info("Attempting to fail task externally " + taskNameWithSubtask);
-		if (cancelOrFailAndCancelInvokable(ExecutionState.FAILED)) {
-			failureCause = cause;
-			notifyObservers(ExecutionState.FAILED, cause);
-		}
+		cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
 	}
 
-	private boolean cancelOrFailAndCancelInvokable(ExecutionState targetState) {
+	private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
 		while (true) {
 			ExecutionState current = this.executionState;
 
 			// if the task is already canceled (or canceling) or finished or failed,
 			// then we need not do anything
 			if (current.isTerminal() || current == ExecutionState.CANCELING) {
-				return false;
+				LOG.info("Task " + taskNameWithSubtask + " is already in state " + current);
+				return;
 			}
 
 			if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
 				if (STATE_UPDATER.compareAndSet(this, current, targetState)) {
 					// if we manage this state transition, then the invokable gets never called
 					// we need not call cancel on it
-					return true;
+					this.failureCause = cause;
+					notifyObservers(targetState, cause);
+					return;
 				}
 			}
 			else if (current == ExecutionState.RUNNING) {
@@ -789,6 +787,8 @@ public class Task implements Runnable {
 					// we are canceling / failing out of the running state
 					// we need to cancel the invokable
 					if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
+						this.failureCause = cause;
+						notifyObservers(targetState, cause);
 						LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
 
 						// because the canceling may block on user code, we cancel from a separate thread
@@ -799,7 +799,7 @@ public class Task implements Runnable {
 								"Canceler for " + taskNameWithSubtask);
 						cancelThread.start();
 					}
-					return true;
+					return;
 				}
 			}
 			else {

http://git-wip-us.apache.org/repos/asf/flink/blob/8aad7aff/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 4492372..4713bae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -365,8 +365,7 @@ public class TaskTest {
 			assertNull(task.getFailureCause());
 			
 			validateUnregisterTask(task.getExecutionId());
-			validateListenerMessage(ExecutionState.CANCELING, task, false);
-			validateListenerMessage(ExecutionState.CANCELED, task, false);
+			validateCancelingAndCanceledListenerMessage(task);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -431,8 +430,7 @@ public class TaskTest {
 			validateUnregisterTask(task.getExecutionId());
 			
 			validateListenerMessage(ExecutionState.RUNNING, task, false);
-			validateListenerMessage(ExecutionState.CANCELING, task, false);
-			validateListenerMessage(ExecutionState.CANCELED, task, false);
+			validateCancelingAndCanceledListenerMessage(task);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -553,8 +551,7 @@ public class TaskTest {
 			validateUnregisterTask(task.getExecutionId());
 
 			validateListenerMessage(ExecutionState.RUNNING, task, false);
-			validateListenerMessage(ExecutionState.CANCELING, task, false);
-			validateListenerMessage(ExecutionState.CANCELED, task, false);
+			validateCancelingAndCanceledListenerMessage(task);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -721,6 +718,46 @@ public class TaskTest {
 			fail("interrupted");
 		}
 	}
+
+	private void validateCancelingAndCanceledListenerMessage(Task task) {
+		try {
+			// we may have to wait for a bit to give the actors time to receive the message
+			// and put it into the queue
+			TaskMessages.UpdateTaskExecutionState message1 =
+					(TaskMessages.UpdateTaskExecutionState) listenerMessages.poll(10, TimeUnit.SECONDS);
+			TaskMessages.UpdateTaskExecutionState message2 =
+					(TaskMessages.UpdateTaskExecutionState) listenerMessages.poll(10, TimeUnit.SECONDS);
+			
+			
+			assertNotNull("There is no additional listener message", message1);
+			assertNotNull("There is no additional listener message", message2);
+
+			TaskExecutionState taskState1 =  message1.taskExecutionState();
+			TaskExecutionState taskState2 =  message2.taskExecutionState();
+
+			assertEquals(task.getJobID(), taskState1.getJobID());
+			assertEquals(task.getJobID(), taskState2.getJobID());
+			assertEquals(task.getExecutionId(), taskState1.getID());
+			assertEquals(task.getExecutionId(), taskState2.getID());
+			
+			ExecutionState state1 = taskState1.getExecutionState();
+			ExecutionState state2 = taskState2.getExecutionState();
+			
+			// it may be (very rarely) that the following race happens:
+			//  - OUTSIDE THREAD: call to cancel()
+			//  - OUTSIDE THREAD: atomic state change from running to canceling
+			//  - TASK THREAD: finishes, atomic change from canceling to canceled
+			//  - TASK THREAD: send notification that state is canceled
+			//  - OUTSIDE THREAD: send notification that state is canceling
+			
+			// for that reason, we allow the notification messages in any order.
+			assertTrue( (state1 == ExecutionState.CANCELING && state2 == ExecutionState.CANCELED) ||
+						(state2 == ExecutionState.CANCELING && state1 == ExecutionState.CANCELED));
+		}
+		catch (InterruptedException e) {
+			fail("interrupted");
+		}
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//  Mock invokable code


Mime
View raw message