flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [4/5] flink git commit: [FLINK-8466] [runtime] Make sure ErrorInfo references no user-defined classes.
Date Wed, 24 Jan 2018 21:30:59 GMT
[FLINK-8466] [runtime] Make sure ErrorInfo references no user-defined classes.

That way, holding on to the ErrorInfo does not prevent class unloading.

However, this implies that the ErrorInfo must not hold strong references to any Exception
classes.
For that reason, the commit pulls the "ground truth" exception into a separate field, so that
the ExecutionGraph logic itself can always assume to have the proper ground-truth exception.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d1ba45e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d1ba45e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d1ba45e

Branch: refs/heads/release-1.4
Commit: 4d1ba45e78fa4ed6cd5af0405c7988ffaa6dee13
Parents: a8ea169
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Jan 23 22:00:06 2018 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jan 24 18:57:02 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/util/SerializedThrowable.java  |  4 ++
 .../itcases/AbstractQueryableStateTestBase.java |  2 +-
 .../executiongraph/AccessExecutionGraph.java    |  2 +-
 .../executiongraph/ArchivedExecutionGraph.java  |  2 +-
 .../flink/runtime/executiongraph/ErrorInfo.java | 34 +++--------
 .../runtime/executiongraph/ExecutionGraph.java  | 64 ++++++++------------
 .../rest/handler/job/JobExceptionsHandler.java  |  2 +-
 .../handler/legacy/JobExceptionsHandler.java    |  2 +-
 .../ArchivedExecutionGraphTest.java             |  2 +-
 .../runtime/executiongraph/ErrorInfoTest.java   | 62 +++++++++++++++++++
 .../ExecutionGraphRestartTest.java              |  2 +-
 .../ExecutionGraphSuspendTest.java              |  2 +-
 .../legacy/JobExceptionsHandlerTest.java        |  4 +-
 13 files changed, 111 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
index dab7cda..6a721b3 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
@@ -136,6 +136,10 @@ public class SerializedThrowable extends Exception implements Serializable
{
 		return originalErrorClassName;
 	}
 
