flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [31/50] [abbrv] flink git commit: [FLINK-4382] [rpc] Buffer rpc calls until the RpcEndpoint has been started
Date Thu, 08 Sep 2016 15:28:53 GMT
[FLINK-4382] [rpc] Buffer rpc calls until the RpcEndpoint has been started

This PR allows the AkkaRpcActor to stash messages until the corresponding RpcEndpoint
has been started. When receiving a Processing.START message, the AkkaRpcActor
unstashes all messages and starts processing rpcs. When receiving a Processing.STOP
message, it will stop processing messages and stash incoming messages again.

Add test case for message stashing

This closes #2358.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c54a61b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c54a61b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c54a61b

Branch: refs/heads/flip-6
Commit: 2c54a61b7e7f6c96834a7f41c39b4a3e4231b19a
Parents: e966f82
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Aug 11 18:13:25 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Sep 8 17:26:58 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |  15 ++-
 .../flink/runtime/rpc/StartStoppable.java       |  35 ++++++
 .../runtime/rpc/akka/AkkaInvocationHandler.java |  21 +++-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    |  39 ++++++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |   8 +-
 .../runtime/rpc/akka/messages/Processing.java   |  27 +++++
 .../flink/runtime/rpc/RpcCompletenessTest.java  |  45 +++++++-
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 108 +++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |   3 +
 .../flink/runtime/rpc/akka/AsyncCallsTest.java  |   5 +-
 .../rpc/akka/MainThreadValidationTest.java      |   4 +-
 .../rpc/akka/MessageSerializationTest.java      |   4 +
 .../rpc/taskexecutor/TaskExecutorTest.java      |  18 ++++
 13 files changed, 315 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c54a61b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index d36a283..67ac182 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -74,7 +74,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 
 	/** The main thread execution context to be used to execute future callbacks in the main thread
 	 * of the executing rpc server. */
-	private final MainThreadExecutionContext mainThreadExecutionContext;
+	private final ExecutionContext mainThreadExecutionContext;
 
 	/** A reference to the endpoint's main thread, if the current method is called by the main thread */
 	final AtomicReference<Thread> currentMainThread = new AtomicReference<>(null);

@@ -106,10 +106,21 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	}
 	
 	// ------------------------------------------------------------------------
-	//  Shutdown
+	//  Start & Shutdown
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Starts the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
+	 * to process remote procedure calls.
+	 *
+	 * IMPORTANT: Whenever you override this method, call the parent implementation to enable
+	 * rpc processing. It is advised to make the parent call last.
+	 */
+	public void start() {
+		((StartStoppable) self).start();
+	}
+
+	/**
 	 * Shuts down the underlying RPC endpoint via the RPC service.
 	 * After this method was called, the RPC endpoint will no longer be reachable, neither remotely,
 	 * not via its {@link #getSelf() self gateway}. It will also not accepts executions in main thread

http://git-wip-us.apache.org/repos/asf/flink/blob/2c54a61b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
new file mode 100644
index 0000000..dd5595f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+/**
+ * Interface to start and stop the processing of rpc calls in the rpc server.
+ */
+public interface StartStoppable {
+
+	/**
+	 * Starts the processing of remote procedure calls.
+	 */
+	void start();
+
+	/**
+	 * Stops the processing of remote procedure calls.
+	 */
+	void stop();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c54a61b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 297104b..524bf74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -24,8 +24,10 @@ import akka.util.Timeout;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
 import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.Processing;
 import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
@@ -50,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
  * executed.
  */
-class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor {
+class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor, StartStoppable {
 	private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class);
 
 	private final ActorRef rpcEndpoint;
@@ -76,7 +78,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 
 		Object result;
 
-		if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) || declaringClass.equals(Object.class)) {
+		if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) ||
+			declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class)) {
 			result = method.invoke(this, args);
 		} else {
 			String methodName = method.getName();
@@ -171,6 +174,20 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 		}
 	}
 
