Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8E0AC200BC4 for ; Fri, 14 Oct 2016 15:45:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8CF82160AD0; Fri, 14 Oct 2016 13:45:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 074E4160AFB for ; Fri, 14 Oct 2016 15:45:44 +0200 (CEST) Received: (qmail 98780 invoked by uid 500); 14 Oct 2016 13:45:44 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 98486 invoked by uid 99); 14 Oct 2016 13:45:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Oct 2016 13:45:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CB5A4E3934; Fri, 14 Oct 2016 13:45:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Fri, 14 Oct 2016 13:45:50 -0000 Message-Id: In-Reply-To: <30efb58745f743e78873d5d71cfb6016@git.apache.org> References: <30efb58745f743e78873d5d71cfb6016@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/50] [abbrv] flink git commit: [FLINK-4516] leader election of resourcemanager archived-at: Fri, 14 Oct 2016 13:45:46 -0000 [FLINK-4516] leader election of resourcemanager - add serial rpc service - add a special rpcService implementation which directly executes the asynchronous calls serially one by one, it is just for testcase - Change ResourceManagerLeaderContender code and TestingSerialRpcService code - override shutdown logic to stop leadershipService - use a mocked RpcService rather than TestingSerialRpcService for resourceManager HA test This closes #2427 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b077af4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b077af4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b077af4 Branch: refs/heads/flip-6 Commit: 4b077af4dfab4e9c6e1ae4e6f4865f1f319deb0c Parents: 6e22c64 Author: beyond1920 Authored: Sat Aug 27 14:14:28 2016 +0800 Committer: Stephan Ewen Committed: Fri Oct 14 15:14:39 2016 +0200 ---------------------------------------------------------------------- .../HighAvailabilityServices.java | 7 + .../runtime/highavailability/NonHaServices.java | 5 + .../rpc/resourcemanager/ResourceManager.java | 111 +++++- .../TestingHighAvailabilityServices.java | 19 +- .../runtime/rpc/TestingSerialRpcService.java | 369 +++++++++++++++++++ .../resourcemanager/ResourceManagerHATest.java | 76 ++++ 6 files changed, 578 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4b077af4/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index 73e4f1f..298147c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -40,6 +40,13 @@ public interface HighAvailabilityServices { LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception; /** + * Gets the leader election service for the cluster's resource manager. + * @return + * @throws Exception + */ + LeaderElectionService getResourceManagerLeaderElectionService() throws Exception; + + /** * Gets the leader election service for the given job. * * @param jobID The identifier of the job running the election. http://git-wip-us.apache.org/repos/asf/flink/blob/4b077af4/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index 3d2769b..292a404 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -61,6 +61,11 @@ public class NonHaServices implements HighAvailabilityServices { } @Override + public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + return new StandaloneLeaderElectionService(); + } + + @Override public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { return new StandaloneLeaderElectionService(); } http://git-wip-us.apache.org/repos/asf/flink/blob/4b077af4/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java index 6f34465..f7147c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java @@ -20,24 +20,26 @@ package org.apache.flink.runtime.rpc.resourcemanager; import akka.dispatch.Mapper; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.jobmaster.JobMaster; import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway; import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess; -import org.apache.flink.util.Preconditions; -import scala.concurrent.ExecutionContext; -import scala.concurrent.ExecutionContext$; import scala.concurrent.Future; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ExecutorService; + +import static org.apache.flink.util.Preconditions.checkNotNull; /** * ResourceManager implementation. The resource manager is responsible for resource de-/allocation @@ -50,16 +52,51 @@ import java.util.concurrent.ExecutorService; * */ public class ResourceManager extends RpcEndpoint { - private final ExecutionContext executionContext; private final Map jobMasterGateways; + private final HighAvailabilityServices highAvailabilityServices; + private LeaderElectionService leaderElectionService = null; + private UUID leaderSessionID = null; - public ResourceManager(RpcService rpcService, ExecutorService executorService) { + public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) { super(rpcService); - this.executionContext = ExecutionContext$.MODULE$.fromExecutor( - Preconditions.checkNotNull(executorService)); + this.highAvailabilityServices = checkNotNull(highAvailabilityServices); this.jobMasterGateways = new HashMap<>(); } + @Override + public void start() { + // start a leader + try { + super.start(); + leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService(); + leaderElectionService.start(new ResourceManagerLeaderContender()); + } catch (Throwable e) { + log.error("A fatal error happened when starting the ResourceManager", e); + throw new RuntimeException("A fatal error happened when starting the ResourceManager", e); + } + } + + @Override + public void shutDown() { + try { + leaderElectionService.stop(); + super.shutDown(); + } catch(Throwable e) { + log.error("A fatal error happened when shutdown the ResourceManager", e); + throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e); + } + } + + /** + * Gets the leader session id of current resourceManager. + * + * @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership. + */ + @VisibleForTesting + UUID getLeaderSessionID() { + return leaderSessionID; + } + /** * Register a {@link JobMaster} at the resource manager. * @@ -116,4 +153,62 @@ public class ResourceManager extends RpcEndpoint { return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000); } + + private class ResourceManagerLeaderContender implements LeaderContender { + + /** + * Callback method when current resourceManager is granted leadership + * + * @param leaderSessionID unique leadershipID + */ + @Override + public void grantLeadership(final UUID leaderSessionID) { + runAsync(new Runnable() { + @Override + public void run() { + log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID); + ResourceManager.this.leaderSessionID = leaderSessionID; + // confirming the leader session ID might be blocking, + leaderElectionService.confirmLeaderSessionID(leaderSessionID); + } + }); + } + + /** + * Callback method when current resourceManager lose leadership. + */ + @Override + public void revokeLeadership() { + runAsync(new Runnable() { + @Override + public void run() { + log.info("ResourceManager {} was revoked leadership.", getAddress()); + jobMasterGateways.clear(); + leaderSessionID = null; + } + }); + } + + @Override + public String getAddress() { + return ResourceManager.this.getAddress(); + } + + /** + * Handles error occurring in the leader election service + * + * @param exception Exception being thrown in the leader election service + */ + @Override + public void handleError(final Exception exception) { + runAsync(new Runnable() { + @Override + public void run() { + log.error("ResourceManager received an error from the LeaderElectionService.", exception); + // terminate ResourceManager in case of an error + shutDown(); + } + }); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/4b077af4/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index 4d654a3..3162f40 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -32,6 +32,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices private volatile LeaderElectionService jobMasterLeaderElectionService; + private volatile LeaderElectionService resourceManagerLeaderElectionService; + // ------------------------------------------------------------------------ // Setters for mock / testing implementations @@ -44,7 +46,11 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) { this.jobMasterLeaderElectionService = leaderElectionService; } - + + public void setResourceManagerLeaderElectionService(LeaderElectionService leaderElectionService) { + this.resourceManagerLeaderElectionService = leaderElectionService; + } + // ------------------------------------------------------------------------ // HA Services Methods // ------------------------------------------------------------------------ @@ -69,4 +75,15 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices throw new IllegalStateException("JobMasterLeaderElectionService has not been set"); } } + + @Override + public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + LeaderElectionService service = resourceManagerLeaderElectionService; + + if (service != null) { + return service; + } else { + throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set"); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/4b077af4/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java new file mode 100644 index 0000000..7bdbb99 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import akka.dispatch.ExecutionContexts; +import akka.dispatch.Futures; +import akka.util.Timeout; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.util.DirectExecutorService; +import org.apache.flink.util.Preconditions; +import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.BitSet; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; + + +/** + * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread. + */ +public class TestingSerialRpcService implements RpcService { + + private final DirectExecutorService executorService; + private final ConcurrentHashMap registeredConnections; + + public TestingSerialRpcService() { + executorService = new DirectExecutorService(); + this.registeredConnections = new ConcurrentHashMap<>(); + } + + @Override + public void scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) { + try { + unit.sleep(delay); + runnable.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + @Override + public ExecutionContext getExecutionContext() { + return ExecutionContexts.fromExecutorService(executorService); + } + + @Override + public void stopService() { + executorService.shutdown(); + registeredConnections.clear(); + } + + @Override + public void stopServer(RpcGateway selfGateway) { + + } + + @Override + public > C startServer(S rpcEndpoint) { + final String address = UUID.randomUUID().toString(); + + InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, rpcEndpoint); + ClassLoader classLoader = getClass().getClassLoader(); + + @SuppressWarnings("unchecked") + C self = (C) Proxy.newProxyInstance( + classLoader, + new Class[]{ + rpcEndpoint.getSelfGatewayType(), + MainThreadExecutor.class, + StartStoppable.class, + RpcGateway.class + }, + akkaInvocationHandler); + + return self; + } + + @Override + public Future connect(String address, Class clazz) { + RpcGateway gateway = registeredConnections.get(address); + + if (gateway != null) { + if (clazz.isAssignableFrom(gateway.getClass())) { + @SuppressWarnings("unchecked") + C typedGateway = (C) gateway; + return Futures.successful(typedGateway); + } else { + return Futures.failed( + new Exception("Gateway registered under " + address + " is not of type " + clazz)); + } + } else { + return Futures.failed(new Exception("No gateway registered under that name")); + } + } + + // ------------------------------------------------------------------------ + // connections + // ------------------------------------------------------------------------ + + public void registerGateway(String address, RpcGateway gateway) { + checkNotNull(address); + checkNotNull(gateway); + + if (registeredConnections.putIfAbsent(address, gateway) != null) { + throw new IllegalStateException("a gateway is already registered under " + address); + } + } + + private static class TestingSerialInvocationHandler> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable { + + private final T rpcEndpoint; + + /** default timeout for asks */ + private final Timeout timeout; + + private final String address; + + private TestingSerialInvocationHandler(String address, T rpcEndpoint) { + this(address, rpcEndpoint, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS))); + } + + private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) { + this.rpcEndpoint = rpcEndpoint; + this.timeout = timeout; + this.address = address; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + Class declaringClass = method.getDeclaringClass(); + if (declaringClass.equals(MainThreadExecutor.class) || + declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || + declaringClass.equals(RpcGateway.class)) { + return method.invoke(this, args); + } else { + final String methodName = method.getName(); + Class[] parameterTypes = method.getParameterTypes(); + Annotation[][] parameterAnnotations = method.getParameterAnnotations(); + Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); + + final Tuple2[], Object[]> filteredArguments = filterArguments( + parameterTypes, + parameterAnnotations, + args); + + Class returnType = method.getReturnType(); + + if (returnType.equals(Future.class)) { + try { + Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout); + return Futures.successful(result); + } catch (Throwable e) { + return Futures.failed(e); + } + } else { + return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout); + } + } + } + + /** + * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this + * method with the provided method arguments. If the method has a return value, it is returned + * to the sender of the call. + */ + private Object handleRpcInvocationSync(final String methodName, + final Class[] parameterTypes, + final Object[] args, + final Timeout futureTimeout) throws Exception { + final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes); + Object result = rpcMethod.invoke(rpcEndpoint, args); + + if (result != null && result instanceof Future) { + Future future = (Future) result; + return Await.result(future, futureTimeout.duration()); + } else { + return result; + } + } + + @Override + public void runAsync(Runnable runnable) { + runnable.run(); + } + + @Override + public Future callAsync(Callable callable, Timeout callTimeout) { + try { + return Futures.successful(callable.call()); + } catch (Throwable e) { + return Futures.failed(e); + } + } + + @Override + public void scheduleRunAsync(final Runnable runnable, final long delay) { + try { + TimeUnit.MILLISECONDS.sleep(delay); + runnable.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + @Override + public String getAddress() { + return address; + } + + @Override + public void start() { + // do nothing + } + + @Override + public void stop() { + // do nothing + } + + /** + * Look up the rpc method on the given {@link RpcEndpoint} instance. + * + * @param methodName Name of the method + * @param parameterTypes Parameter types of the method + * @return Method of the rpc endpoint + * @throws NoSuchMethodException Thrown if the method with the given name and parameter types + * cannot be found at the rpc endpoint + */ + private Method lookupRpcMethod(final String methodName, + final Class[] parameterTypes) throws NoSuchMethodException { + return rpcEndpoint.getClass().getMethod(methodName, parameterTypes); + } + + // ------------------------------------------------------------------------ + // Helper methods + // ------------------------------------------------------------------------ + + /** + * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method + * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default + * timeout is returned. + * + * @param parameterAnnotations Parameter annotations + * @param args Array of arguments + * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter + * has been found + * @return Timeout extracted from the array of arguments or the default timeout + */ + private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, + Timeout defaultTimeout) { + if (args != null) { + Preconditions.checkArgument(parameterAnnotations.length == args.length); + + for (int i = 0; i < parameterAnnotations.length; i++) { + if (isRpcTimeout(parameterAnnotations[i])) { + if (args[i] instanceof FiniteDuration) { + return new Timeout((FiniteDuration) args[i]); + } else { + throw new RuntimeException("The rpc timeout parameter must be of type " + + FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() + + " is not supported."); + } + } + } + } + + return defaultTimeout; + } + + /** + * Removes all {@link RpcTimeout} annotated parameters from the parameter type and argument + * list. + * + * @param parameterTypes Array of parameter types + * @param parameterAnnotations Array of parameter annotations + * @param args Arary of arguments + * @return Tuple of filtered parameter types and arguments which no longer contain the + * {@link RpcTimeout} annotated parameter types and arguments + */ + private static Tuple2[], Object[]> filterArguments( + Class[] parameterTypes, + Annotation[][] parameterAnnotations, + Object[] args) { + + Class[] filteredParameterTypes; + Object[] filteredArgs; + + if (args == null) { + filteredParameterTypes = parameterTypes; + filteredArgs = null; + } else { + Preconditions.checkArgument(parameterTypes.length == parameterAnnotations.length); + Preconditions.checkArgument(parameterAnnotations.length == args.length); + + BitSet isRpcTimeoutParameter = new BitSet(parameterTypes.length); + int numberRpcParameters = parameterTypes.length; + + for (int i = 0; i < parameterTypes.length; i++) { + if (isRpcTimeout(parameterAnnotations[i])) { + isRpcTimeoutParameter.set(i); + numberRpcParameters--; + } + } + + if (numberRpcParameters == parameterTypes.length) { + filteredParameterTypes = parameterTypes; + filteredArgs = args; + } else { + filteredParameterTypes = new Class[numberRpcParameters]; + filteredArgs = new Object[numberRpcParameters]; + int counter = 0; + + for (int i = 0; i < parameterTypes.length; i++) { + if (!isRpcTimeoutParameter.get(i)) { + filteredParameterTypes[counter] = parameterTypes[i]; + filteredArgs[counter] = args[i]; + counter++; + } + } + } + } + return Tuple2.of(filteredParameterTypes, filteredArgs); + } + + /** + * Checks whether any of the annotations is of type {@link RpcTimeout} + * + * @param annotations Array of annotations + * @return True if {@link RpcTimeout} was found; otherwise false + */ + private static boolean isRpcTimeout(Annotation[] annotations) { + for (Annotation annotation : annotations) { + if (annotation.annotationType().equals(RpcTimeout.class)) { + return true; + } + } + + return false; + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4b077af4/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java new file mode 100644 index 0000000..dfffeda --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.resourcemanager; + +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.rpc.MainThreadExecutor; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.StartStoppable; +import org.junit.Assert; +import org.junit.Test; + +import java.util.UUID; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * resourceManager HA test, including grant leadership and revoke leadership + */ +public class ResourceManagerHATest { + + @Test + public void testGrantAndRevokeLeadership() throws Exception { + // mock a RpcService which will return a special RpcGateway when call its startServer method, the returned RpcGateway directly execute runAsync call + TestingResourceManagerGatewayProxy gateway = mock(TestingResourceManagerGatewayProxy.class); + doCallRealMethod().when(gateway).runAsync(any(Runnable.class)); + + RpcService rpcService = mock(RpcService.class); + when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway); + + TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); + + final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices); + resourceManager.start(); + // before grant leadership, resourceManager's leaderId is null + Assert.assertNull(resourceManager.getLeaderSessionID()); + final UUID leaderId = UUID.randomUUID(); + leaderElectionService.isLeader(leaderId); + // after grant leadership, resourceManager's leaderId has value + Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID()); + // then revoke leadership, resourceManager's leaderId is null again + leaderElectionService.notLeader(); + Assert.assertNull(resourceManager.getLeaderSessionID()); + } + + private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway { + @Override + public void runAsync(Runnable runnable) { + runnable.run(); + } + } + +}