Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5C218200B8F for ; Thu, 25 Aug 2016 20:48:14 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5A934160AA5; Thu, 25 Aug 2016 18:48:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AF9A5160AE4 for ; Thu, 25 Aug 2016 20:48:09 +0200 (CEST) Received: (qmail 18177 invoked by uid 500); 25 Aug 2016 18:48:07 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 15909 invoked by uid 99); 25 Aug 2016 18:48:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Aug 2016 18:48:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EB670E1171; Thu, 25 Aug 2016 18:48:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Thu, 25 Aug 2016 18:49:20 -0000 Message-Id: <9053c507cf7b4a18a40f473afdb3a04b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [77/89] [abbrv] flink git commit: [FLINK-4386] [rpc] Add a utility to verify calls happen in the Rpc Endpoint's main thread archived-at: Thu, 25 Aug 2016 18:48:14 -0000 [FLINK-4386] [rpc] Add a utility to verify calls happen in the Rpc Endpoint's main thread Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2f3f317 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2f3f317 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2f3f317 Branch: refs/heads/flip-6 Commit: a2f3f317e5f748b3930339816309cd1a2bf25c27 Parents: 0d38da0 Author: Stephan Ewen Authored: Thu Aug 11 20:30:54 2016 +0200 Committer: Stephan Ewen Committed: Thu Aug 25 20:21:03 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/rpc/MainThreadExecutor.java | 2 +- .../runtime/rpc/MainThreadValidatorUtil.java | 47 ++++++++++ .../apache/flink/runtime/rpc/RpcEndpoint.java | 38 +++++++- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 37 +++++--- .../flink/runtime/rpc/akka/AkkaRpcService.java | 2 +- .../rpc/akka/MainThreadValidationTest.java | 97 ++++++++++++++++++++ 6 files changed, 205 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a2f3f317/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java index 4efb382..5e4fead 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java @@ -30,7 +30,7 @@ import java.util.concurrent.TimeoutException; * *

