Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 90746200B67 for ; Tue, 16 Aug 2016 17:45:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8E407160AA8; Tue, 16 Aug 2016 15:45:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 16507160A74 for ; Tue, 16 Aug 2016 17:45:05 +0200 (CEST) Received: (qmail 6949 invoked by uid 500); 16 Aug 2016 15:45:05 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 6940 invoked by uid 99); 16 Aug 2016 15:45:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Aug 2016 15:45:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2F4A4E020E; Tue, 16 Aug 2016 15:45:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-4382] [rpc] Buffer rpc calls until the RpcEndpoint has been started Date: Tue, 16 Aug 2016 15:45:05 +0000 (UTC) archived-at: Tue, 16 Aug 2016 15:45:07 -0000 Repository: flink Updated Branches: refs/heads/flip-6 08cf86016 -> 5df27ebd7 [FLINK-4382] [rpc] Buffer rpc calls until the RpcEndpoint has been started This PR allows the AkkaRpcActor to stash messages until the corresponding RcpEndpoint has been started. When receiving a Processing.START message, the AkkaRpcActor unstashes all messages and starts processing rpcs. When receiving a Processing.STOP message, it will stop processing messages and stash incoming messages again. Add test case for message stashing This closes #2358. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5df27ebd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5df27ebd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5df27ebd Branch: refs/heads/flip-6 Commit: 5df27ebd73f5218a792fb53385887968c0e4ca36 Parents: 08cf860 Author: Till Rohrmann Authored: Thu Aug 11 18:13:25 2016 +0200 Committer: Till Rohrmann Committed: Tue Aug 16 17:41:16 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/runtime/rpc/RpcEndpoint.java | 15 ++- .../flink/runtime/rpc/StartStoppable.java | 35 ++++++ .../runtime/rpc/akka/AkkaInvocationHandler.java | 21 +++- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 39 ++++++- .../flink/runtime/rpc/akka/AkkaRpcService.java | 8 +- .../runtime/rpc/akka/messages/Processing.java | 27 +++++ .../flink/runtime/rpc/RpcCompletenessTest.java | 45 +++++++- .../runtime/rpc/akka/AkkaRpcActorTest.java | 108 +++++++++++++++++++ .../runtime/rpc/akka/AkkaRpcServiceTest.java | 3 + .../flink/runtime/rpc/akka/AsyncCallsTest.java | 5 +- .../rpc/akka/MainThreadValidationTest.java | 4 +- .../rpc/akka/MessageSerializationTest.java | 4 + .../rpc/taskexecutor/TaskExecutorTest.java | 18 ++++ 13 files changed, 315 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5df27ebd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index d36a283..67ac182 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -74,7 +74,7 @@ public abstract class RpcEndpoint { /** The main thread execution context to be used to execute future callbacks in the main thread * of the executing rpc server. */ - private final MainThreadExecutionContext mainThreadExecutionContext; + private final ExecutionContext mainThreadExecutionContext; /** A reference to the endpoint's main thread, if the current method is called by the main thread */ final AtomicReference currentMainThread = new AtomicReference<>(null); @@ -106,10 +106,21 @@ public abstract class RpcEndpoint { } // ------------------------------------------------------------------------ - // Shutdown + // Start & Shutdown // ------------------------------------------------------------------------ /** + * Starts the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready + * to process remote procedure calls. + * + * IMPORTANT: Whenever you override this method, call the parent implementation to enable + * rpc processing. It is advised to make the parent call last. + */ + public void start() { + ((StartStoppable) self).start(); + } + + /** * Shuts down the underlying RPC endpoint via the RPC service. * After this method was called, the RPC endpoint will no longer be reachable, neither remotely, * not via its {@link #getSelf() self gateway}. It will also not accepts executions in main thread http://git-wip-us.apache.org/repos/asf/flink/blob/5df27ebd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java new file mode 100644 index 0000000..dd5595f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +/** + * Interface to start and stop the processing of rpc calls in the rpc server. + */ +public interface StartStoppable { + + /** + * Starts the processing of remote procedure calls. + */ + void start(); + + /** + * Stops the processing of remote procedure calls. + */ + void stop(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/5df27ebd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 297104b..524bf74 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -24,8 +24,10 @@ import akka.util.Timeout; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.rpc.MainThreadExecutor; import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.rpc.akka.messages.CallAsync; import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation; +import org.apache.flink.runtime.rpc.akka.messages.Processing; import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RunAsync; @@ -50,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is * executed. */ -class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor { +class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor, StartStoppable { private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class); private final ActorRef rpcEndpoint; @@ -76,7 +78,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea Object result; - if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) || declaringClass.equals(Object.class)) { + if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) || + declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class)) { result = method.invoke(this, args); } else { String methodName = method.getName(); @@ -171,6 +174,20 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea } } + @Override + public void start() { + rpcEndpoint.tell(Processing.START, ActorRef.noSender()); + } + + @Override + public void stop() { + rpcEndpoint.tell(Processing.STOP, ActorRef.noSender()); + } + + // ------------------------------------------------------------------------ + // Helper methods + // ------------------------------------------------------------------------ + /** * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default http://git-wip-us.apache.org/repos/asf/flink/blob/5df27ebd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index dfcbcc3..2373be9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -20,13 +20,15 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorRef; import akka.actor.Status; -import akka.actor.UntypedActor; +import akka.actor.UntypedActorWithStash; +import akka.japi.Procedure; import akka.pattern.Patterns; import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.akka.messages.CallAsync; import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation; +import org.apache.flink.runtime.rpc.akka.messages.Processing; import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RunAsync; @@ -45,18 +47,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * Akka rpc actor which receives {@link LocalRpcInvocation}, {@link RunAsync} and {@link CallAsync} - * messages. + * {@link Processing} messages. *

