flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [02/12] flink git commit: [streaming] fix for null state in ConfirmCheckpoint messages
Date Thu, 25 Jun 2015 17:21:35 GMT
[streaming] fix for null state in ConfirmCheckpoint messages


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ecab82a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ecab82a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ecab82a

Branch: refs/heads/master
Commit: 0ecab82add0946503a000162813bb820f6f3b4d4
Parents: e2e73ad
Author: Paris Carbone <seniorcarbone@gmail.com>
Authored: Fri May 29 11:57:34 2015 +0200
Committer: Gyula Fora <gyfora@apache.org>
Committed: Thu Jun 25 16:38:06 2015 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointCoordinator.java    |  6 +++++-
 .../runtime/checkpoint/SuccessfulCheckpoint.java     | 15 +++++++++++++--
 .../messages/checkpoint/ConfirmCheckpoint.java       |  4 ++++
 3 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ecab82a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 8a31bee..91fd424 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -32,6 +32,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -373,8 +375,10 @@ public class CheckpointCoordinator {
 				Execution ee = ev.getCurrentExecutionAttempt();
 				if (ee != null) {
 					ExecutionAttemptID attemptId = ee.getAttemptId();
+					StateForTask stateForTask = completed.getState(ev.getJobvertexId());
+					SerializedValue<StateHandle<?>> taskState = (stateForTask != null) ? stateForTask.getState()
: null;
 					ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId,

-							timestamp, completed.getState(ev.getJobvertexId()).getState() );
+							timestamp, taskState);
 					ev.sendMessageToCurrentExecution(confirmMessage, ee.getAttemptId());
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ecab82a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
index 85a3a79..c277ea3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
@@ -72,10 +72,21 @@ public class SuccessfulCheckpoint {
 	public List<StateForTask> getStates() {
 		return states;
 	}
-	
+
+	/**
+	 * Returns the task state included in the checkpoint for a given JobVertexID if it exists
or 
+	 * null if no state is included for that id.
+	 * 
+	 * @param jobVertexID
+	 * @return
+	 */
 	public StateForTask getState(JobVertexID jobVertexID)
 	{
-		return vertexToState.get(jobVertexID);
+		if(vertexToState.containsKey(jobVertexID)) {
+			return vertexToState.get(jobVertexID);
+		}
+		
+		return null;
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0ecab82a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
index 43b5d4c..328f692 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java
@@ -53,6 +53,10 @@ public class ConfirmCheckpoint extends AbstractCheckpointMessage implements
java
 
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Returns the stateHandle that was included in the confirmed checkpoint for a given task
or null
+	 * if no state was commited in that checkpoint.
+	 */
 	public SerializedValue<StateHandle<?>> getState() {
 		return state;
 	}


Mime
View raw message