Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5D11A200CFA for ; Tue, 5 Sep 2017 12:38:29 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5BAC416548F; Tue, 5 Sep 2017 10:38:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D194216548D for ; Tue, 5 Sep 2017 12:38:27 +0200 (CEST) Received: (qmail 31149 invoked by uid 500); 5 Sep 2017 10:38:26 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 31140 invoked by uid 99); 5 Sep 2017 10:38:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Sep 2017 10:38:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A6390F552A; Tue, 5 Sep 2017 10:38:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Message-Id: <9f42750865454ebe9787d453d7fa7b85@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-7523] Add proper resource shutdown to ResourceManager/JobManagerRunner Date: Tue, 5 Sep 2017 10:38:26 +0000 (UTC) archived-at: Tue, 05 Sep 2017 10:38:29 -0000 Repository: flink Updated Branches: refs/heads/master e70de0eb8 -> ff1660629 [FLINK-7523] Add proper resource shutdown to ResourceManager/JobManagerRunner This commit waits for the completion of the shutdown of the ResourceManager before shutting down the 
ResourceManagerRuntimeServices. The JobManagerServices are now exclusively passed in to the JobManagerRunner, which means that the JobManagerRunner is no longer responsible for shutting the JobManagerServices down. Additionally, it waits until the JobMaster has been shut down before closing the LeaderElectionService as well as the JobManagerMetricGroup. The JobManagerServices are now managed by the caller of the JobManagerRunner. This allows them to be reused across multiple JobManagerRunners. The RpcEndpoint#postStop method is now called by the UntypedActor#postStop method, which ensures that the RpcEndpoint's method is also called if only the underlying RpcService is shut down (without explicitly shutting down the RpcEndpoint). This closes #4596. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff166062 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff166062 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff166062 Branch: refs/heads/master Commit: ff1660629fea73886bf9c9f802c80dd9bf84c83d Parents: e70de0e Author: Till Rohrmann Authored: Sat Aug 26 11:45:36 2017 +0200 Committer: Till Rohrmann Committed: Tue Sep 5 12:37:52 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/dispatcher/Dispatcher.java | 45 ++++++-- .../dispatcher/StandaloneDispatcher.java | 5 +- .../entrypoint/JobClusterEntrypoint.java | 19 +++- .../runtime/jobmaster/JobManagerRunner.java | 112 +++++-------------- .../flink/runtime/minicluster/MiniCluster.java | 2 + .../minicluster/MiniClusterJobDispatcher.java | 23 +++- .../resourcemanager/ResourceManagerRunner.java | 49 ++++---- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 100 +++++++---------- .../flink/runtime/rpc/akka/AkkaRpcService.java | 6 +- .../runtime/rpc/messages/ControlMessage.java | 26 ----- .../flink/runtime/rpc/messages/Shutdown.java | 36 ------ .../runtime/dispatcher/DispatcherTest.java | 3 +- 12 files changed, 176 
insertions(+), 250 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 00cbb2f..8977415 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; @@ -69,7 +70,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint impleme private final RunningJobsRegistry runningJobsRegistry; private final HighAvailabilityServices highAvailabilityServices; - private final BlobServer blobServer; + private final JobManagerServices jobManagerServices; private final HeartbeatServices heartbeatServices; private final MetricRegistry metricRegistry; @@ -92,7 +93,9 @@ public abstract class Dispatcher extends FencedRpcEndpoint impleme this.configuration = Preconditions.checkNotNull(configuration); this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); - this.blobServer = Preconditions.checkNotNull(blobServer); + this.jobManagerServices = JobManagerServices.fromConfiguration( + configuration, + 
Preconditions.checkNotNull(blobServer)); this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices); this.metricRegistry = Preconditions.checkNotNull(metricRegistry); this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); @@ -111,11 +114,17 @@ public abstract class Dispatcher extends FencedRpcEndpoint impleme @Override public void postStop() throws Exception { - Exception exception = null; + Throwable exception = null; clearState(); try { + jobManagerServices.shutdown(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + try { submittedJobGraphStore.stop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); @@ -184,8 +193,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint impleme configuration, getRpcService(), highAvailabilityServices, - blobServer, heartbeatServices, + jobManagerServices, metricRegistry, new DispatcherOnCompleteActions(jobGraph.getJobID()), fatalErrorHandler); @@ -247,13 +256,23 @@ public abstract class Dispatcher extends FencedRpcEndpoint impleme * *

The state are all currently running jobs. */ - private void clearState() { + private void clearState() throws Exception { + Exception exception = null; + // stop all currently running JobManager since they run in the same process for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) { - jobManagerRunner.shutdown(); + try { + jobManagerRunner.shutdown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } } jobManagerRunners.clear(); + + if (exception != null) { + throw exception; + } } /** @@ -296,8 +315,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint impleme Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, - BlobServer blobServer, HeartbeatServices heartbeatServices, + JobManagerServices jobManagerServices, MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler) throws Exception; @@ -321,7 +340,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint impleme // clear the state if we've been the leader before if (getFencingToken() != null) { - clearState(); + try { + clearState(); + } catch (Exception e) { + log.warn("Could not properly clear the Dispatcher state while granting leadership.", e); + } } setFencingToken(dispatcherId); @@ -342,7 +365,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint impleme runAsyncWithoutFencing( () -> { log.info("Dispatcher {} was revoked leadership.", getAddress()); - clearState(); + try { + clearState(); + } catch (Exception e) { + log.warn("Could not properly clear the Dispatcher state while revoking leadership.", e); + } setFencingToken(DispatcherId.generate()); }); http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java index dfd6a8a..d6d82b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -64,8 +65,8 @@ public class StandaloneDispatcher extends Dispatcher { Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, - BlobServer blobServer, HeartbeatServices heartbeatServices, + JobManagerServices jobManagerServices, MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler) throws Exception { @@ -76,8 +77,8 @@ public class StandaloneDispatcher extends Dispatcher { configuration, rpcService, highAvailabilityServices, - blobServer, heartbeatServices, + jobManagerServices, metricRegistry, onCompleteActions, fatalErrorHandler); http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index e70f6c8..124c6c6 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -43,6 +44,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { private ResourceManager resourceManager; + private JobManagerServices jobManagerServices; + private JobManagerRunner jobManagerRunner; public JobClusterEntrypoint(Configuration configuration) { @@ -67,12 +70,14 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { metricRegistry, this); + jobManagerServices = JobManagerServices.fromConfiguration(configuration, blobServer); + jobManagerRunner = createJobManagerRunner( configuration, ResourceID.generate(), rpcService, highAvailabilityServices, - blobServer, + jobManagerServices, heartbeatServices, metricRegistry, this); @@ -89,7 +94,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { ResourceID resourceId, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, - BlobServer blobService, + JobManagerServices jobManagerServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception { @@ -102,8 +107,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { configuration, rpcService, highAvailabilityServices, - blobService, heartbeatServices, + jobManagerServices, metricRegistry, new 
TerminatingOnCompleteActions(jobGraph.getJobID()), fatalErrorHandler); @@ -121,6 +126,14 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { } } + if (jobManagerServices != null) { + try { + jobManagerServices.shutdown(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + if (resourceManager != null) { try { resourceManager.shutDown(); http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 8766fab..b5b4b82 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -22,9 +22,9 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -36,10 +36,10 @@ import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; -import 
org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,53 +92,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F // ------------------------------------------------------------------------ - public JobManagerRunner( - final ResourceID resourceId, - final JobGraph jobGraph, - final Configuration configuration, - final RpcService rpcService, - final HighAvailabilityServices haServices, - final BlobServer blobService, - final HeartbeatServices heartbeatServices, - final OnCompletionActions toNotifyOnComplete, - final FatalErrorHandler errorHandler) throws Exception { - this( - resourceId, - jobGraph, - configuration, - rpcService, - haServices, - blobService, - heartbeatServices, - new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)), - toNotifyOnComplete, - errorHandler); - } - - public JobManagerRunner( - final ResourceID resourceId, - final JobGraph jobGraph, - final Configuration configuration, - final RpcService rpcService, - final HighAvailabilityServices haServices, - final BlobServer blobService, - final HeartbeatServices heartbeatServices, - final MetricRegistry metricRegistry, - final OnCompletionActions toNotifyOnComplete, - final FatalErrorHandler errorHandler) throws Exception { - this( - resourceId, - jobGraph, - configuration, - rpcService, - haServices, - heartbeatServices, - JobManagerServices.fromConfiguration(configuration, blobService), - metricRegistry, - toNotifyOnComplete, - errorHandler); - } - /** * *

Exceptions that occur while creating the JobManager or JobManagerRunner are directly @@ -217,12 +170,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F } catch (Throwable t) { // clean up everything - try { - jobManagerServices.shutdown(); - } catch (Throwable tt) { - log.error("Error while shutting down JobManager services", tt); - } - if (jobManagerMetrics != null) { jobManagerMetrics.close(); } @@ -245,40 +192,37 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F } } - public void shutdown() { - shutdownInternally(); + public void shutdown() throws Exception { + shutdownInternally().get(); } - private void shutdownInternally() { + private CompletableFuture shutdownInternally() { synchronized (lock) { shutdown = true; - if (leaderElectionService != null) { - try { - leaderElectionService.stop(); - } catch (Throwable t) { - log.error("Could not properly shutdown the leader election service", t); - } - } - - try { - jobManager.shutDown(); - } catch (Throwable t) { - log.error("Error shutting down JobManager", t); - } - - try { - jobManagerServices.shutdown(); - } catch (Throwable t) { - log.error("Error shutting down JobManager services", t); - } - - // make all registered metrics go away - try { - jobManagerMetricGroup.close(); - } catch (Throwable t) { - log.error("Error while unregistering metrics", t); - } + jobManager.shutDown(); + + return jobManager.getTerminationFuture() + .thenAccept( + ignored -> { + Throwable exception = null; + try { + leaderElectionService.stop(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + // make all registered metrics go away + try { + jobManagerMetricGroup.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + if (exception != null) { + throw new FlinkFutureException("Could not properly shut down the JobManagerRunner.", exception); + } + }); } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 95f430c..2fe0587 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -353,6 +353,8 @@ public class MiniCluster { if (tm != null) { try { tm.shutDown(); + // wait for the TaskManager to properly terminate + tm.getTerminationFuture().get(); } catch (Throwable t) { exception = firstOrSuppressed(t, exception); } http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java index 2bb94f2..60d9a66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java @@ -33,6 +33,8 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,7 +158,7 @@ public class MiniClusterJobDispatcher { * Shuts down the mini cluster dispatcher. 
If a job is currently running, that job will be * terminally failed. */ - public void shutdown() { + public void shutdown() throws Exception { synchronized (lock) { if (!shutdown) { shutdown = true; @@ -166,14 +168,31 @@ public class MiniClusterJobDispatcher { // in this shutdown code we copy the references to the stack first, // to avoid concurrent modification + Throwable exception = null; + JobManagerRunner[] runners = this.runners; if (runners != null) { this.runners = null; for (JobManagerRunner runner : runners) { - runner.shutdown(); + try { + runner.shutdown(); + } catch (Throwable e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } } } + + // shut down the JobManagerServices + try { + jobManagerServices.shutdown(); + } catch (Throwable throwable) { + exception = ExceptionUtils.firstOrSuppressed(throwable, exception); + } + + if (exception != null) { + throw new FlinkException("Could not properly terminate all JobManagerRunners.", exception); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java index d0c411c..ed6e18c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java @@ -20,16 +20,18 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; + /** * Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's services * and handles fatal errors by shutting the resource manager down. @@ -91,27 +93,23 @@ public class ResourceManagerRunner implements FatalErrorHandler { } public void shutDown() throws Exception { - shutDownInternally(); + // wait for the completion + shutDownInternally().get(); } - private void shutDownInternally() throws Exception { - Exception exception = null; + private CompletableFuture shutDownInternally() { synchronized (lock) { - try { - resourceManager.shutDown(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - try { - resourceManagerRuntimeServices.shutDown(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - ExceptionUtils.rethrow(exception, "Error while shutting down the resource manager runner."); - } + resourceManager.shutDown(); + + return resourceManager.getTerminationFuture() + .thenAccept( + ignored -> { + try { + resourceManagerRuntimeServices.shutDown(); + } catch (Exception e) { + throw new FlinkFutureException("Could not properly shut down the resource manager runtime services.", e); + } + }); } } @@ -123,10 +121,13 @@ public class ResourceManagerRunner implements FatalErrorHandler { public void onFatalError(Throwable exception) { LOG.error("Encountered fatal error.", exception); - try { - shutDownInternally(); - } catch (Exception e) { - LOG.error("Could not properly shut down the resource manager.", e); - } 
+ CompletableFuture shutdownFuture = shutDownInternally(); + + shutdownFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + LOG.error("Could not properly shut down the resource manager runner.", throwable); + } + }); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 74c1509..f6c2e8b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -18,31 +18,26 @@ package org.apache.flink.runtime.rpc.akka; -import akka.actor.ActorRef; -import akka.actor.Status; -import akka.actor.UntypedActor; -import akka.pattern.Patterns; import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException; +import org.apache.flink.runtime.rpc.akka.messages.Processing; +import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.runtime.rpc.messages.CallAsync; -import org.apache.flink.runtime.rpc.messages.ControlMessage; import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation; -import org.apache.flink.runtime.rpc.akka.messages.Processing; import org.apache.flink.runtime.rpc.messages.RpcInvocation; import org.apache.flink.runtime.rpc.messages.RunAsync; - -import org.apache.flink.runtime.rpc.messages.Shutdown; -import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import 
org.apache.flink.util.ExceptionUtils; + +import akka.actor.ActorRef; +import akka.actor.Status; +import akka.actor.UntypedActor; +import akka.pattern.Patterns; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.duration.FiniteDuration; -import scala.concurrent.impl.Promise; - import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -50,6 +45,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import scala.concurrent.duration.FiniteDuration; +import scala.concurrent.impl.Promise; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -73,38 +71,47 @@ class AkkaRpcActor extends UntypedActor { protected final Logger log = LoggerFactory.getLogger(getClass()); - /** the endpoint to invoke the methods on */ + /** the endpoint to invoke the methods on. */ protected final T rpcEndpoint; - /** the helper that tracks whether calls come from the main thread */ + /** the helper that tracks whether calls come from the main thread. */ private final MainThreadValidatorUtil mainThreadValidator; private final CompletableFuture terminationFuture; - /** Throwable which might have been thrown by the postStop method */ - private Throwable shutdownThrowable; - AkkaRpcActor(final T rpcEndpoint, final CompletableFuture terminationFuture) { this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint"); this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint); this.terminationFuture = checkNotNull(terminationFuture); - - this.shutdownThrowable = null; } @Override public void postStop() throws Exception { - super.postStop(); + mainThreadValidator.enterMainThread(); - // IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise - // we would complete the future and let the actor system restart the actor with a completed - // future. 
- // Complete the termination future so that others know that we've stopped. + try { + Throwable shutdownThrowable = null; - if (shutdownThrowable != null) { - terminationFuture.completeExceptionally(shutdownThrowable); - } else { - terminationFuture.complete(null); + try { + rpcEndpoint.postStop(); + } catch (Throwable throwable) { + shutdownThrowable = throwable; + } + + super.postStop(); + + // IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise + // we would complete the future and let the actor system restart the actor with a completed + // future. + // Complete the termination future so that others know that we've stopped. + + if (shutdownThrowable != null) { + terminationFuture.completeExceptionally(shutdownThrowable); + } else { + terminationFuture.complete(null); + } + } finally { + mainThreadValidator.exitMainThread(); } } @@ -119,11 +126,7 @@ class AkkaRpcActor extends UntypedActor { mainThreadValidator.enterMainThread(); try { - if (msg instanceof ControlMessage) { - handleControlMessage(((ControlMessage) msg)); - } else { - handleMessage(msg); - } + handleMessage(msg); } finally { mainThreadValidator.exitMainThread(); } @@ -139,20 +142,6 @@ class AkkaRpcActor extends UntypedActor { } } - private void handleControlMessage(ControlMessage controlMessage) { - if (controlMessage instanceof Shutdown) { - triggerShutdown(); - } else { - log.warn( - "Received control message of unknown type {} with value {}. 
Dropping this control message!", - controlMessage.getClass().getName(), - controlMessage); - - sendErrorIfSender(new AkkaUnknownMessageException("Received unknown control message " + controlMessage + - " of type " + controlMessage.getClass().getSimpleName() + '.')); - } - } - protected void handleMessage(Object message) { if (message instanceof RunAsync) { handleRunAsync((RunAsync) message); @@ -186,7 +175,7 @@ class AkkaRpcActor extends UntypedActor { Class[] parameterTypes = rpcInvocation.getParameterTypes(); rpcMethod = lookupRpcMethod(methodName, parameterTypes); - } catch(ClassNotFoundException e) { + } catch (ClassNotFoundException e) { log.error("Could not load method arguments.", e); RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e); @@ -294,7 +283,7 @@ class AkkaRpcActor extends UntypedActor { runAsync.getClass().getName()); } else { - final long timeToRun = runAsync.getTimeNanos(); + final long timeToRun = runAsync.getTimeNanos(); final long delayNanos; if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime()) <= 0) { @@ -307,7 +296,7 @@ class AkkaRpcActor extends UntypedActor { } } else { - // schedule for later. send a new message after the delay, which will then be immediately executed + // schedule for later. send a new message after the delay, which will then be immediately executed FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS); RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun); @@ -317,17 +306,6 @@ class AkkaRpcActor extends UntypedActor { } } - private void triggerShutdown() { - try { - rpcEndpoint.postStop(); - } catch (Throwable throwable) { - shutdownThrowable = throwable; - } - - // now stop the actor which will stop processing of any further messages - getContext().system().stop(getSelf()); - } - /** * Look up the rpc method on the given {@link RpcEndpoint} instance. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 536a789..07b334d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -42,7 +42,6 @@ import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcServer; import org.apache.flink.runtime.rpc.RpcUtils; -import org.apache.flink.runtime.rpc.messages.Shutdown; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -297,7 +296,7 @@ public class AkkaRpcService implements RpcService { if (fromThisService) { ActorRef selfActorRef = akkaClient.getRpcEndpoint(); LOG.info("Trigger shut down of RPC endpoint {}.", selfActorRef.path()); - selfActorRef.tell(Shutdown.getInstance(), ActorRef.noSender()); + actorSystem.stop(selfActorRef); } else { LOG.debug("RPC endpoint {} already stopped or from different RPC service"); } @@ -314,11 +313,14 @@ public class AkkaRpcService implements RpcService { } stopped = true; + actorSystem.shutdown(); actors.clear(); } actorSystem.awaitTermination(); + + LOG.info("Stopped Akka RPC service."); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java deleted file mode 100644 index c16bdd7..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.messages; - -/** - * Base interface for control messages which are treated separately by the RPC server - * implementation. - */ -public interface ControlMessage { -} http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java deleted file mode 100644 index 50b076c..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.messages; - -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; - -/** - * Shut down message used to trigger the shut down of an AkkaRpcActor. This - * message is only intended for internal use by the {@link AkkaRpcService}. - */ -public final class Shutdown implements ControlMessage { - - private static Shutdown instance = new Shutdown(); - - public static Shutdown getInstance() { - return instance; - } - - private Shutdown() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 8846686..da76115 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; 
import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; @@ -216,8 +217,8 @@ public class DispatcherTest extends TestLogger { Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, - BlobServer blobServer, HeartbeatServices heartbeatServices, + JobManagerServices jobManagerServices, MetricRegistry metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler) throws Exception {