+	public String getFullStringifiedStackTrace() {
+		return fullStringifiedStackTrace;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Override the behavior of Throwable
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 5a28367..9ca3bda 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -340,7 +340,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger
{
 						.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
 				.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-		String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
+		String failureCause = jobFound.executionGraph().getFailureInfo().getExceptionAsString();
 
 		assertEquals(JobStatus.FAILED, jobFound.executionGraph().getState());
 		assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index ebc0768..ce490dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -70,7 +70,7 @@ public interface AccessExecutionGraph {
 	 * @return failure causing exception, or null
 	 */
 	@Nullable
-	ErrorInfo getFailureCause();
+	ErrorInfo getFailureInfo();
 
 	/**
 	 * Returns the job vertex for the given {@link JobVertexID}.

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 4481e1b..20c2c8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -143,7 +143,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 	}
 
 	@Override
-	public ErrorInfo getFailureCause() {
+	public ErrorInfo getFailureInfo() {
 		return failureCause;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
index d919bfa..9fe569f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
@@ -18,11 +18,9 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
 
-import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 /**
@@ -32,26 +30,25 @@ public class ErrorInfo implements Serializable {
 
 	private static final long serialVersionUID = -6138942031953594202L;
 
-	private final transient Throwable exception;
-	private final long timestamp;
+	/** The exception that we keep holding forever. Has no strong reference to any user-defined
code. */
+	private final SerializedThrowable exception;
 
-	private volatile String exceptionAsString;
+	private final long timestamp;
 
 	public ErrorInfo(Throwable exception, long timestamp) {
 		Preconditions.checkNotNull(exception);
 		Preconditions.checkArgument(timestamp > 0);
 
-		this.exception = exception;
+		this.exception = exception instanceof SerializedThrowable ?
+				(SerializedThrowable) exception : new SerializedThrowable(exception);
 		this.timestamp = timestamp;
 	}
 
 	/**
-	 * Returns the contained exception.
-	 *
-	 * @return contained exception, or {@code "(null)"} if either no exception was set or this
object has been deserialized
+	 * Returns the serialized form of the original exception.
 	 */
-	Throwable getException() {
-		return exception;
+	public SerializedThrowable getException() {
+		return this.exception;
 	}
 
 	/**
@@ -60,10 +57,7 @@ public class ErrorInfo implements Serializable {
 	 * @return failure causing exception as a string, or {@code "(null)"}
 	 */
 	public String getExceptionAsString() {
-		if (exceptionAsString == null) {
-			exceptionAsString = ExceptionUtils.stringifyException(exception);
-		}
-		return exceptionAsString;
+		return exception.getFullStringifiedStackTrace();
 	}
 
 	/**
@@ -74,12 +68,4 @@ public class ErrorInfo implements Serializable {
 	public long getTimestamp() {
 		return timestamp;
 	}
-
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		// make sure that the exception was stringified so it isn't lost during serialization
-		if (exceptionAsString == null) {
-			exceptionAsString = ExceptionUtils.stringifyException(exception);
-		}
-		out.defaultWriteObject();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8a74001..7fa55be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -264,7 +264,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	/** The exception that caused the job to fail. This is set to the first root exception
 	 * that was not recoverable and triggered job failure */
-	private volatile ErrorInfo failureCause;
+	private volatile Throwable failureCause;
+
+	/** The extended failure cause information for the job. This exists in addition to 'failureCause',
+	 * to let 'failureCause' be a strong reference to the exception, while this info holds no
+	 * strong reference to any user-defined classes.*/
+	private volatile ErrorInfo failureInfo;
 
 	// ------ Fields that are relevant to the execution and need to be cleared before archiving
 -------
 
@@ -620,10 +625,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return state;
 	}
 
-	public ErrorInfo getFailureCause() {
+	public Throwable getFailureCause() {
 		return failureCause;
 	}
 
+	public ErrorInfo getFailureInfo() {
+		return failureInfo;
+	}
+
 	/**
 	 * Gets the number of full restarts that the execution graph went through.
 	 * If a full restart recovery is currently pending, this recovery is included in the
@@ -1026,25 +1035,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * @param suspensionCause Cause of the suspension
 	 */
 	public void suspend(Throwable suspensionCause) {
-		suspend(new ErrorInfo(suspensionCause, System.currentTimeMillis()));
-	}
-
-	/**
-	 * Suspends the current ExecutionGraph.
-	 *
-	 * The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
-	 * state. All ExecutionJobVertices will be canceled and the postRunCleanup is executed.
-	 *
-	 * The SUSPENDED state is a local terminal state which stops the execution of the job but
does
-	 * not remove the job from the HA job store so that it can be recovered by another JobManager.
-	 *
-	 * @param errorInfo ErrorInfo containing the cause of the suspension
-	 */
-	public void suspend(ErrorInfo errorInfo) {
-		Throwable suspensionCause = errorInfo != null
-			? errorInfo.getException()
-			: null;
-
 		while (true) {
 			JobStatus currentState = state;
 
@@ -1052,7 +1042,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				// stay in a terminal state
 				return;
 			} else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) {
-				this.failureCause = errorInfo;
+				initFailureCause(suspensionCause);
 
 				// make sure no concurrent local actions interfere with the cancellation
 				incrementGlobalModVersion();
@@ -1072,10 +1062,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	public void failGlobal(Throwable error) {
-		failGlobal(new ErrorInfo(error, System.currentTimeMillis()));
-	}
-
 	/**
 	 * Fails the execution graph globally. This failure will not be recovered by a specific
 	 * failover strategy, but results in a full restart of all tasks.
@@ -1085,13 +1071,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * exceptions that indicate a bug or an unexpected call race), and where a full restart
is the
 	 * safe way to get consistency back.
 	 *
-	 * @param errorInfo ErrorInfo containing the exception that caused the failure.
+	 * @param t The exception that caused the failure.
 	 */
-	public void failGlobal(ErrorInfo errorInfo) {
-		Throwable t = errorInfo != null
-			? errorInfo.getException()
-			: null;
-
+	public void failGlobal(Throwable t) {
 		while (true) {
 			JobStatus current = state;
 			// stay in these states
@@ -1103,7 +1085,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			else if (current == JobStatus.RESTARTING) {
 				// we handle 'failGlobal()' while in 'RESTARTING' as a safety net in case something
 				// has gone wrong in 'RESTARTING' and we need to re-attempt the restarts
-				this.failureCause = errorInfo;
+				initFailureCause(t);
 
 				final long globalVersionForRestart = incrementGlobalModVersion();
 				if (tryRestartOrFail(globalVersionForRestart)) {
@@ -1111,7 +1093,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				}
 			}
 			else if (transitionState(current, JobStatus.FAILING, t)) {
-				this.failureCause = errorInfo;
+				initFailureCause(t);
 
 				// make sure no concurrent local or global actions interfere with the failover
 				final long globalVersionForRestart = incrementGlobalModVersion();
@@ -1304,6 +1286,11 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return GLOBAL_VERSION_UPDATER.incrementAndGet(this);
 	}
 
+	private void initFailureCause(Throwable t) {
+		this.failureCause = t;
+		this.failureInfo = new ErrorInfo(t, System.currentTimeMillis());
+	}
+
 	// ------------------------------------------------------------------------
 	//  Job Status Progress
 	// ------------------------------------------------------------------------
@@ -1399,9 +1386,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		JobStatus currentState = state;
 
 		if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
-			Throwable failureCause = this.failureCause != null
-				? this.failureCause.getException()
-				: null;
+			final Throwable failureCause = this.failureCause;
+
 			synchronized (progressLock) {
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(),
getJobID(), failureCause);
@@ -1678,7 +1664,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				catch (Throwable t) {
 					// bug in the failover strategy - fall back to global failover
 					LOG.warn("Error in failover strategy - falling back to global restart", t);
-					failGlobal(new ErrorInfo(ex, timestamp));
+					failGlobal(ex);
 				}
 			}
 		}
@@ -1710,7 +1696,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			archivedVerticesInCreationOrder,
 			stateTimestamps,
 			getState(),
-			failureCause,
+			failureInfo,
 			getJsonPlan(),
 			getAccumulatorResultsStringified(),
 			serializedUserAccumulators,

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index feabbea..ea3fd51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -65,7 +65,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExcep
 
 	@Override
 	protected JobExceptionsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters>
request, AccessExecutionGraph executionGraph) {
-		ErrorInfo rootException = executionGraph.getFailureCause();
+		ErrorInfo rootException = executionGraph.getFailureInfo();
 		String rootExceptionMessage = null;
 		Long rootTimestamp = null;
 		if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION))
{

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
index 6a4cc0d..a6bae86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
@@ -92,7 +92,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler
{
 		gen.writeStartObject();
 
 		// most important is the root failure cause
-		ErrorInfo rootException = graph.getFailureCause();
+		ErrorInfo rootException = graph.getFailureInfo();
 		if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION))
{
 			gen.writeStringField("root-exception", rootException.getExceptionAsString());
 			gen.writeNumberField("timestamp", rootException.getTimestamp());

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 0d7c8e6..a96a03e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -171,7 +171,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
 		assertEquals(runtimeGraph.getJobID(), archivedGraph.getJobID());
 		assertEquals(runtimeGraph.getJobName(), archivedGraph.getJobName());
 		assertEquals(runtimeGraph.getState(), archivedGraph.getState());
-		assertEquals(runtimeGraph.getFailureCause().getExceptionAsString(), archivedGraph.getFailureCause().getExceptionAsString());
+		assertEquals(runtimeGraph.getFailureInfo().getExceptionAsString(), archivedGraph.getFailureInfo().getExceptionAsString());
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CREATED), archivedGraph.getStatusTimestamp(JobStatus.CREATED));
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RUNNING), archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FAILING), archivedGraph.getStatusTimestamp(JobStatus.FAILING));

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ErrorInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ErrorInfoTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ErrorInfoTest.java
new file mode 100644
index 0000000..4841365
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ErrorInfoTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Simple test for the {@link ErrorInfo}.
+ */
+public class ErrorInfoTest {
+
+	@Test
+	public void testSerializationWithExceptionOutsideClassLoader() throws Exception {
+		final ErrorInfo error = new ErrorInfo(new ExceptionWithCustomClassLoader(), System.currentTimeMillis());
+		final ErrorInfo copy = CommonTestUtils.createCopySerializable(error);
+
+		assertEquals(error.getTimestamp(), copy.getTimestamp());
+		assertEquals(error.getExceptionAsString(), copy.getExceptionAsString());
+		assertEquals(error.getException().getMessage(), copy.getException().getMessage());
+
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class ExceptionWithCustomClassLoader extends Exception {
+
+		private static final long serialVersionUID = 42L;
+
+		private static final ClassLoader CUSTOM_LOADER = new URLClassLoader(new URL[0]);
+
+		@SuppressWarnings("unused")
+		private final Serializable outOfClassLoader = CommonTestUtils.createObjectForClassNotInClassPath(CUSTOM_LOADER);
+
+		public ExceptionWithCustomClassLoader() {
+			super("tada");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 8770b06..e788881 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -818,7 +818,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		waitUntilJobStatus(eg, JobStatus.FAILED, 1000);
 
-		final Throwable t = eg.getFailureCause().getException();
+		final Throwable t = eg.getFailureCause();
 		if (!(t instanceof NoResourceAvailableException)) {
 			ExceptionUtils.rethrowException(t, t.getMessage());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index f0adc32..852a7a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -252,7 +252,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 
 		assertEquals(JobStatus.SUSPENDED, eg.getState());
 
-		assertEquals(exception, eg.getFailureCause().getException());
+		assertEquals(exception, eg.getFailureCause());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4d1ba45e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
index 641bf96..0e96f36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
@@ -76,8 +76,8 @@ public class JobExceptionsHandlerTest {
 	private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws
IOException {
 		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
 
-		Assert.assertEquals(originalJob.getFailureCause().getExceptionAsString(), result.get("root-exception").asText());
-		Assert.assertEquals(originalJob.getFailureCause().getTimestamp(), result.get("timestamp").asLong());
+		Assert.assertEquals(originalJob.getFailureInfo().getExceptionAsString(), result.get("root-exception").asText());
+		Assert.assertEquals(originalJob.getFailureInfo().getTimestamp(), result.get("timestamp").asLong());
 
 		ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");
 


Mime
View raw message