flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-3594] [runtime] Make sure exceptions during checkpoints are handled properly
Date Fri, 11 Mar 2016 13:28:56 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5aca6f4a7 -> 34110fefc


[FLINK-3594] [runtime] Make sure exceptions during checkpoints are handled properly

  - For the asynchronous trigger, exceptions are suppressed if the task is no longer running
  - The task cannot go to "not running" while a checkpoint is still in progress.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34110fef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34110fef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34110fef

Branch: refs/heads/master
Commit: 34110fefc6e49bcbe1ae8de090fa43a5f6ef5fec
Parents: 5aca6f4
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Mar 10 21:30:49 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Mar 11 14:28:06 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     | 36 +++++++++++++++-----
 1 file changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/34110fef/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7cd4cf3..7138d53 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -207,6 +207,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 			// first order of business is to give operators back their state
 			restoreState();
+			lazyRestoreState = null; // GC friendliness
 			
 			// we need to make sure that any triggers scheduled in open() cannot be
 			// executed before all operators are opened
@@ -219,23 +220,25 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				throw new CancelTaskException();
 			}
 
-				// let the task do its work
+			// let the task do its work
 			isRunning = true;
 			run();
-			isRunning = false;
-			
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Finished task {}", getName());
-			}
+
+			LOG.debug("Finished task {}", getName());
 			
 			// make sure no further checkpoint and notification actions happen.
 			// we make sure that no other thread is currently in the locked scope before
 			// we close the operators by trying to acquire the checkpoint scope lock
 			// we also need to make sure that no triggers fire concurrently with the close logic
+			// at the same time, this makes sure that during any "regular" exit where still
 			synchronized (lock) {
+				isRunning = false;
+				
 				// this is part of the main logic, so if this fails, the task is considered failed
 				closeAllOperators();
 			}
+
+			LOG.debug("Closed operators for task {}", getName());
 			
 			// make sure all buffered data is flushed
 			operatorChain.flushOutputs();
@@ -457,7 +460,21 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
+	public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+		try {
+			return performCheckpoint(checkpointId, timestamp);
+		}
+		catch (Exception e) {
+			// propagate exceptions only if the task is still in "running" state
+			if (isRunning) {
+				throw e;
+			} else {
+				return false;
+			}
+		}
+	}
+
+	protected boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {
 		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
 		
 		synchronized (lock) {
@@ -675,7 +692,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			@Override
 			public void onEvent(CheckpointBarrier barrier) {
 				try {
-					triggerCheckpoint(barrier.getId(), barrier.getTimestamp());
+					performCheckpoint(barrier.getId(), barrier.getTimestamp());
+				}
+				catch (CancelTaskException e) {
+					throw e;
 				}
 				catch (Exception e) {
 					throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);


Mime
View raw message