flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [hotfix] Fix interaction of Async calls/checkpointing/canceling
Date Wed, 27 Jan 2016 13:40:39 GMT
Repository: flink
Updated Branches:
  refs/heads/master ace25c88a -> fb25cd8bf


[hotfix] Fix interaction of Async calls/checkpointing/canceling

Previously, a Task could be canceled while it was taking a snapshot.
Some state backends would silently swallow the resulting exceptions,
and the Task would then hang until the cleanup logic reached it.

Now, StreamTask throws a CancelTaskException after taking the snapshots
if isRunning is false.

This also moves the logic that swallows exceptions when a task is no
longer running from StreamTask into the asynchronous caller in Task.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb25cd8b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb25cd8b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb25cd8b

Branch: refs/heads/master
Commit: fb25cd8bf192128439366798bbd55298f5c83b68
Parents: ace25c8
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Jan 20 18:09:12 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Jan 27 14:40:20 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/taskmanager/Task.java  |  14 ++-
 .../streaming/runtime/tasks/StreamTask.java     | 113 +++++++++----------
 2 files changed, 67 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb25cd8b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 9cc1be4..ed322ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -882,7 +882,11 @@ public class Task implements Runnable {
 							}
 						}
 						catch (Throwable t) {
-							failExternally(new RuntimeException("Error while triggering checkpoint for " + taskName,
t));
+							if (getExecutionState() == ExecutionState.RUNNING) {
+								failExternally(new RuntimeException(
+									"Error while triggering checkpoint for " + taskName,
+									t));
+							}
 						}
 					}
 				};
@@ -915,8 +919,12 @@ public class Task implements Runnable {
 							statefulTask.notifyCheckpointComplete(checkpointID);
 						}
 						catch (Throwable t) {
-							// fail task if checkpoint confirmation failed.
-							failExternally(new RuntimeException("Error while confirming checkpoint", t));
+							if (getExecutionState() == ExecutionState.RUNNING) {
+								// fail task if checkpoint confirmation failed.
+								failExternally(new RuntimeException(
+									"Error while confirming checkpoint",
+									t));
+							}
 						}
 					}
 				};

http://git-wip-us.apache.org/repos/asf/flink/blob/fb25cd8b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 72f74ad..65cfebb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
@@ -457,74 +458,72 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
 				
 				// now draw the state snapshot
-				try {
-					final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
-					final StreamTaskState[] states = new StreamTaskState[allOperators.length];
+				final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
+				final StreamTaskState[] states = new StreamTaskState[allOperators.length];
 
-					boolean hasAsyncStates = false;
+				boolean hasAsyncStates = false;
 
-					for (int i = 0; i < states.length; i++) {
-						StreamOperator<?> operator = allOperators[i];
-						if (operator != null) {
-							StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);
-							if (state.getOperatorState() instanceof AsynchronousStateHandle) {
-								hasAsyncStates = true;
-							}
-							if (state.getFunctionState() instanceof AsynchronousStateHandle) {
-								hasAsyncStates = true;
-							}
-							states[i] = state.isEmpty() ? null : state;
+				for (int i = 0; i < states.length; i++) {
+					StreamOperator<?> operator = allOperators[i];
+					if (operator != null) {
+						StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);
+						if (state.getOperatorState() instanceof AsynchronousStateHandle) {
+							hasAsyncStates = true;
 						}
+						if (state.getFunctionState() instanceof AsynchronousStateHandle) {
+							hasAsyncStates = true;
+						}
+						states[i] = state.isEmpty() ? null : state;
 					}
+				}
 
+				if (!isRunning) {
+					// Rethrow the cancel exception because some state backends could swallow
+					// exceptions and seem to exit cleanly.
+					throw new CancelTaskException();
+				}
 
-					StreamTaskStateList allStates = new StreamTaskStateList(states);
-
-					if (allStates.isEmpty()) {
-						getEnvironment().acknowledgeCheckpoint(checkpointId);
-					} else if (!hasAsyncStates) {
-						getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
-					} else {
-						// start a Thread that does the asynchronous materialization and
-						// then sends the checkpoint acknowledge
-
-						Thread checkpointThread = new Thread() {
-							@Override
-							public void run() {
-								try {
-									for (StreamTaskState state : states) {
-										if (state != null) {
-											if (state.getFunctionState() instanceof AsynchronousStateHandle) {
-												AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>)
state.getFunctionState();
-												state.setFunctionState((StateHandle) asyncState.materialize());
-											}
-											if (state.getOperatorState() instanceof AsynchronousStateHandle) {
-												AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>)
state.getOperatorState();
-												state.setOperatorState((StateHandle) asyncState.materialize());
-											}
+				StreamTaskStateList allStates = new StreamTaskStateList(states);
+
+				if (allStates.isEmpty()) {
+					getEnvironment().acknowledgeCheckpoint(checkpointId);
+				} else if (!hasAsyncStates) {
+					getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
+				} else {
+					// start a Thread that does the asynchronous materialization and
+					// then sends the checkpoint acknowledge
+
+					Thread checkpointThread = new Thread() {
+						@Override
+						public void run() {
+							try {
+								for (StreamTaskState state : states) {
+									if (state != null) {
+										if (state.getFunctionState() instanceof AsynchronousStateHandle) {
+											AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>)
state.getFunctionState();
+											state.setFunctionState((StateHandle) asyncState.materialize());
+										}
+										if (state.getOperatorState() instanceof AsynchronousStateHandle) {
+											AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>)
state.getOperatorState();
+											state.setOperatorState((StateHandle) asyncState.materialize());
 										}
-									}
-									StreamTaskStateList allStates = new StreamTaskStateList(states);
-									getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
-								} catch (Exception e) {
-									LOG.error("Caught exception while materializing asynchronous checkpoints.", e);
-									if (asyncException == null) {
-										asyncException = new AsynchronousException(e);
 									}
 								}
-								asyncCheckpointThreads.remove(this);
-								LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", checkpointId,
getName());
+								StreamTaskStateList allStates = new StreamTaskStateList(states);
+								getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
+							} catch (Exception e) {
+								LOG.error("Caught exception while materializing asynchronous checkpoints.", e);
+								if (asyncException == null) {
+									asyncException = new AsynchronousException(e);
+								}
 							}
-						};
+							asyncCheckpointThreads.remove(this);
+							LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", checkpointId,
getName());
+						}
+					};
 
-						asyncCheckpointThreads.add(checkpointThread);
-						checkpointThread.start();
-					}
-				}
-				catch (Exception e) {
-					if (isRunning) {
-						throw e;
-					}
+					asyncCheckpointThreads.add(checkpointThread);
+					checkpointThread.start();
 				}
 				return true;
 			} else {


Mime
View raw message