Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A494E200BB4 for ; Tue, 1 Nov 2016 09:40:31 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id A3229160B17; Tue, 1 Nov 2016 08:40:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6C4C1160AE5 for ; Tue, 1 Nov 2016 09:40:30 +0100 (CET) Received: (qmail 10127 invoked by uid 500); 1 Nov 2016 08:40:29 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 8993 invoked by uid 99); 1 Nov 2016 08:40:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Nov 2016 08:40:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 743F1E0C0A; Tue, 1 Nov 2016 08:40:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Tue, 01 Nov 2016 08:40:48 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [21/50] [abbrv] flink git commit: [FLINK-4694] [rpc] Add termination futures to RpcEndpoint and RpcService archived-at: Tue, 01 Nov 2016 08:40:31 -0000 [FLINK-4694] [rpc] Add termination futures to RpcEndpoint and RpcService The termination futures can be used to wait for the termination of the respective component. This closes #2558. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a9e6447 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a9e6447 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a9e6447 Branch: refs/heads/flip-6 Commit: 1a9e6447dbb63d138f97b55c15d1158b66e32286 Parents: 8adceed Author: Till Rohrmann Authored: Tue Sep 27 18:17:42 2016 +0200 Committer: Till Rohrmann Committed: Tue Nov 1 09:39:30 2016 +0100 ---------------------------------------------------------------------- .../concurrent/impl/FlinkCompletableFuture.java | 9 ++--- .../apache/flink/runtime/rpc/RpcEndpoint.java | 9 +++++ .../apache/flink/runtime/rpc/RpcService.java | 7 ++++ .../apache/flink/runtime/rpc/SelfGateway.java | 34 ++++++++++++++++++ .../runtime/rpc/akka/AkkaInvocationHandler.java | 22 ++++++++++-- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 17 ++++++++- .../flink/runtime/rpc/akka/AkkaRpcService.java | 32 +++++++++++++++-- .../runtime/rpc/TestingSerialRpcService.java | 10 +++++- .../runtime/rpc/akka/AkkaRpcActorTest.java | 36 ++++++++++++++++++++ .../runtime/rpc/akka/AkkaRpcServiceTest.java | 29 ++++++++++++++++ 10 files changed, 193 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java index c8b86ed..14686d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.concurrent.impl; import akka.dispatch.Futures; import org.apache.flink.runtime.concurrent.CompletableFuture; -import org.apache.flink.util.Preconditions; import scala.concurrent.Promise; import scala.concurrent.Promise$; @@ -63,10 +62,12 @@ public class FlinkCompletableFuture extends FlinkFuture implements Complet @Override public boolean completeExceptionally(Throwable t) { - Preconditions.checkNotNull(t); - try { - promise.failure(t); + if (t == null) { + promise.failure(new NullPointerException("Throwable was null.")); + } else { + promise.failure(t); + } return true; } catch (IllegalStateException e) { http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 79961f7..f93a2e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -173,6 +173,15 @@ public abstract class RpcEndpoint { return rpcService; } + /** + * Return a future which is completed when the rpc endpoint has been terminated. + * + * @return Future which is completed when the rpc endpoint has been terminated. + */ + public Future getTerminationFuture() { + return ((SelfGateway)self).getTerminationFuture(); + } + // ------------------------------------------------------------------------ // Asynchronous executions // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 96844ed..2052f98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -77,6 +77,13 @@ public interface RpcService { void stopService(); /** + * Returns a future indicating when the RPC service has been shut down. + * + * @return Termination future + */ + Future getTerminationFuture(); + + /** * Gets the executor, provided by this RPC service. This executor can be used for example for * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures. * http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java new file mode 100644 index 0000000..ed8ef9d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import org.apache.flink.runtime.concurrent.Future; + +/** + * Interface for self gateways + */ +public interface SelfGateway { + + /** + * Return a future which is completed when the rpc endpoint has been terminated. + * + * @return Future indicating when the rpc endpoint has been terminated + */ + Future getTerminationFuture(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 8f4deff..709ff92 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.MainThreadExecutable; +import org.apache.flink.runtime.rpc.SelfGateway; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.StartStoppable; @@ -52,7 +53,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is * executed. */ -class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable { +class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable, SelfGateway { private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class); private final String address; @@ -67,12 +68,22 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea private final long maximumFramesize; - AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Time timeout, long maximumFramesize) { + // null if gateway; otherwise non-null + private final Future terminationFuture; + + AkkaInvocationHandler( + String address, + ActorRef rpcEndpoint, + Time timeout, + long maximumFramesize, + Future terminationFuture) { + this.address = Preconditions.checkNotNull(address); this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint); this.isLocal = this.rpcEndpoint.path().address().hasLocalScope(); this.timeout = Preconditions.checkNotNull(timeout); this.maximumFramesize = maximumFramesize; + this.terminationFuture = terminationFuture; } @Override @@ -83,7 +94,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || - declaringClass.equals(RpcGateway.class)) { + declaringClass.equals(RpcGateway.class) || declaringClass.equals(SelfGateway.class)) { result = method.invoke(this, args); } else { String methodName = method.getName(); @@ -300,4 +311,9 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea public String getAddress() { return address; } + + @Override + public Future getTerminationFuture() { + return terminationFuture; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 1b456a7..c21383a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -24,6 +24,7 @@ import akka.actor.UntypedActorWithStash; import akka.dispatch.Futures; import akka.japi.Procedure; import akka.pattern.Patterns; +import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; @@ -76,9 +77,23 @@ class AkkaRpcActor> extends Untyp /** the helper that tracks whether calls come from the main thread */ private final MainThreadValidatorUtil mainThreadValidator; - AkkaRpcActor(final T rpcEndpoint) { + private final CompletableFuture terminationFuture; + + AkkaRpcActor(final T rpcEndpoint, final CompletableFuture terminationFuture) { this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint"); this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint); + this.terminationFuture = checkNotNull(terminationFuture); + } + + @Override + public void postStop() { + super.postStop(); + + // IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise + // we would complete the future and let the actor system restart the actor with a completed + // future. + // Complete the termination future so that others know that we've stopped. + terminationFuture.complete(null); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index fb7896a..44719c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -32,9 +32,12 @@ import akka.dispatch.Mapper; import akka.pattern.Patterns; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.MainThreadExecutable; +import org.apache.flink.runtime.rpc.SelfGateway; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; @@ -131,7 +134,12 @@ public class AkkaRpcService implements RpcService { final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef); - InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize); + InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler( + address, + actorRef, + timeout, + maximumFramesize, + null); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink @@ -156,7 +164,8 @@ public class AkkaRpcService implements RpcService { public > C startServer(S rpcEndpoint) { checkNotNull(rpcEndpoint, "rpc endpoint"); - Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint); + CompletableFuture terminationFuture = new FlinkCompletableFuture<>(); + Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture); ActorRef actorRef; synchronized (lock) { @@ -169,7 +178,12 @@ public class AkkaRpcService implements RpcService { final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef); - InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize); + InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler( + address, + actorRef, + timeout, + maximumFramesize, + terminationFuture); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink @@ -181,6 +195,7 @@ public class AkkaRpcService implements RpcService { classLoader, new Class[]{ rpcEndpoint.getSelfGatewayType(), + SelfGateway.class, MainThreadExecutable.class, StartStoppable.class, AkkaGateway.class}, @@ -231,6 +246,17 @@ public class AkkaRpcService implements RpcService { } @Override + public Future getTerminationFuture() { + return FlinkFuture.supplyAsync(new Callable(){ + @Override + public Void call() throws Exception { + actorSystem.awaitTermination(); + return null; + } + }, getExecutor()); + } + + @Override public Executor getExecutor() { return actorSystem.dispatcher(); } http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index 2a004c5..88906a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc; import akka.dispatch.Futures; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.util.DirectExecutorService; @@ -39,7 +40,6 @@ import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; - /** * An RPC Service implementation for testing. This RPC service directly executes all asynchronous * calls one by one in the calling thread. @@ -48,10 +48,12 @@ public class TestingSerialRpcService implements RpcService { private final DirectExecutorService executorService; private final ConcurrentHashMap registeredConnections; + private final CompletableFuture terminationFuture; public TestingSerialRpcService() { executorService = new DirectExecutorService(); this.registeredConnections = new ConcurrentHashMap<>(16); + this.terminationFuture = new FlinkCompletableFuture<>(); } @Override @@ -89,6 +91,12 @@ public class TestingSerialRpcService implements RpcService { public void stopService() { executorService.shutdown(); registeredConnections.clear(); + terminationFuture.complete(null); + } + + @Override + public Future getTerminationFuture() { + return terminationFuture; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 5d76024..ba8eb11 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -22,6 +22,8 @@ import akka.actor.ActorSystem; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcMethod; @@ -32,9 +34,15 @@ import org.hamcrest.core.Is; import org.junit.AfterClass; import org.junit.Test; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -148,6 +156,34 @@ public class AkkaRpcActorTest extends TestLogger { } } + /** + * Tests that we can wait for a RpcEndpoint to terminate. + * + * @throws ExecutionException + * @throws InterruptedException + */ + @Test(timeout=1000) + public void testRpcEndpointTerminationFuture() throws ExecutionException, InterruptedException { + final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService); + rpcEndpoint.start(); + + Future terminationFuture = rpcEndpoint.getTerminationFuture(); + + assertFalse(terminationFuture.isDone()); + + FlinkFuture.supplyAsync(new Callable() { + @Override + public Void call() throws Exception { + rpcEndpoint.shutDown(); + + return null; + } + }, actorSystem.dispatcher()); + + // wait until the rpc endpoint has terminated + terminationFuture.get(); + } + private interface DummyRpcGateway extends RpcGateway { Future foobar(); } http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index 3388011..7c8defa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; @@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class AkkaRpcServiceTest extends TestLogger { @@ -120,4 +122,31 @@ public class AkkaRpcServiceTest extends TestLogger { public void testGetAddress() { assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), akkaRpcService.getAddress()); } + + /** + * Tests that we can wait for the termination of the rpc service + * + * @throws ExecutionException + * @throws InterruptedException + */ + @Test(timeout = 1000) + public void testTerminationFuture() throws ExecutionException, InterruptedException { + final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000)); + + Future terminationFuture = rpcService.getTerminationFuture(); + + assertFalse(terminationFuture.isDone()); + + FlinkFuture.supplyAsync(new Callable() { + @Override + public Void call() throws Exception { + rpcService.stopService(); + + return null; + } + }, actorSystem.dispatcher()); + + terminationFuture.get(); + } }