flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/8] flink git commit: [runtime] Fix TaskExecutionState against non-serializable exceptions.
Date Mon, 11 May 2015 20:19:35 GMT
Repository: flink
Updated Branches:
  refs/heads/master d259e6962 -> fbea2da26


[runtime] Fix TaskExecutionState against non-serializable exceptions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d368a4b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d368a4b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d368a4b

Branch: refs/heads/master
Commit: 1d368a4b77f3ea53809e622564c87b657547c405
Parents: 1c8d866
Author: Stephan Ewen <sewen@apache.org>
Authored: Sun May 3 01:57:37 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon May 11 21:07:57 2015 +0200

----------------------------------------------------------------------
 .../runtime/taskmanager/TaskExecutionState.java | 37 ++++++++++++++++----
 1 file changed, 30 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1d368a4b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 96d56eb..6c85ab5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
 
 /**
@@ -68,6 +68,7 @@ public class TaskExecutionState implements java.io.Serializable {
 	
 	/**
 	 * Creates a new task execution state update, with an attached exception.
+	 * This constructor may never throw an exception.
 	 * 
 	 * @param jobID
 	 *        the ID of the job the task belongs to
@@ -91,13 +92,35 @@ public class TaskExecutionState implements java.io.Serializable {
 		this.cachedError = error;
 
 		if (error != null) {
+			byte[] serializedError;
 			try {
-				this.serializedError = InstantiationUtil.serializeObject(error);
+				serializedError = InstantiationUtil.serializeObject(error);
 			}
-			catch (IOException e) {
-				throw new RuntimeException("Error while serializing task exception", e);
+			catch (Throwable t) {
+				// could not serialize exception. send the stringified version instead
+				try {
+					this.cachedError = new Exception(ExceptionUtils.stringifyException(error));
+					serializedError = InstantiationUtil.serializeObject(this.cachedError);
+				}
+				catch (Throwable tt) {
+					// seems like we cannot do much to report the actual exception
+					// report a placeholder instead
+					try {
+						this.cachedError = new Exception("Cause is a '" + error.getClass().getName()
+								+ "' (failed to serialize or stringify)");
+						serializedError = InstantiationUtil.serializeObject(this.cachedError);
+					}
+					catch (Throwable ttt) {
+						// this should never happen unless the JVM is fubar.
+						// we just report the state without the error
+						this.cachedError = null;
+						serializedError = null;
+					}
+				}
 			}
-		} else {
+			this.serializedError = serializedError;
+		}
+		else {
 			this.serializedError = null;
 		}
 	}
@@ -181,7 +204,7 @@ public class TaskExecutionState implements java.io.Serializable {
 	public String toString() {
 		return String.format("TaskState jobId=%s, executionId=%s, state=%s, error=%s", 
 				jobID, executionId, executionState,
-				cachedError == null ? "(null)"
-									: cachedError.getClass().getName() + ": " + cachedError.getMessage());
+				cachedError == null ? (serializedError == null ? "(null)" : "(serialized)")
+									: (cachedError.getClass().getName() + ": " + cachedError.getMessage()));
 	}
 }


Mime
View raw message