flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-8392] [rpc] Let termination future be completed by AkkaRpcActor#postStop
Date Wed, 10 Jan 2018 11:28:39 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6033de01a -> 51a278778


[FLINK-8392] [rpc] Let termination future be completed by AkkaRpcActor#postStop

Revert the changes introduced by FLINK-7754. An RpcEndpoint's termination future is now
completed from the AkkaRpcActor#postStop method.

This closes #5266.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51a27877
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51a27877
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51a27877

Branch: refs/heads/master
Commit: 51a278778c3536aa9f5030a8f43a7faea6889992
Parents: 6033de0
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Jan 9 17:50:37 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Jan 10 12:27:50 2018 +0100

----------------------------------------------------------------------
 .../runtime/rpc/akka/AkkaBasedEndpoint.java     | 11 +--
 .../runtime/rpc/akka/AkkaInvocationHandler.java | 30 +++-----
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 24 +++---
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 79 +++-----------------
 .../rpc/akka/FencedAkkaInvocationHandler.java   |  3 +-
 .../runtime/rpc/akka/FencedAkkaRpcActor.java    |  6 +-
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  7 +-
 7 files changed, 40 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java
index 499de1e..7493507 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java
@@ -22,10 +22,8 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 
 import akka.actor.ActorRef;
 
-import java.util.concurrent.CompletableFuture;
-
 /**
- * Interface for Akka based rpc gateways
+ * Interface for Akka based rpc gateways.
  */
 interface AkkaBasedEndpoint extends RpcGateway {
 
@@ -35,11 +33,4 @@ interface AkkaBasedEndpoint extends RpcGateway {
 	 * @return the {@link ActorRef} of the underlying RPC actor
 	 */
 	ActorRef getActorRef();
-
-	/**
-	 * Returns the internal termination future.
-	 *
-	 * @return Internal termination future
-	 */
-	CompletableFuture<Void> getInternalTerminationFuture();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 37f46e3..863b780 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -18,23 +18,24 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
-import org.apache.flink.runtime.rpc.RpcServer;
 import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcServer;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.rpc.akka.messages.Processing;
 import org.apache.flink.runtime.rpc.messages.CallAsync;
 import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
-import org.apache.flink.runtime.rpc.akka.messages.Processing;
 import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RunAsync;
 import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,8 +50,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Invocation handler to be used with an {@link AkkaRpcActor}. The invocation handler wraps the
@@ -85,18 +86,13 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 	@Nullable
 	private final CompletableFuture<Boolean> terminationFuture;
 
-	// null if gateway; otherwise non-null
-	@Nullable
-	private final CompletableFuture<Void> internalTerminationFuture;
-
 	AkkaInvocationHandler(
 			String address,
 			String hostname,
 			ActorRef rpcEndpoint,
 			Time timeout,
 			long maximumFramesize,
-			@Nullable CompletableFuture<Boolean> terminationFuture,
-			@Nullable CompletableFuture<Void> internalTerminationFuture) {
+			@Nullable CompletableFuture<Boolean> terminationFuture) {
 
 		this.address = Preconditions.checkNotNull(address);
 		this.hostname = Preconditions.checkNotNull(hostname);
@@ -105,7 +101,6 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 		this.timeout = Preconditions.checkNotNull(timeout);
 		this.maximumFramesize = maximumFramesize;
 		this.terminationFuture = terminationFuture;
-		this.internalTerminationFuture = internalTerminationFuture;
 	}
 
 	@Override
@@ -159,7 +154,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 
 	@Override
 	public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
-		if(isLocal) {
+		if (isLocal) {
 			@SuppressWarnings("unchecked")
 			CompletableFuture<V> resultFuture = (CompletableFuture<V>) ask(new CallAsync(callable), callTimeout);
 
@@ -208,7 +203,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 			tell(rpcInvocation);
 
 			result = null;
-		} else if (Objects.equals(returnType,CompletableFuture.class)) {
+		} else if (Objects.equals(returnType, CompletableFuture.class)) {
 			// execute an asynchronous call
 			result = ask(rpcInvocation, futureTimeout);
 		} else {
@@ -298,7 +293,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 	}
 
 	/**
-	 * Checks whether any of the annotations is of type {@link RpcTimeout}
+	 * Checks whether any of the annotations is of type {@link RpcTimeout}.
 	 *
 	 * @param annotations Array of annotations
 	 * @return True if {@link RpcTimeout} was found; otherwise false
@@ -349,9 +344,4 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 	public CompletableFuture<Boolean> getTerminationFuture() {
 		return terminationFuture;
 	}
-
-	@Override
-	public CompletableFuture<Void> getInternalTerminationFuture() {
-		return internalTerminationFuture;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index f7488ab..da7ce35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -53,14 +53,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Akka rpc actor which receives {@link LocalRpcInvocation}, {@link RunAsync} and {@link CallAsync}
  * {@link Processing} messages.
- * <p>
- * The {@link LocalRpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint}
+ *
+ * <p>The {@link LocalRpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint}
  * instance.
- * <p>
- * The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed
+ *
+ * <p>The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed
  * in the context of the actor thread.
- * <p>
- * The {@link Processing} message controls the processing behaviour of the akka rpc actor. A
+ *
+ * <p>The {@link Processing} message controls the processing behaviour of the akka rpc actor. A
  * {@link Processing#START} starts processing incoming messages. A {@link Processing#STOP} message
  * stops processing messages. All messages which arrive when the processing is stopped, will be
  * discarded.
@@ -68,7 +68,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <T> Type of the {@link RpcEndpoint}
  */
 class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
-	
+
 	protected final Logger log = LoggerFactory.getLogger(getClass());
 
 	/** the endpoint to invoke the methods on. */
@@ -77,12 +77,12 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 	/** the helper that tracks whether calls come from the main thread. */
 	private final MainThreadValidatorUtil mainThreadValidator;
 
-	private final CompletableFuture<Void> internalTerminationFuture;
+	private final CompletableFuture<Boolean> terminationFuture;
 
-	AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Void> internalTerminationFuture) {
+	AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Boolean> terminationFuture) {
 		this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
 		this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
-		this.internalTerminationFuture = checkNotNull(internalTerminationFuture);
+		this.terminationFuture = checkNotNull(terminationFuture);
 	}
 
 	@Override
@@ -106,9 +106,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 			// Complete the termination future so that others know that we've stopped.
 
 			if (shutdownThrowable != null) {
-				internalTerminationFuture.completeExceptionally(shutdownThrowable);
+				terminationFuture.completeExceptionally(shutdownThrowable);
 			} else {
-				internalTerminationFuture.complete(null);
+				terminationFuture.complete(null);
 			}
 		} finally {
 			mainThreadValidator.exitMainThread();

http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 68b5aaa..7ff08f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -54,10 +54,8 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.io.Serializable;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -65,7 +63,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 
 import scala.Option;
@@ -165,7 +162,6 @@ public class AkkaRpcService implements RpcService {
 					actorRef,
 					timeout,
 					maximumFramesize,
-					null,
 					null);
 			});
 	}
