flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/4] flink git commit: [hotfix] Extend TestingFatalErrorHandler to return an error future
Date Wed, 28 Mar 2018 20:16:13 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.5 535e26792 -> c0cf2234c


[hotfix] Extend TestingFatalErrorHandler to return an error future


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b420995
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b420995
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b420995

Branch: refs/heads/release-1.5
Commit: 7b4209955a7efff3de6b98882557213301f8a38e
Parents: 535e267
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Mar 22 10:46:04 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Mar 28 22:15:56 2018 +0200

----------------------------------------------------------------------
 .../runtime/util/TestingFatalErrorHandler.java  | 62 ++++++++++++++++----
 1 file changed, 49 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b420995/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java
index 616313c..d07d356 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java
@@ -19,10 +19,18 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Testing fatal error handler which records the occurred exceptions during the execution of the
@@ -30,34 +38,62 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class TestingFatalErrorHandler implements FatalErrorHandler {
 	private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class);
-	private final AtomicReference<Throwable> atomicThrowable;
+	private CompletableFuture<Throwable> errorFuture;
 
 	public TestingFatalErrorHandler() {
-		atomicThrowable = new AtomicReference<>(null);
+		errorFuture = new CompletableFuture<>();
 	}
 
-	public void rethrowError() throws TestingException {
-		Throwable throwable = atomicThrowable.get();
+	public synchronized void rethrowError() throws TestingException {
+		final Throwable throwable = getException();
 
 		if (throwable != null) {
-			throw new TestingException(throwable);
+            throw new TestingException(throwable);
+        }
+	}
+
+	public synchronized boolean hasExceptionOccurred() {
+		return errorFuture.isDone();
+	}
+
+	@Nullable
+	public synchronized Throwable getException() {
+		if (errorFuture.isDone()) {
+			Throwable throwable = null;
+
+			try {
+				throwable = errorFuture.get();
+			} catch (InterruptedException ie) {
+				Thread.interrupted();
+				throw new FlinkRuntimeException("This should never happen since the future was completed.");
+			} catch (ExecutionException e) {
+				throwable = ExceptionUtils.stripExecutionException(e);
+			}
+
+			return throwable;
+		} else {
+			return null;
 		}
 	}
 
-	public boolean hasExceptionOccurred() {
-		return atomicThrowable.get() != null;
+	public synchronized CompletableFuture<Throwable> getErrorFuture() {
+		return errorFuture;
 	}
 
-	public Throwable getException() {
-		return atomicThrowable.get();
+	public synchronized void clearError() {
+		errorFuture = new CompletableFuture<>();
 	}
 
 	@Override
-	public void onFatalError(Throwable exception) {
+	public synchronized void onFatalError(@Nonnull Throwable exception) {
 		LOG.error("OnFatalError:", exception);
 
-		if (!atomicThrowable.compareAndSet(null, exception)) {
-			atomicThrowable.get().addSuppressed(exception);
+		if (!errorFuture.complete(exception)) {
+			final Throwable throwable = getException();
+
+			Preconditions.checkNotNull(throwable);
+
+			throwable.addSuppressed(exception);
 		}
 	}
 


Mime
View raw message