This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint} * implementation which allows to dispatch local procedures to the main thread of the underlying - * rpc server. + * RPC endpoint. */ public interface MainThreadExecutor { http://git-wip-us.apache.org/repos/asf/flink/blob/a2f3f317/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java new file mode 100644 index 0000000..b3fea77 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This utility exists to bridge between the visibility of the + * {@code currentMainThread} field in the {@link RpcEndpoint}. + * + * The {@code currentMainThread} can be hidden from {@code RpcEndpoint} implementations + * and only be accessed via this utility from other packages. + */ +public final class MainThreadValidatorUtil { + + private final RpcEndpoint endpoint; + + public MainThreadValidatorUtil(RpcEndpoint endpoint) { + this.endpoint = checkNotNull(endpoint); + } + + public void enterMainThread() { + assert(endpoint.currentMainThread.compareAndSet(null, Thread.currentThread())) : + "The RpcEndpoint has concurrent access from " + endpoint.currentMainThread.get(); + } + + public void exitMainThread() { + assert(endpoint.currentMainThread.compareAndSet(Thread.currentThread(), null)) : + "The RpcEndpoint has concurrent access from " + endpoint.currentMainThread.get(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a2f3f317/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 44933d5..d36a283 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -29,6 +29,7 @@ import scala.concurrent.Future; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -75,6 +76,9 @@ public abstract class RpcEndpoint { * of the executing rpc server. */ private final MainThreadExecutionContext mainThreadExecutionContext; + /** A reference to the endpoint's main thread, if the current method is called by the main thread */ + final AtomicReference currentMainThread = new AtomicReference<>(null); + /** * Initializes the RPC endpoint. * @@ -92,6 +96,15 @@ public abstract class RpcEndpoint { this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self); } + /** + * Returns the class of the self gateway type. + * + * @return Class of the self gateway type + */ + public final Class getSelfGatewayType() { + return selfGatewayType; + } + // ------------------------------------------------------------------------ // Shutdown // ------------------------------------------------------------------------ @@ -193,13 +206,28 @@ public abstract class RpcEndpoint { return ((MainThreadExecutor) self).callAsync(callable, timeout); } + // ------------------------------------------------------------------------ + // Main Thread Validation + // ------------------------------------------------------------------------ + /** - * Returns the class of the self gateway type. - * - * @return Class of the self gateway type + * Validates that the method call happens in the RPC endpoint's main thread. + * + *

IMPORTANT: This check only happens when assertions are enabled, + * such as when running tests. + * + *

This can be used for additional checks, like + *

{@code
+	 * protected void concurrencyCriticalMethod() {
+	 *     validateRunsInMainThread();
+	 *     
+	 *     // some critical stuff
+	 * }
+	 * }
*/ - public final Class getSelfGatewayType() { - return selfGatewayType; + public void validateRunsInMainThread() { + // because the initialization is lazy, it can be that certain methods are + assert currentMainThread.get() == Thread.currentThread(); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/a2f3f317/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 18ccf1b..5e0a7da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -22,14 +22,16 @@ import akka.actor.ActorRef; import akka.actor.Status; import akka.actor.UntypedActor; import akka.pattern.Patterns; +import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.akka.messages.CallAsync; import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RunAsync; -import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -37,6 +39,8 @@ import java.lang.reflect.Method; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Akka rpc actor which receives {@link RpcInvocation}, {@link RunAsync} and {@link CallAsync} * messages. @@ -51,24 +55,35 @@ import java.util.concurrent.TimeUnit; * @param Type of the {@link RpcEndpoint} */ class AkkaRpcActor> extends UntypedActor { + private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActor.class); + /** the endpoint to invoke the methods on */ private final T rpcEndpoint; + /** the helper that tracks whether calls come from the main thread */ + private final MainThreadValidatorUtil mainThreadValidator; + AkkaRpcActor(final T rpcEndpoint) { - this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint, "rpc endpoint"); + this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint"); + this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint); } @Override - public void onReceive(final Object message) { - if (message instanceof RunAsync) { - handleRunAsync((RunAsync) message); - } else if (message instanceof CallAsync) { - handleCallAsync((CallAsync) message); - } else if (message instanceof RpcInvocation) { - handleRpcInvocation((RpcInvocation) message); - } else { - LOG.warn("Received message of unknown type {}. Dropping this message!", message.getClass()); + public void onReceive(final Object message) { + mainThreadValidator.enterMainThread(); + try { + if (message instanceof RunAsync) { + handleRunAsync((RunAsync) message); + } else if (message instanceof CallAsync) { + handleCallAsync((CallAsync) message); + } else if (message instanceof RpcInvocation) { + handleRpcInvocation((RpcInvocation) message); + } else { + LOG.warn("Received message of unknown type {}. Dropping this message!", message.getClass()); + } + } finally { + mainThreadValidator.exitMainThread(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a2f3f317/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 448216c..db40f10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -174,7 +174,7 @@ public class AkkaRpcService implements RpcService { } @Override - public String getAddress(C selfGateway) { + public String getAddress(RpcGateway selfGateway) { checkState(!stopped, "RpcService is stopped"); if (selfGateway instanceof AkkaGateway) { http://git-wip-us.apache.org/repos/asf/flink/blob/a2f3f317/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java new file mode 100644 index 0000000..b854143 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import akka.util.Timeout; + +import org.apache.flink.runtime.akka.AkkaUtils; + +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.rpc.RpcService; + +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +public class MainThreadValidationTest { + + @Test + public void failIfNotInMainThread() { + // test if assertions are activated. The test only works if assertions are loaded. + try { + assert false; + // apparently they are not activated + return; + } catch (AssertionError ignored) {} + + // actual test + AkkaRpcService akkaRpcService = new AkkaRpcService( + AkkaUtils.createDefaultActorSystem(), + new Timeout(10000, TimeUnit.MILLISECONDS)); + + try { + TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService); + + // this works, because it is executed as an RPC call + testEndpoint.getSelf().someConcurrencyCriticalFunction(); + + // this fails, because it is executed directly + boolean exceptionThrown; + try { + testEndpoint.someConcurrencyCriticalFunction(); + exceptionThrown = false; + } + catch (AssertionError e) { + exceptionThrown = true; + } + assertTrue("should fail with an assertion error", exceptionThrown); + + akkaRpcService.stopServer(testEndpoint.getSelf()); + } + finally { + akkaRpcService.stopService(); + } + } + + // ------------------------------------------------------------------------ + // test RPC endpoint + // ------------------------------------------------------------------------ + + interface TestGateway extends RpcGateway { + + void someConcurrencyCriticalFunction(); + } + + @SuppressWarnings("unused") + public static class TestEndpoint extends RpcEndpoint { + + public TestEndpoint(RpcService rpcService) { + super(rpcService); + } + + @RpcMethod + public void someConcurrencyCriticalFunction() { + validateRunsInMainThread(); + } + } +}