Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 65E54200BB7 for ; Wed, 28 Sep 2016 10:20:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 64B63160B06; Wed, 28 Sep 2016 08:20:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6091C160AF1 for ; Wed, 28 Sep 2016 10:20:46 +0200 (CEST) Received: (qmail 92729 invoked by uid 500); 28 Sep 2016 08:20:45 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 91727 invoked by uid 99); 28 Sep 2016 08:20:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Sep 2016 08:20:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9AE31EEF49; Wed, 28 Sep 2016 08:20:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Wed, 28 Sep 2016 08:21:01 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [18/50] [abbrv] flink git commit: [FLINK-4434] [rpc] Add a testing RPC service. archived-at: Wed, 28 Sep 2016 08:20:47 -0000 [FLINK-4434] [rpc] Add a testing RPC service. This closes #2394. 
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f73a935 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f73a935 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f73a935 Branch: refs/heads/flip-6 Commit: 9f73a9355de131f9154c598871db12c7fe08dd30 Parents: 8d502fa Author: Stephan Ewen Authored: Fri Aug 19 23:29:45 2016 +0200 Committer: Till Rohrmann Committed: Tue Sep 27 19:24:57 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/rpc/RpcCompletenessTest.java | 3 + .../flink/runtime/rpc/TestingGatewayBase.java | 85 ++++++++++++++ .../flink/runtime/rpc/TestingRpcService.java | 115 +++++++++++++++++++ 3 files changed, 203 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9f73a935/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index 97cf0cb..b8aad62 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -41,9 +41,11 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class RpcCompletenessTest extends TestLogger { + private static final Class futureClass = Future.class; @Test + @SuppressWarnings({"rawtypes", "unchecked"}) public void testRpcCompleteness() { Reflections reflections = new Reflections("org.apache.flink"); @@ -64,6 +66,7 @@ public class RpcCompletenessTest extends TestLogger { } } + @SuppressWarnings("rawtypes") private void checkCompleteness(Class rpcEndpoint, Class rpcGateway) { Method[] 
gatewayMethods = rpcGateway.getDeclaredMethods(); Method[] serverMethods = rpcEndpoint.getDeclaredMethods(); http://git-wip-us.apache.org/repos/asf/flink/blob/9f73a935/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java new file mode 100644 index 0000000..4256135 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc; + +import akka.dispatch.Futures; +import scala.concurrent.Future; +import scala.concurrent.Promise; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Utility base class for testing gateways + */ +public abstract class TestingGatewayBase implements RpcGateway { + + private final ScheduledExecutorService executor; + + protected TestingGatewayBase() { + this.executor = Executors.newSingleThreadScheduledExecutor(); + } + + // ------------------------------------------------------------------------ + // shutdown + // ------------------------------------------------------------------------ + + public void stop() { + executor.shutdownNow(); + } + + @Override + protected void finalize() throws Throwable { + super.finalize(); + executor.shutdownNow(); + } + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + public Future futureWithTimeout(long timeoutMillis) { + Promise promise = Futures.promise(); + executor.schedule(new FutureTimeout(promise), timeoutMillis, TimeUnit.MILLISECONDS); + return promise.future(); + } + + // ------------------------------------------------------------------------ + + private static final class FutureTimeout implements Runnable { + + private final Promise promise; + + private FutureTimeout(Promise promise) { + this.promise = promise; + } + + @Override + public void run() { + try { + promise.failure(new TimeoutException()); + } catch (Throwable t) { + System.err.println("CAUGHT AN ERROR IN THE TEST: " + t.getMessage()); + t.printStackTrace(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9f73a935/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java new file mode 100644 index 0000000..7e92e8d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import akka.dispatch.Futures; +import akka.util.Timeout; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; + +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An RPC Service implementation for testing. This RPC service acts as a replacement for + * the regular RPC service for cases where tests need to return prepared mock gateways instead of + * proper RPC gateways. + * + *

<p>The TestingRpcService can be used for example in the following fashion, + * using Mockito for mocks and verification: + * + * <pre>{@code
+ * TestingRpcService rpc = new TestingRpcService();
+ *
+ * ResourceManagerGateway testGateway = mock(ResourceManagerGateway.class);
+ * rpc.registerGateway("myAddress", testGateway);
+ * 
+ * MyComponentToTest component = new MyComponentToTest();
+ * component.triggerSomethingThatCallsTheGateway();
+ * 
+ * verify(testGateway, timeout(1000)).theTestMethod(any(UUID.class), anyString());
+ * }</pre>
+ */ +public class TestingRpcService extends AkkaRpcService { + + /** Map of pre-registered connections */ + private final ConcurrentHashMap registeredConnections; + + /** + * Creates a new {@code TestingRpcService}. + */ + public TestingRpcService() { + this(new Configuration()); + } + + /** + * Creates a new {@code TestingRpcService}, using the given configuration. + */ + public TestingRpcService(Configuration configuration) { + super(AkkaUtils.createLocalActorSystem(configuration), new Timeout(new FiniteDuration(10, TimeUnit.SECONDS))); + + this.registeredConnections = new ConcurrentHashMap<>(); + } + + // ------------------------------------------------------------------------ + + @Override + public void stopService() { + super.stopService(); + registeredConnections.clear(); + } + + // ------------------------------------------------------------------------ + // connections + // ------------------------------------------------------------------------ + + public void registerGateway(String address, RpcGateway gateway) { + checkNotNull(address); + checkNotNull(gateway); + + if (registeredConnections.putIfAbsent(address, gateway) != null) { + throw new IllegalStateException("a gateway is already registered under " + address); + } + } + + @Override + public Future connect(String address, Class clazz) { + RpcGateway gateway = registeredConnections.get(address); + + if (gateway != null) { + if (clazz.isAssignableFrom(gateway.getClass())) { + @SuppressWarnings("unchecked") + C typedGateway = (C) gateway; + return Futures.successful(typedGateway); + } else { + return Futures.failed( + new Exception("Gateway registered under " + address + " is not of type " + clazz)); + } + } else { + return Futures.failed(new Exception("No gateway registered under that name")); + } + } +} \ No newline at end of file