Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5FCCE200B94 for ; Sun, 2 Oct 2016 23:58:12 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5E83B160AC4; Sun, 2 Oct 2016 21:58:12 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A6A1E160AEC for ; Sun, 2 Oct 2016 23:58:09 +0200 (CEST) Received: (qmail 53465 invoked by uid 500); 2 Oct 2016 21:58:08 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 51679 invoked by uid 99); 2 Oct 2016 21:58:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 02 Oct 2016 21:58:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3916FE0EB5; Sun, 2 Oct 2016 21:58:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Sun, 02 Oct 2016 21:58:51 -0000 Message-Id: <6c35eb0cc8ee493aa533d75392b04340@git.apache.org> In-Reply-To: <522def0466064602947a7367482ae839@git.apache.org> References: <522def0466064602947a7367482ae839@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [46/50] [abbrv] flink git commit: [FLINK-4606] Integrate the new ResourceManager with the existed FlinkResourceManager archived-at: Sun, 02 Oct 2016 21:58:12 -0000 [FLINK-4606] Integrate the new ResourceManager with the existed FlinkResourceManager Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bace895 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bace895 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bace895 Branch: refs/heads/flip-6 Commit: 6bace895907e4a6259367d8b0333f994aefd1f18 Parents: f5ab959 Author: beyond1920 Authored: Fri Sep 9 09:11:24 2016 +0800 Committer: Till Rohrmann Committed: Sun Oct 2 23:45:06 2016 +0200 ---------------------------------------------------------------------- .../InfoMessageListenerRpcGateway.java | 35 +++ .../resourcemanager/ResourceManager.java | 214 ++++++++++++++++--- .../resourcemanager/ResourceManagerGateway.java | 23 ++ .../StandaloneResourceManager.java | 64 ++++++ .../resourcemanager/ResourceManagerHATest.java | 2 +- .../ResourceManagerJobMasterTest.java | 2 +- .../ResourceManagerTaskExecutorTest.java | 2 +- .../slotmanager/SlotProtocolTest.java | 5 +- 8 files changed, 318 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java new file mode 100644 index 0000000..c1eeefa --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.runtime.clusterframework.messages.InfoMessage; +import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.rpc.RpcGateway; + +/** + * A gateway to listen for info messages from {@link ResourceManager} + */ +public interface InfoMessageListenerRpcGateway extends RpcGateway { + + /** + * Notifies when resource manager need to notify listener about InfoMessage + * @param infoMessage + */ + void notifyInfoMessage(InfoMessage infoMessage); +} http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 88b8a11..83dc4db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -20,19 +20,22 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.InfoMessage; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; -import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.jobmaster.JobMasterGateway; @@ -42,8 +45,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.runtime.util.LeaderConnectionInfo; import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; import java.util.HashMap; @@ -66,15 +67,16 @@ import static org.apache.flink.util.Preconditions.checkNotNull; *
  • {@link #requestSlot(SlotRequest)} requests a slot from the resource manager
  • * */ -public class ResourceManager extends RpcEndpoint implements LeaderContender { +public abstract class ResourceManager extends RpcEndpoint implements LeaderContender { - private final Logger LOG = LoggerFactory.getLogger(getClass()); + /** The exit code with which the process is stopped in case of a fatal error */ + protected static final int EXIT_CODE_FATAL_ERROR = -13; private final Map jobMasterGateways; private final Set jobMasterLeaderRetrievalListeners; - private final Map taskExecutorGateways; + private final Map taskExecutorGateways; private final HighAvailabilityServices highAvailabilityServices; @@ -84,16 +86,16 @@ public class ResourceManager extends RpcEndpoint impleme private UUID leaderSessionID; - public ResourceManager( - RpcService rpcService, - HighAvailabilityServices highAvailabilityServices, - SlotManager slotManager) { + private Map infoMessageListeners; + + public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices, SlotManager slotManager) { super(rpcService); this.highAvailabilityServices = checkNotNull(highAvailabilityServices); this.jobMasterGateways = new HashMap<>(); - this.slotManager = slotManager; + this.slotManager = checkNotNull(slotManager); this.jobMasterLeaderRetrievalListeners = new HashSet<>(); this.taskExecutorGateways = new HashMap<>(); + infoMessageListeners = new HashMap<>(); } @Override @@ -103,6 +105,8 @@ public class ResourceManager extends RpcEndpoint impleme super.start(); leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService(); leaderElectionService.start(this); + // framework specific initialization + initialize(); } catch (Throwable e) { log.error("A fatal error happened when starting the ResourceManager", e); throw new RuntimeException("A fatal error happened when starting the ResourceManager", e); @@ -166,12 +170,12 @@ public class ResourceManager extends RpcEndpoint impleme jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo( highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS)); } catch (Exception e) { - LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); + log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); throw new Exception("Failed to retrieve JobMasterLeaderRetriever"); } if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) { - LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress); + log.info("Declining registration request from non-leading JobManager {}", jobMasterAddress); throw new Exception("JobManager is not leading"); } @@ -190,7 +194,7 @@ public class ResourceManager extends RpcEndpoint impleme LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID); jobMasterLeaderRetriever.start(jobMasterLeaderListener); } catch (Exception e) { - LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); + log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"); } jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener); @@ -237,13 +241,24 @@ public class ResourceManager extends RpcEndpoint impleme if (throwable != null) { return new RegistrationResponse.Decline(throwable.getMessage()); } else { - InstanceID id = new InstanceID(); - TaskExecutorRegistration oldTaskExecutor = - taskExecutorGateways.put(resourceID, new TaskExecutorRegistration(taskExecutorGateway, id)); - if (oldTaskExecutor != null) { - log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress); + WorkerType startedWorker = taskExecutorGateways.get(resourceID); + if(startedWorker != null) { + String oldWorkerAddress = startedWorker.getTaskExecutorGateway().getAddress(); + if (taskExecutorAddress.equals(oldWorkerAddress)) { + log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress); + } else { + log.warn("Receive a duplicate registration from TaskExecutor {} at different address, previous ({}), new ({})", + resourceID, oldWorkerAddress, taskExecutorAddress); + // TODO :: suggest old taskExecutor to stop itself + slotManager.notifyTaskManagerFailure(resourceID); + startedWorker = workerStarted(resourceID, taskExecutorGateway); + taskExecutorGateways.put(resourceID, startedWorker); + } + } else { + startedWorker = workerStarted(resourceID, taskExecutorGateway); + taskExecutorGateways.put(resourceID, startedWorker); } - return new TaskExecutorRegistrationSuccess(id, 5000); + return new TaskExecutorRegistrationSuccess(startedWorker.getInstanceID(), 5000); } } }, getMainThreadExecutor()); @@ -263,14 +278,12 @@ public class ResourceManager extends RpcEndpoint impleme if (jobMasterGateway != null) { return slotManager.requestSlot(slotRequest); } else { - LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId); + log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId); return new SlotRequestRejected(slotRequest.getAllocationId()); } } - - // ------------------------------------------------------------------------ // Leader Contender // ------------------------------------------------------------------------ @@ -324,6 +337,158 @@ public class ResourceManager extends RpcEndpoint impleme shutDown(); } + /** + * Registers an infoMessage listener + * + * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager + */ + @RpcMethod + public void registerInfoMessageListener(final String infoMessageListenerAddress) { + if(infoMessageListeners.containsKey(infoMessageListenerAddress)) { + log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress); + } else { + Future infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class); + + infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction() { + @Override + public void accept(InfoMessageListenerRpcGateway gateway) { + log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress); + infoMessageListeners.put(infoMessageListenerAddress, gateway); + } + }, getMainThreadExecutor()); + + infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction() { + @Override + public Void apply(Throwable failure) { + log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress); + return null; + } + }, getMainThreadExecutor()); + } + } + + /** + * Unregisters an infoMessage listener + * + * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager + * + */ + @RpcMethod + public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) { + infoMessageListeners.remove(infoMessageListenerAddress); + } + + /** + * Shutdowns cluster + * + * @param finalStatus + * @param optionalDiagnostics + */ + @RpcMethod + public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) { + log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics); + shutDownApplication(finalStatus, optionalDiagnostics); + } + + /** + * This method should be called by the framework once it detects that a currently registered task executor has failed. + * + * @param resourceID Id of the worker that has failed. + * @param message An informational message that explains why the worker failed. + */ + public void notifyWorkerFailed(final ResourceID resourceID, String message) { + runAsync(new Runnable() { + @Override + public void run() { + WorkerType worker = taskExecutorGateways.remove(resourceID); + if (worker != null) { + // TODO :: suggest failed task executor to stop itself + slotManager.notifyTaskManagerFailure(resourceID); + } + } + }); + } + + /** + * Gets the number of currently started TaskManagers. + * + * @return The number of currently started TaskManagers. + */ + public int getNumberOfStartedTaskManagers() { + return taskExecutorGateways.size(); + } + + /** + * Notifies the resource manager of a fatal error. + * + *

    IMPORTANT: This should not cleanly shut down this master, but exit it in + * such a way that a high-availability setting would restart this or fail over + * to another master. + */ + public void onFatalError(final String message, final Throwable error) { + runAsync(new Runnable() { + @Override + public void run() { + fatalError(message, error); + } + }); + } + + // ------------------------------------------------------------------------ + // Framework specific behavior + // ------------------------------------------------------------------------ + + /** + * Initializes the framework specific components. + * + * @throws Exception Exceptions during initialization cause the resource manager to fail. + */ + protected abstract void initialize() throws Exception; + + /** + * Callback when a task executor register. + * + * @param resourceID The worker resource id + * @param taskExecutorGateway the task executor gateway + */ + protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway); + + /** + * Callback when a resource manager faced a fatal error + * @param message + * @param error + */ + protected abstract void fatalError(String message, Throwable error); + + /** + * The framework specific code for shutting down the application. This should report the + * application's final status and shut down the resource manager cleanly. + * + * This method also needs to make sure all pending containers that are not registered + * yet are returned. + * + * @param finalStatus The application status to report. + * @param optionalDiagnostics An optional diagnostics message. + */ + protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics); + + // ------------------------------------------------------------------------ + // Info messaging + // ------------------------------------------------------------------------ + + public void sendInfoMessage(final String message) { + runAsync(new Runnable() { + @Override + public void run() { + InfoMessage infoMessage = new InfoMessage(message); + for (InfoMessageListenerRpcGateway listenerRpcGateway : infoMessageListeners.values()) { + listenerRpcGateway + .notifyInfoMessage(infoMessage); + } + } + }); + } + private static class JobMasterLeaderListener implements LeaderRetrievalListener { private final JobID jobID; @@ -343,5 +508,6 @@ public class ResourceManager extends RpcEndpoint impleme // TODO } } + } http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 484cea7..7c44006 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.RpcGateway; @@ -75,4 +76,26 @@ public interface ResourceManagerGateway extends RpcGateway { String taskExecutorAddress, ResourceID resourceID, @RpcTimeout Time timeout); + + /** + * Registers an infoMessage listener + * + * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager + */ + void registerInfoMessageListener(String infoMessageListenerAddress); + + /** + * Unregisters an infoMessage listener + * + * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager + * + */ + void unRegisterInfoMessageListener(String infoMessageListenerAddress); + + /** + * shutdown cluster + * @param finalStatus + * @param optionalDiagnostics + */ + void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics); } http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java new file mode 100644 index 0000000..84db1ee --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; + +/** + * A standalone implementation of the resource manager. Used when the system is started in + * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos. + */ +public class StandaloneResourceManager extends ResourceManager { + + public StandaloneResourceManager(RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + SlotManager slotManager) { + super(rpcService, highAvailabilityServices, slotManager); + } + + @Override + protected void initialize() throws Exception { + // nothing to initialize + } + + @Override + protected void fatalError(final String message, final Throwable error) { + log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error); + // kill this process + System.exit(EXIT_CODE_FATAL_ERROR); + } + + @Override + protected TaskExecutorRegistration workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) { + InstanceID instanceID = new InstanceID(); + TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(taskExecutorGateway, instanceID); + return taskExecutorRegistration; + } + + @Override + protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 64a1191..fdb83f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -55,7 +55,7 @@ public class ResourceManagerHATest { highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); SlotManager slotManager = mock(SlotManager.class); - final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, slotManager); + final ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, slotManager); resourceManager.start(); // before grant leadership, resourceManager's leaderId is null Assert.assertNull(resourceManager.getLeaderSessionID()); http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 332c093..8f09152 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -160,7 +160,7 @@ public class ResourceManagerJobMasterTest { TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService); highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService); - ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager()); + ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager()); resourceManager.start(); return resourceManager; } http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index ed7c7d7..e6d1ed5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -121,7 +121,7 @@ public class ResourceManagerTaskExecutorTest { private ResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService) { TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); - ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager()); + ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager()); resourceManager.start(); return resourceManager; } http://git-wip-us.apache.org/repos/asf/flink/blob/6bace895/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 0232fab..ff25897 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.SlotRequestReply; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; @@ -100,7 +101,7 @@ public class SlotProtocolTest extends TestLogger { TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); ResourceManager resourceManager = - new ResourceManager(testRpcService, testingHaServices, slotManager); + new StandaloneResourceManager(testRpcService, testingHaServices, slotManager); resourceManager.start(); rmLeaderElectionService.isLeader(rmLeaderID); @@ -179,7 +180,7 @@ public class SlotProtocolTest extends TestLogger { TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); ResourceManager resourceManager = - new ResourceManager(testRpcService, testingHaServices, slotManager); + new StandaloneResourceManager(testRpcService, testingHaServices, slotManager); resourceManager.start(); rmLeaderElectionService.isLeader(rmLeaderID);