@@ -186,7 +182,6 @@ public class AkkaRpcService implements RpcService {
 					timeout,
 					maximumFramesize,
 					null,
-					null,
 					() -> fencingToken);
 			});
 	}
@@ -196,13 +191,12 @@ public class AkkaRpcService implements RpcService {
 		checkNotNull(rpcEndpoint, "rpc endpoint");
 
 		CompletableFuture<Boolean> terminationFuture = new CompletableFuture<>();
-		CompletableFuture<Void> internalTerminationFuture = new CompletableFuture<>();
 		final Props akkaRpcActorProps;
 
 		if (rpcEndpoint instanceof FencedRpcEndpoint) {
-			akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, internalTerminationFuture);
+			akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture);
 		} else {
-			akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, internalTerminationFuture);
+			akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture);
 		}
 
 		ActorRef actorRef;
@@ -240,7 +234,6 @@ public class AkkaRpcService implements RpcService {
 				timeout,
 				maximumFramesize,
 				terminationFuture,
-				internalTerminationFuture,
 				((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
 
 			implementedRpcGateways.add(FencedMainThreadExecutable.class);
@@ -251,8 +244,7 @@ public class AkkaRpcService implements RpcService {
 				actorRef,
 				timeout,
 				maximumFramesize,
-				terminationFuture,
-				internalTerminationFuture);
+				terminationFuture);
 		}
 
 		// Rather than using the System ClassLoader directly, we derive the ClassLoader
@@ -280,7 +272,6 @@ public class AkkaRpcService implements RpcService {
 				timeout,
 				maximumFramesize,
 				null,
-				null,
 				() -> fencingToken);
 
 			// Rather than using the System ClassLoader directly, we derive the ClassLoader
@@ -300,43 +291,19 @@ public class AkkaRpcService implements RpcService {
 	@Override
 	public void stopServer(RpcServer selfGateway) {
 		if (selfGateway instanceof AkkaBasedEndpoint) {
-			AkkaBasedEndpoint akkaClient = (AkkaBasedEndpoint) selfGateway;
+			final AkkaBasedEndpoint akkaClient = (AkkaBasedEndpoint) selfGateway;
+			final RpcEndpoint rpcEndpoint;
 
-			boolean fromThisService;
 			synchronized (lock) {
 				if (stopped) {
 					return;
 				} else {
-					fromThisService = actors.remove(akkaClient.getActorRef()) != null;
+					rpcEndpoint = actors.remove(akkaClient.getActorRef());
 				}
 			}
 
-			if (fromThisService) {
-				ActorRef selfActorRef = akkaClient.getActorRef();
-				LOG.info("Trigger shut down of RPC endpoint {}.", selfGateway.getAddress());
-
-				CompletableFuture<Boolean> akkaTerminationFuture = FutureUtils.toJava(
-					Patterns.gracefulStop(
-						selfActorRef,
-						FutureUtils.toFiniteDuration(timeout),
-						Kill.getInstance()));
-
-				akkaTerminationFuture
-					.thenCombine(
-						akkaClient.getInternalTerminationFuture(),
-						(Boolean terminated, Void ignored) -> true)
-					.whenComplete(
-						(Boolean terminated, Throwable throwable) -> {
-							if (throwable != null) {
-								LOG.debug("Graceful RPC endpoint shutdown failed. Shutting endpoint down hard now.", throwable);
-
-								actorSystem.stop(selfActorRef);
-								selfGateway.getTerminationFuture().completeExceptionally(throwable);
-							} else {
-								LOG.info("RPC endpoint {} has been shut down.", selfGateway.getAddress());
-								selfGateway.getTerminationFuture().complete(null);
-							}
-						});
+			if (rpcEndpoint != null) {
+				akkaClient.getActorRef().tell(Kill.getInstance(), ActorRef.noSender());
 			} else {
 				LOG.debug("RPC endpoint {} already stopped or from different RPC service", selfGateway.getAddress());
 			}
@@ -347,8 +314,6 @@ public class AkkaRpcService implements RpcService {
 	public void stopService() {
 		LOG.info("Stopping Akka RPC service.");
 
-		final List<RpcEndpoint> actorsToTerminate;
-
 		synchronized (lock) {
 			if (stopped) {
 				return;
@@ -356,35 +321,13 @@ public class AkkaRpcService implements RpcService {
 
 			stopped = true;
 
-			actorSystem.shutdown();
-
-			actorsToTerminate = new ArrayList<>(actors.values());
-
-			actors.clear();
 		}
 
+		actorSystem.shutdown();
 		actorSystem.awaitTermination();
 
-		// complete the termination futures of all actors
-		for (RpcEndpoint rpcEndpoint : actorsToTerminate) {
-			final CompletableFuture<Boolean> terminationFuture = rpcEndpoint.getTerminationFuture();
-
-			AkkaBasedEndpoint akkaBasedEndpoint = rpcEndpoint.getSelfGateway(AkkaBasedEndpoint.class);
-
-			CompletableFuture<Void> internalTerminationFuture = akkaBasedEndpoint.getInternalTerminationFuture();
-
-			internalTerminationFuture.whenComplete(
-				(Void ignored, Throwable throwable) -> {
-					if (throwable != null) {
-						terminationFuture.completeExceptionally(throwable);
-					} else {
-						terminationFuture.complete(true);
-					}
-				});
-
-			// make sure that if the internal termination futures haven't completed yet, then they time out
-			internalTerminationFuture.completeExceptionally(
-				new TimeoutException("The RpcEndpoint " + rpcEndpoint.getAddress() + " did not terminate in time."));
+		synchronized (lock) {
+			actors.clear();
 		}
 
 		LOG.info("Stopped Akka RPC service.");

http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
index 03534ae..3ca75e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
@@ -61,9 +61,8 @@ public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInv
 			Time timeout,
 			long maximumFramesize,
 			@Nullable CompletableFuture<Boolean> terminationFuture,
-			@Nullable CompletableFuture<Void> internalTerminationFuture,
 			Supplier<F> fencingTokenSupplier) {
-		super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture, internalTerminationFuture);
+		super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture);
 
 		this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index fa83e4f..57280fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
-import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.messages.FencedMessage;
 import org.apache.flink.runtime.rpc.messages.UnfencedMessage;
 
@@ -38,8 +38,8 @@ import java.util.concurrent.CompletableFuture;
  */
 public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {
 
-	public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Void> internalTerminationFuture) {
-		super(rpcEndpoint, internalTerminationFuture);
+	public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture) {
+		super(rpcEndpoint, terminationFuture);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index a5c41ef..c4259f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -137,12 +137,9 @@ public class AkkaRpcServiceTest extends TestLogger {
 
 	/**
 	 * Tests that we can wait for the termination of the rpc service
-	 *
-	 * @throws ExecutionException
-	 * @throws InterruptedException
 	 */
 	@Test(timeout = 60000)
-	public void testTerminationFuture() throws ExecutionException, InterruptedException {
+	public void testTerminationFuture() throws Exception {
 		final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
 		final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000));
 
@@ -150,7 +147,7 @@ public class AkkaRpcServiceTest extends TestLogger {
 
 		assertFalse(terminationFuture.isDone());
 
-		CompletableFuture.runAsync(() -> rpcService.stopService(), actorSystem.dispatcher());
+		CompletableFuture.runAsync(rpcService::stopService, actorSystem.dispatcher());
 
 		terminationFuture.get();
 	}


Mime
View raw message