flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [23/50] [abbrv] flink git commit: [FLINK-4694] [rpc] Add termination futures to RpcEndpoint and RpcService
Date Fri, 21 Oct 2016 12:22:03 GMT
[FLINK-4694] [rpc] Add termination futures to RpcEndpoint and RpcService

The termination futures can be used to wait for the termination of the respective component.

This closes #2558.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b48acd0a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b48acd0a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b48acd0a

Branch: refs/heads/flip-6
Commit: b48acd0a1e3775fec3f3728e300cd83441564a73
Parents: 0cac661
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Sep 27 18:17:42 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Oct 20 19:47:56 2016 +0200

----------------------------------------------------------------------
 .../concurrent/impl/FlinkCompletableFuture.java | 11 +++---
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |  9 +++++
 .../apache/flink/runtime/rpc/RpcService.java    |  7 ++++
 .../apache/flink/runtime/rpc/SelfGateway.java   | 34 ++++++++++++++++++
 .../runtime/rpc/akka/AkkaInvocationHandler.java | 22 ++++++++++--
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 17 ++++++++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 32 +++++++++++++++--
 .../runtime/rpc/TestingSerialRpcService.java    | 10 +++++-
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 36 ++++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    | 29 ++++++++++++++++
 10 files changed, 193 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b48acd0a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
index e648a71..14686d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.concurrent.impl;
 
 import akka.dispatch.Futures;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.util.Preconditions;
 import scala.concurrent.Promise;
 import scala.concurrent.Promise$;
 
