flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [37/50] [abbrv] flink git commit: [FLINK-4443] [rpc] Add support for rpc gateway and rpc endpoint inheritance
Date Thu, 08 Sep 2016 15:28:59 GMT
[FLINK-4443] [rpc] Add support for rpc gateway and rpc endpoint inheritance

This commit extends the RpcCompletenessTest such that it can now check for inherited
remote procedure calls. All methods defined at the RpcGateway are considered native.
This means that they need no RpcEndpoint counterpart because they are implemented by
the RpcGateway implementation.

This closes #2401.

update comments

remove native method annotation

add line break


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04824898
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04824898
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04824898

Branch: refs/heads/flip-6
Commit: 048248988ccc55db4a793d16f34f73427bf7cdea
Parents: 81a35c1
Author: wenlong.lwl <wenlong.lwl@alibaba-inc.com>
Authored: Sun Aug 21 00:46:51 2016 +0800
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Sep 8 17:26:59 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/rpc/RpcMethod.java |  2 ++
 .../TestingHighAvailabilityServices.java        | 19 +++++++++++
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 33 ++++++++++++++++++--
 3 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04824898/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
index 875e557..e4b0e94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rpc;
 
 import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
@@ -29,6 +30,7 @@ import java.lang.annotation.Target;
  * RpcCompletenessTest makes sure that the set of rpc methods in a rpc server and the set of
  * gateway methods in the corresponding gateway implementation are identical.
  */
+@Inherited
 @Target(ElementType.METHOD)
 @Retention(RetentionPolicy.RUNTIME)
 public @interface RpcMethod {

http://git-wip-us.apache.org/repos/asf/flink/blob/04824898/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 3a9f943..4d654a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.highavailability;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 /**
@@ -28,6 +30,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
 
+	private volatile LeaderElectionService jobMasterLeaderElectionService;
+
 
 	// ------------------------------------------------------------------------
 	//  Setters for mock / testing implementations
@@ -36,6 +40,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	public void setResourceManagerLeaderRetriever(LeaderRetrievalService resourceManagerLeaderRetriever) {
 		this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
 	}
+
+	public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) {
+		this.jobMasterLeaderElectionService = leaderElectionService;
+	}
 	
 	// ------------------------------------------------------------------------
 	//  HA Services Methods
@@ -50,4 +58,15 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 			throw new IllegalStateException("ResourceManagerLeaderRetriever has not been set");
 		}
 	}
+
+	@Override
+	public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
+		LeaderElectionService service = jobMasterLeaderElectionService;
+
+		if (service != null) {
+			return service;
+		} else {
+			throw new IllegalStateException("JobMasterLeaderElectionService has not been set");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/04824898/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index b8aad62..b431eb9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -68,8 +68,8 @@ public class RpcCompletenessTest extends TestLogger {
 
 	@SuppressWarnings("rawtypes")
 	private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) {
-		Method[] gatewayMethods = rpcGateway.getDeclaredMethods();
-		Method[] serverMethods = rpcEndpoint.getDeclaredMethods();
+		Method[] gatewayMethods = getRpcMethodsFromGateway(rpcGateway).toArray(new Method[0]);
+		Method[] serverMethods = rpcEndpoint.getMethods();
 
 		Map<String, Set<Method>> rpcMethods = new HashMap<>();
 		Set<Method> unmatchedRpcMethods = new HashSet<>();
@@ -340,4 +340,33 @@ public class RpcCompletenessTest extends TestLogger {
 			throw new RuntimeException("Could not retrive basic type information for primitive type " + primitveType + '.');
 		}
 	}
+
+	/**
+	 * Extract all rpc methods defined by the gateway interface
+	 *
+	 * @param interfaceClass the given rpc gateway interface
+	 * @return all methods defined by the given interface
+	 */
+	private List<Method> getRpcMethodsFromGateway(Class<? extends RpcGateway> interfaceClass) {
+		if(!interfaceClass.isInterface()) {
+			fail(interfaceClass.getName() + "is not a interface");
+		}
+
+		ArrayList<Method> allMethods = new ArrayList<>();
+		// Methods defined in RpcGateway are native method
+		if(interfaceClass.equals(RpcGateway.class)) {
+			return allMethods;
+		}
+
+		// Get all methods declared in current interface
+		for(Method method : interfaceClass.getDeclaredMethods()) {
+			allMethods.add(method);
+		}
+
+		// Get all method inherited from super interface
+		for(Class superClass : interfaceClass.getInterfaces()) {
+			allMethods.addAll(getRpcMethodsFromGateway(superClass));
+		}
+		return allMethods;
+	}
 }


Mime
View raw message