+	@Override
+	public void start() {
+		rpcEndpoint.tell(Processing.START, ActorRef.noSender());
+	}
+
+	@Override
+	public void stop() {
+		rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Helper methods
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method
 	 * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default

http://git-wip-us.apache.org/repos/asf/flink/blob/2c54a61b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index dfcbcc3..2373be9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -20,13 +20,15 @@ package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
-import akka.actor.UntypedActor;
+import akka.actor.UntypedActorWithStash;
+import akka.japi.Procedure;
 import akka.pattern.Patterns;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
 import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.Processing;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 
@@ -45,18 +47,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Akka rpc actor which receives {@link LocalRpcInvocation}, {@link RunAsync} and {@link CallAsync}
- * messages.
+ * {@link Processing} messages.
  * <p>
  * The {@link LocalRpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint}
  * instance.
  * <p>
  * The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed
  * in the context of the actor thread.
+ * <p>
+ * The {@link Processing} message controls the processing behaviour of the akka rpc actor. A
+ * {@link Processing#START} message unstashes all stashed messages and starts processing incoming
+ * messages. A {@link Processing#STOP} message stops processing messages and stashes incoming
+ * messages.
  *
  * @param <C> Type of the {@link RpcGateway} associated with the {@link RpcEndpoint}
  * @param <T> Type of the {@link RpcEndpoint}
  */
-class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends UntypedActor {
+class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends UntypedActorWithStash {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActor.class);
 
@@ -73,6 +80,27 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 
 	@Override
 	public void onReceive(final Object message) {
+		if (message.equals(Processing.START)) {
+			unstashAll();
+			getContext().become(new Procedure<Object>() {
+				@Override
+				public void apply(Object message) throws Exception {
+					if (message.equals(Processing.STOP)) {
+						getContext().unbecome();
+					} else {
+						handleMessage(message);
+					}
+				}
+			});
+		} else {
+			LOG.info("The rpc endpoint {} has not been started yet. Stashing message {} until processing is started.",
+				rpcEndpoint.getClass().getName(),
+				message.getClass().getName());
+			stash();
+		}
+	}
+
+	private void handleMessage(Object message) {
 		mainThreadValidator.enterMainThread();
 		try {
 			if (message instanceof RunAsync) {
@@ -82,7 +110,10 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 			} else if (message instanceof RpcInvocation) {
 				handleRpcInvocation((RpcInvocation) message);
 			} else {
-				LOG.warn("Received message of unknown type {}. Dropping this message!", message.getClass());
+				LOG.warn(
+					"Received message of unknown type {} with value {}. Dropping this message!",
+					message.getClass().getName(),
+					message);
 			}
 		} finally {
 			mainThreadValidator.exitMainThread();

http://git-wip-us.apache.org/repos/asf/flink/blob/2c54a61b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index b963c53..7b33524 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-
+import org.apache.flink.runtime.rpc.StartStoppable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,7 +136,11 @@ public class AkkaRpcService implements RpcService {
 		@SuppressWarnings("unchecked")
 		C self = (C) Proxy.newProxyInstance(
 			classLoader,
-			new Class<?>[]{rpcEndpoint.getSelfGatewayType(), MainThreadExecutor.class, AkkaGateway.class},
+			new Class<?>[]{
+				rpcEndpoint.getSelfGatewayType(),
+				MainThreadExecutor.class,
+				StartStoppable.class,
+				AkkaGateway.class},
 			akkaInvocationHandler);
 
 		return self;

http://git-wip-us.apache.org/repos/asf/flink/blob/2c54a61b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
new file mode 100644
index 0000000..5c7df5d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+/**
+ * Controls the processing behaviour of the {@link org.apache.flink.runtime.rpc.akka.AkkaRpcActor}
+ */
+public enum Processing {
+	START, // Unstashes all stashed messages and starts processing incoming messages
+	STOP // Stop processing messages and stashes all incoming messages
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c54a61b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index e50533e..97cf0cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.util.ReflectionUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -140,7 +142,7 @@ public class RpcCompletenessTest extends TestLogger {
 		int rpcTimeoutParameters = 0;
 
 		for (int i = 0; i < parameterAnnotations.length; i++) {
-			if (isRpcTimeout(parameterAnnotations[i])) {
+			if (RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) {
 				assertTrue(
 					"The rpc timeout has to be of type " + FiniteDuration.class.getName() + ".",
 					parameterTypes[i].equals(FiniteDuration.class));
@@ -185,7 +187,7 @@ public class RpcCompletenessTest extends TestLogger {
 
 		// filter out the RpcTimeout parameters
 		for (int i = 0; i < gatewayParameterTypes.length; i++) {
-			if (!isRpcTimeout(gatewayParameterAnnotations[i])) {
+			if (!RpcCompletenessTest.isRpcTimeout(gatewayParameterAnnotations[i])) {
 				filteredGatewayParameterTypes.add(gatewayParameterTypes[i]);
 			}
 		}
@@ -235,7 +237,22 @@ public class RpcCompletenessTest extends TestLogger {
 	}
 
 	private boolean checkType(Class<?> firstType, Class<?> secondType) {
-		return firstType.equals(secondType);
+		Class<?> firstResolvedType;
+		Class<?> secondResolvedType;
+
+		if (firstType.isPrimitive()) {
+			firstResolvedType = RpcCompletenessTest.resolvePrimitiveType(firstType);
+		} else {
+			firstResolvedType = firstType;
+		}
+
+		if (secondType.isPrimitive()) {
+			secondResolvedType = RpcCompletenessTest.resolvePrimitiveType(secondType);
+		} else {
+			secondResolvedType = secondType;
+		}
+
+		return firstResolvedType.equals(secondResolvedType);
 	}
 
 	/**
@@ -279,7 +296,7 @@ public class RpcCompletenessTest extends TestLogger {
 
 		for (int i = 0; i < parameterTypes.length; i++) {
 			// filter out the RpcTimeout parameters
-			if (!isRpcTimeout(parameterAnnotations[i])) {
+			if (!RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) {
 				builder.append(parameterTypes[i].getName());
 
 				if (i < parameterTypes.length -1) {
@@ -293,7 +310,7 @@ public class RpcCompletenessTest extends TestLogger {
 		return builder.toString();
 	}
 
-	private boolean isRpcTimeout(Annotation[] annotations) {
+	private static boolean isRpcTimeout(Annotation[] annotations) {
 		for (Annotation annotation : annotations) {
 			if (annotation.annotationType().equals(RpcTimeout.class)) {
 				return true;
@@ -302,4 +319,22 @@ public class RpcCompletenessTest extends TestLogger {
 
 		return false;
 	}
+
+	/**
+	 * Returns the boxed type for a primitive type.
+	 *
+	 * @param primitveType Primitive type to resolve
+	 * @return Boxed type for the given primitive type
+	 */
+	private static Class<?> resolvePrimitiveType(Class<?> primitveType) {
+		assert primitveType.isPrimitive();
+
+		TypeInformation<?> typeInformation = BasicTypeInfo.getInfoFor(primitveType);
+
+		if (typeInformation != null) {
+			return typeInformation.getTypeClass();
+		} else {
+			throw new RuntimeException("Could not retrive basic type information for primitive type " + primitveType + '.');
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2c54a61b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
new file mode 100644
index 0000000..1653fac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.TestLogger;
+import org.hamcrest.core.Is;
+import org.junit.AfterClass;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertThat;
+
+public class AkkaRpcActorTest extends TestLogger {
+
+	// ------------------------------------------------------------------------
+	//  shared test members
+	// ------------------------------------------------------------------------
+
+	private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+
+	private static Timeout timeout = new Timeout(10000, TimeUnit.MILLISECONDS);
+
+	private static AkkaRpcService akkaRpcService =
+		new AkkaRpcService(actorSystem, timeout);
+
+	@AfterClass
+	public static void shutdown() {
+		akkaRpcService.stopService();
+		actorSystem.shutdown();
+		actorSystem.awaitTermination();
+	}
+
+	/**
+	 * Tests that the {@link AkkaRpcActor} stashes messages until the corresponding
+	 * {@link RpcEndpoint} has been started.
+	 */
+	@Test
+	public void testMessageStashing() throws Exception {
+		int expectedValue = 1337;
+
+		DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
+
+		DummyRpcGateway rpcGateway = rpcEndpoint.getSelf();
+
+		// this message should not be processed until we've started the rpc endpoint
+		Future<Integer> result = rpcGateway.foobar();
+
+		// set a new value which we expect to be returned
+		rpcEndpoint.setFoobar(expectedValue);
+
+		// now process the rpc
+		rpcEndpoint.start();
+
+		Integer actualValue = Await.result(result, timeout.duration());
+
+		assertThat("The new foobar value should have been returned.", actualValue, Is.is(expectedValue));
+
+		rpcEndpoint.shutDown();
+	}
+
+	private interface DummyRpcGateway extends RpcGateway {
+		Future<Integer> foobar();
+	}
+
+	private static class DummyRpcEndpoint extends RpcEndpoint<DummyRpcGateway> {
+
+		private volatile int _foobar = 42;
+
+		protected DummyRpcEndpoint(RpcService rpcService) {
+			super(rpcService);
+		}
+
+		@RpcMethod
+		public int foobar() {
+			return _foobar;
+		}
+
+		public void setFoobar(int value) {
+			_foobar = value;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c54a61b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index f26b40b..fd55904 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -57,6 +57,9 @@ public class AkkaRpcServiceTest extends TestLogger {
 		ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService);
 		JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService);
 
+		resourceManager.start();
+		jobMaster.start();
+
 		ResourceManagerGateway rm = resourceManager.getSelf();
 
 		assertTrue(rm instanceof AkkaGateway);

http://git-wip-us.apache.org/repos/asf/flink/blob/2c54a61b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
index f2ce52d..d33987c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Test;
 
@@ -42,7 +43,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import static org.junit.Assert.*;
 
-public class AsyncCallsTest {
+public class AsyncCallsTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
 	//  shared test members
@@ -72,6 +73,7 @@ public class AsyncCallsTest {
 		final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
 
 		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
+		testEndpoint.start();
 		TestGateway gateway = testEndpoint.getSelf();
 
 		// a bunch of gateway calls
@@ -127,6 +129,7 @@ public class AsyncCallsTest {
 		final long delay = 200;
 
 		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
+		testEndpoint.start();
 
 		// run something asynchronously
 		testEndpoint.runAsync(new Runnable() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2c54a61b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index b854143..9ffafda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -27,13 +27,14 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertTrue;
 
-public class MainThreadValidationTest {
+public class MainThreadValidationTest extends TestLogger {
 
 	@Test
 	public void failIfNotInMainThread() {
@@ -51,6 +52,7 @@ public class MainThreadValidationTest {
 
 		try {
 			TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService);
+			testEndpoint.start();
 
 			// this works, because it is executed as an RPC call
 			testEndpoint.getSelf().someConcurrencyCriticalFunction();

http://git-wip-us.apache.org/repos/asf/flink/blob/2c54a61b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index ca8179c..9d2ed99 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -86,6 +86,7 @@ public class MessageSerializationTest extends TestLogger {
 	public void testNonSerializableLocalMessageTransfer() throws InterruptedException, IOException {
 		LinkedBlockingQueue<Object> linkedBlockingQueue = new LinkedBlockingQueue<>();
 		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue);
+		testEndpoint.start();
 
 		TestGateway testGateway = testEndpoint.getSelf();
 
@@ -106,6 +107,7 @@ public class MessageSerializationTest extends TestLogger {
 		LinkedBlockingQueue<Object> linkedBlockingQueue = new LinkedBlockingQueue<>();
 
 		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue);
+		testEndpoint.start();
 
 		String address = testEndpoint.getAddress();
 
@@ -126,6 +128,7 @@ public class MessageSerializationTest extends TestLogger {
 		LinkedBlockingQueue<Object> linkedBlockingQueue = new LinkedBlockingQueue<>();
 
 		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue);
+		testEndpoint.start();
 
 		String address = testEndpoint.getAddress();
 
@@ -149,6 +152,7 @@ public class MessageSerializationTest extends TestLogger {
 		LinkedBlockingQueue<Object> linkedBlockingQueue = new LinkedBlockingQueue<>();
 
 		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue);
+		testEndpoint.start();
 
 		String address = testEndpoint.getAddress();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2c54a61b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
index 33c9cb6..c96f4f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -28,17 +28,26 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.cglib.proxy.InvocationHandler;
+import org.mockito.cglib.proxy.Proxy;
+import scala.concurrent.Future;
 
 import java.net.URL;
 import java.util.Collections;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TaskExecutorTest extends TestLogger {
 
@@ -48,8 +57,13 @@ public class TaskExecutorTest extends TestLogger {
 	@Test
 	public void testTaskExecution() throws Exception {
 		RpcService testingRpcService = mock(RpcService.class);
+		InvocationHandler invocationHandler = mock(InvocationHandler.class);
+		Object selfGateway = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] {TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, invocationHandler);
+		when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway);
+
 		DirectExecutorService directExecutorService = new DirectExecutorService();
 		TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
+		taskExecutor.start();
 
 		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
 			new JobID(),
@@ -82,8 +96,12 @@ public class TaskExecutorTest extends TestLogger {
 	@Test(expected=Exception.class)
 	public void testWrongTaskCancellation() throws Exception {
 		RpcService testingRpcService = mock(RpcService.class);
+		InvocationHandler invocationHandler = mock(InvocationHandler.class);
+		Object selfGateway = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] {TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, invocationHandler);
+		when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway);
 		DirectExecutorService directExecutorService = null;
 		TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
+		taskExecutor.start();
 
 		taskExecutor.cancelTask(new ExecutionAttemptID());
 


Mime
View raw message