@@ -52,8 +51,6 @@ public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements Complet
 
 	@Override
 	public boolean complete(T value) {
-		Preconditions.checkNotNull(value);
-
 		try {
 			promise.success(value);
 
@@ -65,10 +62,12 @@ public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements Complet
 
 	@Override
 	public boolean completeExceptionally(Throwable t) {
-		Preconditions.checkNotNull(t);
-
 		try {
-			promise.failure(t);
+			if (t == null) {
+				promise.failure(new NullPointerException("Throwable was null."));
+			} else {
+				promise.failure(t);
+			}
 
 			return true;
 		} catch (IllegalStateException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b48acd0a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 79961f7..f93a2e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -173,6 +173,15 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 		return rpcService;
 	}
 
+	/**
+	 * Return a future which is completed when the rpc endpoint has been terminated.
+	 *
+	 * @return Future which is completed when the rpc endpoint has been terminated.
+	 */
+	public Future<Void> getTerminationFuture() {
+		return ((SelfGateway)self).getTerminationFuture();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Asynchronous executions
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b48acd0a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 96844ed..2052f98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -77,6 +77,13 @@ public interface RpcService {
 	void stopService();
 
 	/**
+	 * Returns a future indicating when the RPC service has been shut down.
+	 *
+	 * @return Termination future
+	 */
+	Future<Void> getTerminationFuture();
+
+	/**
 	 * Gets the executor, provided by this RPC service. This executor can be used for example for
 	 * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/b48acd0a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
new file mode 100644
index 0000000..ed8ef9d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.runtime.concurrent.Future;
+
+/**
+ * Interface for self gateways
+ */
+public interface SelfGateway {
+
+	/**
+	 * Return a future which is completed when the rpc endpoint has been terminated.
+	 *
+	 * @return Future indicating when the rpc endpoint has been terminated
+	 */
+	Future<Void> getTerminationFuture();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b48acd0a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 8f4deff..709ff92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.runtime.rpc.SelfGateway;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.StartStoppable;
@@ -52,7 +53,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
  * executed.
  */
-class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable {
+class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable, SelfGateway {
 	private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class);
 
 	private final String address;
@@ -67,12 +68,22 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 
 	private final long maximumFramesize;
 
-	AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Time timeout, long maximumFramesize) {
+	// null if gateway; otherwise non-null
+	private final Future<Void> terminationFuture;
+
+	AkkaInvocationHandler(
+			String address,
+			ActorRef rpcEndpoint,
+			Time timeout,
+			long maximumFramesize,
+			Future<Void> terminationFuture) {
+
 		this.address = Preconditions.checkNotNull(address);
 		this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
 		this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
 		this.timeout = Preconditions.checkNotNull(timeout);
 		this.maximumFramesize = maximumFramesize;
+		this.terminationFuture = terminationFuture;
 	}
 
 	@Override
@@ -83,7 +94,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 
 		if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutable.class) ||
 			declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
-			declaringClass.equals(RpcGateway.class)) {
+			declaringClass.equals(RpcGateway.class) || declaringClass.equals(SelfGateway.class)) {
 			result = method.invoke(this, args);
 		} else {
 			String methodName = method.getName();
@@ -300,4 +311,9 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 	public String getAddress() {
 		return address;
 	}
+
+	@Override
+	public Future<Void> getTerminationFuture() {
+		return terminationFuture;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b48acd0a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 1b456a7..c21383a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -24,6 +24,7 @@ import akka.actor.UntypedActorWithStash;
 import akka.dispatch.Futures;
 import akka.japi.Procedure;
 import akka.pattern.Patterns;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
@@ -76,9 +77,23 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 	/** the helper that tracks whether calls come from the main thread */
 	private final MainThreadValidatorUtil mainThreadValidator;
 
-	AkkaRpcActor(final T rpcEndpoint) {
+	private final CompletableFuture<Void> terminationFuture;
+
+	AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Void> terminationFuture) {
 		this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
 		this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
+		this.terminationFuture = checkNotNull(terminationFuture);
+	}
+
+	@Override
+	public void postStop() {
+		super.postStop();
+
+		// IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise
+		// we would complete the future and let the actor system restart the actor with a completed
+		// future.
+		// Complete the termination future so that others know that we've stopped.
+		terminationFuture.complete(null);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b48acd0a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index fb7896a..44719c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -32,9 +32,12 @@ import akka.dispatch.Mapper;
 import akka.pattern.Patterns;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.runtime.rpc.SelfGateway;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -131,7 +134,12 @@ public class AkkaRpcService implements RpcService {
 
 					final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
 
-					InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
+					InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(
+						address,
+						actorRef,
+						timeout,
+						maximumFramesize,
+						null);
 
 					// Rather than using the System ClassLoader directly, we derive the ClassLoader
 					// from this class . That works better in cases where Flink runs embedded and all Flink
@@ -156,7 +164,8 @@ public class AkkaRpcService implements RpcService {
 	public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
 		checkNotNull(rpcEndpoint, "rpc endpoint");
 
-		Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint);
+		CompletableFuture<Void> terminationFuture = new FlinkCompletableFuture<>();
+		Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture);
 		ActorRef actorRef;
 
 		synchronized (lock) {
@@ -169,7 +178,12 @@ public class AkkaRpcService implements RpcService {
 
 		final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
 
-		InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
+		InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(
+			address,
+			actorRef,
+			timeout,
+			maximumFramesize,
+			terminationFuture);
 
 		// Rather than using the System ClassLoader directly, we derive the ClassLoader
 		// from this class . That works better in cases where Flink runs embedded and all Flink
@@ -181,6 +195,7 @@ public class AkkaRpcService implements RpcService {
 			classLoader,
 			new Class<?>[]{
 				rpcEndpoint.getSelfGatewayType(),
+				SelfGateway.class,
 				MainThreadExecutable.class,
 				StartStoppable.class,
 				AkkaGateway.class},
@@ -231,6 +246,17 @@ public class AkkaRpcService implements RpcService {
 	}
 
 	@Override
+	public Future<Void> getTerminationFuture() {
+		return FlinkFuture.supplyAsync(new Callable<Void>(){
+			@Override
+			public Void call() throws Exception {
+				actorSystem.awaitTermination();
+				return null;
+			}
+		}, getExecutor());
+	}
+
+	@Override
 	public Executor getExecutor() {
 		return actorSystem.dispatcher();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b48acd0a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 2a004c5..88906a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc;
 import akka.dispatch.Futures;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.util.DirectExecutorService;
@@ -39,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-
 /**
  * An RPC Service implementation for testing. This RPC service directly executes all asynchronous
  * calls one by one in the calling thread.
@@ -48,10 +48,12 @@ public class TestingSerialRpcService implements RpcService {
 
 	private final DirectExecutorService executorService;
 	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
+	private final CompletableFuture<Void> terminationFuture;
 
 	public TestingSerialRpcService() {
 		executorService = new DirectExecutorService();
 		this.registeredConnections = new ConcurrentHashMap<>(16);
+		this.terminationFuture = new FlinkCompletableFuture<>();
 	}
 
 	@Override
@@ -89,6 +91,12 @@ public class TestingSerialRpcService implements RpcService {
 	public void stopService() {
 		executorService.shutdown();
 		registeredConnections.clear();
+		terminationFuture.complete(null);
+	}
+
+	@Override
+	public Future<Void> getTerminationFuture() {
+		return terminationFuture;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b48acd0a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 5d76024..ba8eb11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -22,6 +22,8 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
@@ -32,9 +34,15 @@ import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -148,6 +156,34 @@ public class AkkaRpcActorTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that we can wait for a RpcEndpoint to terminate.
+	 *
+	 * @throws ExecutionException
+	 * @throws InterruptedException
+	 */
+	@Test(timeout=1000)
+	public void testRpcEndpointTerminationFuture() throws ExecutionException, InterruptedException {
+		final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
+		rpcEndpoint.start();
+
+		Future<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
+
+		assertFalse(terminationFuture.isDone());
+
+		FlinkFuture.supplyAsync(new Callable<Void>() {
+			@Override
+			public Void call() throws Exception {
+				rpcEndpoint.shutDown();
+
+				return null;
+			}
+		}, actorSystem.dispatcher());
+
+		// wait until the rpc endpoint has terminated
+		terminationFuture.get();
+	}
+
 	private interface DummyRpcGateway extends RpcGateway {
 		Future<Integer> foobar();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b48acd0a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 3388011..7c8defa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
@@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class AkkaRpcServiceTest extends TestLogger {
@@ -120,4 +122,31 @@ public class AkkaRpcServiceTest extends TestLogger {
 	public void testGetAddress() {
 		assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), akkaRpcService.getAddress());
 	}
+
+	/**
+	 * Tests that we can wait for the termination of the rpc service
+	 *
+	 * @throws ExecutionException
+	 * @throws InterruptedException
+	 */
+	@Test(timeout = 1000)
+	public void testTerminationFuture() throws ExecutionException, InterruptedException {
+		final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+		final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000));
+
+		Future<Void> terminationFuture = rpcService.getTerminationFuture();
+
+		assertFalse(terminationFuture.isDone());
+
+		FlinkFuture.supplyAsync(new Callable<Void>() {
+			@Override
+			public Void call() throws Exception {
+				rpcService.stopService();
+
+				return null;
+			}
+		}, actorSystem.dispatcher());
+
+		terminationFuture.get();
+	}
 }


Mime
View raw message