Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6E2CC200B8D for ; Thu, 8 Sep 2016 17:28:27 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6D0D5160AE3; Thu, 8 Sep 2016 15:28:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 96BD5160AD9 for ; Thu, 8 Sep 2016 17:28:26 +0200 (CEST) Received: (qmail 33029 invoked by uid 500); 8 Sep 2016 15:28:25 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 32544 invoked by uid 99); 8 Sep 2016 15:28:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Sep 2016 15:28:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 997EDE3AA3; Thu, 8 Sep 2016 15:28:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Thu, 08 Sep 2016 15:28:59 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [37/50] [abbrv] flink git commit: [FLINK-4443] [rpc] Add support for rpc gateway and rpc endpoint inheritance archived-at: Thu, 08 Sep 2016 15:28:27 -0000 [FLINK-4443] [rpc] Add support for rpc gateway and rpc endpoint inheritance This commit extends the RpcCompletenessTest such that it can now check for inherited remote procedure calls. All methods defined at the RpcGateway are considered native. This means that they need no RpcEndpoint counterpart because they are implemented by the RpcGateway implementation. This closes #2401. update comments remove native method annotation add line break Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04824898 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04824898 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04824898 Branch: refs/heads/flip-6 Commit: 048248988ccc55db4a793d16f34f73427bf7cdea Parents: 81a35c1 Author: wenlong.lwl Authored: Sun Aug 21 00:46:51 2016 +0800 Committer: Till Rohrmann Committed: Thu Sep 8 17:26:59 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/runtime/rpc/RpcMethod.java | 2 ++ .../TestingHighAvailabilityServices.java | 19 +++++++++++ .../flink/runtime/rpc/RpcCompletenessTest.java | 33 ++++++++++++++++++-- 3 files changed, 52 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/04824898/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java index 875e557..e4b0e94 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.rpc; import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @@ -29,6 +30,7 @@ import java.lang.annotation.Target; * RpcCompletenessTest makes sure that the set of rpc methods in a rpc server and the set of * gateway methods in the corresponding gateway implementation are identical. */ +@Inherited @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RpcMethod { http://git-wip-us.apache.org/repos/asf/flink/blob/04824898/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index 3a9f943..4d654a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.highavailability; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; /** @@ -28,6 +30,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices private volatile LeaderRetrievalService resourceManagerLeaderRetriever; + private volatile LeaderElectionService jobMasterLeaderElectionService; + // ------------------------------------------------------------------------ // Setters for mock / testing implementations @@ -36,6 +40,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices public void setResourceManagerLeaderRetriever(LeaderRetrievalService resourceManagerLeaderRetriever) { this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever; } + + public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) { + this.jobMasterLeaderElectionService = leaderElectionService; + } // ------------------------------------------------------------------------ // HA Services Methods @@ -50,4 +58,15 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices throw new IllegalStateException("ResourceManagerLeaderRetriever has not been set"); } } + + @Override + public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { + LeaderElectionService service = jobMasterLeaderElectionService; + + if (service != null) { + return service; + } else { + throw new IllegalStateException("JobMasterLeaderElectionService has not been set"); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/04824898/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index b8aad62..b431eb9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -68,8 +68,8 @@ public class RpcCompletenessTest extends TestLogger { @SuppressWarnings("rawtypes") private void checkCompleteness(Class rpcEndpoint, Class rpcGateway) { - Method[] gatewayMethods = rpcGateway.getDeclaredMethods(); - Method[] serverMethods = rpcEndpoint.getDeclaredMethods(); + Method[] gatewayMethods = getRpcMethodsFromGateway(rpcGateway).toArray(new Method[0]); + Method[] serverMethods = rpcEndpoint.getMethods(); Map> rpcMethods = new HashMap<>(); Set unmatchedRpcMethods = new HashSet<>(); @@ -340,4 +340,33 @@ public class RpcCompletenessTest extends TestLogger { throw new RuntimeException("Could not retrive basic type information for primitive type " + primitveType + '.'); } } + + /** + * Extract all rpc methods defined by the gateway interface + * + * @param interfaceClass the given rpc gateway interface + * @return all methods defined by the given interface + */ + private List getRpcMethodsFromGateway(Class interfaceClass) { + if(!interfaceClass.isInterface()) { + fail(interfaceClass.getName() + "is not a interface"); + } + + ArrayList allMethods = new ArrayList<>(); + // Methods defined in RpcGateway are native method + if(interfaceClass.equals(RpcGateway.class)) { + return allMethods; + } + + // Get all methods declared in current interface + for(Method method : interfaceClass.getDeclaredMethods()) { + allMethods.add(method); + } + + // Get all method inherited from super interface + for(Class superClass : interfaceClass.getInterfaces()) { + allMethods.addAll(getRpcMethodsFromGateway(superClass)); + } + return allMethods; + } }