* The {@link LocalRpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint} * instance. *

* The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed * in the context of the actor thread. + *

+ * The {@link Processing} message controls the processing behaviour of the akka rpc actor. A + * {@link Processing#START} message unstashes all stashed messages and starts processing incoming + * messages. A {@link Processing#STOP} message stops processing messages and stashes incoming + * messages. * * @param Type of the {@link RpcGateway} associated with the {@link RpcEndpoint} * @param Type of the {@link RpcEndpoint} */ -class AkkaRpcActor> extends UntypedActor { +class AkkaRpcActor> extends UntypedActorWithStash { private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActor.class); @@ -73,6 +80,27 @@ class AkkaRpcActor> extends Untyp @Override public void onReceive(final Object message) { + if (message.equals(Processing.START)) { + unstashAll(); + getContext().become(new Procedure() { + @Override + public void apply(Object message) throws Exception { + if (message.equals(Processing.STOP)) { + getContext().unbecome(); + } else { + handleMessage(message); + } + } + }); + } else { + LOG.info("The rpc endpoint {} has not been started yet. Stashing message {} until processing is started.", + rpcEndpoint.getClass().getName(), + message.getClass().getName()); + stash(); + } + } + + private void handleMessage(Object message) { mainThreadValidator.enterMainThread(); try { if (message instanceof RunAsync) { @@ -82,7 +110,10 @@ class AkkaRpcActor> extends Untyp } else if (message instanceof RpcInvocation) { handleRpcInvocation((RpcInvocation) message); } else { - LOG.warn("Received message of unknown type {}. Dropping this message!", message.getClass()); + LOG.warn( + "Received message of unknown type {} with value {}. Dropping this message!", + message.getClass().getName(), + message); } } finally { mainThreadValidator.exitMainThread(); http://git-wip-us.apache.org/repos/asf/flink/blob/5df27ebd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index b963c53..7b33524 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.rpc.MainThreadExecutor; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; - +import org.apache.flink.runtime.rpc.StartStoppable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,7 +136,11 @@ public class AkkaRpcService implements RpcService { @SuppressWarnings("unchecked") C self = (C) Proxy.newProxyInstance( classLoader, - new Class[]{rpcEndpoint.getSelfGatewayType(), MainThreadExecutor.class, AkkaGateway.class}, + new Class[]{ + rpcEndpoint.getSelfGatewayType(), + MainThreadExecutor.class, + StartStoppable.class, + AkkaGateway.class}, akkaInvocationHandler); return self; http://git-wip-us.apache.org/repos/asf/flink/blob/5df27ebd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java new file mode 100644 index 0000000..5c7df5d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.messages; + +/** + * Controls the processing behaviour of the {@link org.apache.flink.runtime.rpc.akka.AkkaRpcActor} + */ +public enum Processing { + START, // Unstashes all stashed messages and starts processing incoming messages + STOP // Stop processing messages and stashes all incoming messages +} http://git-wip-us.apache.org/repos/asf/flink/blob/5df27ebd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index e50533e..97cf0cb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.rpc; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.ReflectionUtil; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -140,7 +142,7 @@ public class RpcCompletenessTest extends TestLogger { int rpcTimeoutParameters = 0; for (int i = 0; i < parameterAnnotations.length; i++) { - if (isRpcTimeout(parameterAnnotations[i])) { + if (RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) { assertTrue( "The rpc timeout has to be of type " + FiniteDuration.class.getName() + ".", parameterTypes[i].equals(FiniteDuration.class)); @@ -185,7 +187,7 @@ public class RpcCompletenessTest extends TestLogger { // filter out the RpcTimeout parameters for (int i = 0; i < gatewayParameterTypes.length; i++) { - if (!isRpcTimeout(gatewayParameterAnnotations[i])) { + if (!RpcCompletenessTest.isRpcTimeout(gatewayParameterAnnotations[i])) { filteredGatewayParameterTypes.add(gatewayParameterTypes[i]); } } @@ -235,7 +237,22 @@ public class RpcCompletenessTest extends TestLogger { } private boolean checkType(Class firstType, Class secondType) { - return firstType.equals(secondType); + Class firstResolvedType; + Class secondResolvedType; + + if (firstType.isPrimitive()) { + firstResolvedType = RpcCompletenessTest.resolvePrimitiveType(firstType); + } else { + firstResolvedType = firstType; + } + + if (secondType.isPrimitive()) { + secondResolvedType = RpcCompletenessTest.resolvePrimitiveType(secondType); + } else { + secondResolvedType = secondType; + } + + return firstResolvedType.equals(secondResolvedType); } /** @@ -279,7 +296,7 @@ public class RpcCompletenessTest extends TestLogger { for (int i = 0; i < parameterTypes.length; i++) { // filter out the RpcTimeout parameters - if (!isRpcTimeout(parameterAnnotations[i])) { + if (!RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) { builder.append(parameterTypes[i].getName()); if (i < parameterTypes.length -1) { @@ -293,7 +310,7 @@ public class RpcCompletenessTest extends TestLogger { return builder.toString(); } - private boolean isRpcTimeout(Annotation[] annotations) { + private static boolean isRpcTimeout(Annotation[] annotations) { for (Annotation annotation : annotations) { if (annotation.annotationType().equals(RpcTimeout.class)) { return true; @@ -302,4 +319,22 @@ public class RpcCompletenessTest extends TestLogger { return false; } + + /** + * Returns the boxed type for a primitive type. + * + * @param primitveType Primitive type to resolve + * @return Boxed type for the given primitive type + */ + private static Class resolvePrimitiveType(Class primitveType) { + assert primitveType.isPrimitive(); + + TypeInformation typeInformation = BasicTypeInfo.getInfoFor(primitveType); + + if (typeInformation != null) { + return typeInformation.getTypeClass(); + } else { + throw new RuntimeException("Could not retrive basic type information for primitive type " + primitveType + '.'); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5df27ebd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java new file mode 100644 index 0000000..1653fac --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import akka.actor.ActorSystem; +import akka.util.Timeout; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.TestLogger; +import org.hamcrest.core.Is; +import org.junit.AfterClass; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.Future; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertThat; + +public class AkkaRpcActorTest extends TestLogger { + + // ------------------------------------------------------------------------ + // shared test members + // ------------------------------------------------------------------------ + + private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + + private static Timeout timeout = new Timeout(10000, TimeUnit.MILLISECONDS); + + private static AkkaRpcService akkaRpcService = + new AkkaRpcService(actorSystem, timeout); + + @AfterClass + public static void shutdown() { + akkaRpcService.stopService(); + actorSystem.shutdown(); + actorSystem.awaitTermination(); + } + + /** + * Tests that the {@link AkkaRpcActor} stashes messages until the corresponding + * {@link RpcEndpoint} has been started. + */ + @Test + public void testMessageStashing() throws Exception { + int expectedValue = 1337; + + DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService); + + DummyRpcGateway rpcGateway = rpcEndpoint.getSelf(); + + // this message should not be processed until we've started the rpc endpoint + Future result = rpcGateway.foobar(); + + // set a new value which we expect to be returned + rpcEndpoint.setFoobar(expectedValue); + + // now process the rpc + rpcEndpoint.start(); + + Integer actualValue = Await.result(result, timeout.duration()); + + assertThat("The new foobar value should have been returned.", actualValue, Is.is(expectedValue)); + + rpcEndpoint.shutDown(); + } + + private interface DummyRpcGateway extends RpcGateway { + Future foobar(); + } + + private static class DummyRpcEndpoint extends RpcEndpoint { + + private volatile int _foobar = 42; + + protected DummyRpcEndpoint(RpcService rpcService) { + super(rpcService); + } + + @RpcMethod + public int foobar() { + return _foobar; + } + + public void setFoobar(int value) { + _foobar = value; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5df27ebd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index f26b40b..fd55904 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -57,6 +57,9 @@ public class AkkaRpcServiceTest extends TestLogger { ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService); JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService); + resourceManager.start(); + jobMaster.start(); + ResourceManagerGateway rm = resourceManager.getSelf(); assertTrue(rm instanceof AkkaGateway); http://git-wip-us.apache.org/repos/asf/flink/blob/5df27ebd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java index f2ce52d..d33987c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Test; @@ -42,7 +43,7 @@ import java.util.concurrent.locks.ReentrantLock; import static org.junit.Assert.*; -public class AsyncCallsTest { +public class AsyncCallsTest extends TestLogger { // ------------------------------------------------------------------------ // shared test members @@ -72,6 +73,7 @@ public class AsyncCallsTest { final AtomicBoolean concurrentAccess = new AtomicBoolean(false); TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock); + testEndpoint.start(); TestGateway gateway = testEndpoint.getSelf(); // a bunch of gateway calls @@ -127,6 +129,7 @@ public class AsyncCallsTest { final long delay = 200; TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock); + testEndpoint.start(); // run something asynchronously testEndpoint.runAsync(new Runnable() { http://git-wip-us.apache.org/repos/asf/flink/blob/5df27ebd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java index b854143..9ffafda 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java @@ -27,13 +27,14 @@ import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.TestLogger; import org.junit.Test; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertTrue; -public class MainThreadValidationTest { +public class MainThreadValidationTest extends TestLogger { @Test public void failIfNotInMainThread() { @@ -51,6 +52,7 @@ public class MainThreadValidationTest { try { TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService); + testEndpoint.start(); // this works, because it is executed as an RPC call testEndpoint.getSelf().someConcurrencyCriticalFunction(); http://git-wip-us.apache.org/repos/asf/flink/blob/5df27ebd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java index ca8179c..9d2ed99 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java @@ -86,6 +86,7 @@ public class MessageSerializationTest extends TestLogger { public void testNonSerializableLocalMessageTransfer() throws InterruptedException, IOException { LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue<>(); TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue); + testEndpoint.start(); TestGateway testGateway = testEndpoint.getSelf(); @@ -106,6 +107,7 @@ public class MessageSerializationTest extends TestLogger { LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue<>(); TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue); + testEndpoint.start(); String address = testEndpoint.getAddress(); @@ -126,6 +128,7 @@ public class MessageSerializationTest extends TestLogger { LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue<>(); TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue); + testEndpoint.start(); String address = testEndpoint.getAddress(); @@ -149,6 +152,7 @@ public class MessageSerializationTest extends TestLogger { LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue<>(); TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue); + testEndpoint.start(); String address = testEndpoint.getAddress(); http://git-wip-us.apache.org/repos/asf/flink/blob/5df27ebd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java index 33c9cb6..c96f4f6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java @@ -28,17 +28,26 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.MainThreadExecutor; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.util.DirectExecutorService; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.cglib.proxy.InvocationHandler; +import org.mockito.cglib.proxy.Proxy; +import scala.concurrent.Future; import java.net.URL; import java.util.Collections; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TaskExecutorTest extends TestLogger { @@ -48,8 +57,13 @@ public class TaskExecutorTest extends TestLogger { @Test public void testTaskExecution() throws Exception { RpcService testingRpcService = mock(RpcService.class); + InvocationHandler invocationHandler = mock(InvocationHandler.class); + Object selfGateway = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class[] {TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, invocationHandler); + when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway); + DirectExecutorService directExecutorService = new DirectExecutorService(); TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService); + taskExecutor.start(); TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( new JobID(), @@ -82,8 +96,12 @@ public class TaskExecutorTest extends TestLogger { @Test(expected=Exception.class) public void testWrongTaskCancellation() throws Exception { RpcService testingRpcService = mock(RpcService.class); + InvocationHandler invocationHandler = mock(InvocationHandler.class); + Object selfGateway = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class[] {TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, invocationHandler); + when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway); DirectExecutorService directExecutorService = null; TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService); + taskExecutor.start(); taskExecutor.cancelTask(new ExecutionAttemptID());