flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7523] Add proper resource shutdown to ResourceManager/JobManagerRunner
Date Tue, 05 Sep 2017 10:38:26 GMT
Repository: flink
Updated Branches:
  refs/heads/master e70de0eb8 -> ff1660629


[FLINK-7523] Add proper resource shutdown to ResourceManager/JobManagerRunner

With this commit, the ResourceManagerRunner waits for the ResourceManager to complete its
shutdown before shutting down the ResourceManagerRuntimeServices. The JobManagerServices are
now exclusively passed into the JobManagerRunner, which means that the JobManagerRunner is no
longer responsible for shutting the JobManagerServices down. Additionally, the JobManagerRunner
waits until the JobMaster has been shut down before closing the
LeaderElectionService and the JobManagerMetricGroup.

The JobManagerServices are now managed by the caller of the JobManagerRunner. This
allows them to be reused across multiple JobManagerRunners.

The RpcEndpoint#postStop method is now called from AkkaRpcActor#postStop (its override of
UntypedActor#postStop). This ensures that the RpcEndpoint's postStop method is also called when only
the underlying RpcService is shut down (without explicitly shutting down the RpcEndpoint).

This closes #4596.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff166062
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff166062
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff166062

Branch: refs/heads/master
Commit: ff1660629fea73886bf9c9f802c80dd9bf84c83d
Parents: e70de0e
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Sat Aug 26 11:45:36 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Sep 5 12:37:52 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  45 ++++++--
 .../dispatcher/StandaloneDispatcher.java        |   5 +-
 .../entrypoint/JobClusterEntrypoint.java        |  19 +++-
 .../runtime/jobmaster/JobManagerRunner.java     | 112 +++++--------------
 .../flink/runtime/minicluster/MiniCluster.java  |   2 +
 .../minicluster/MiniClusterJobDispatcher.java   |  23 +++-
 .../resourcemanager/ResourceManagerRunner.java  |  49 ++++----
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 100 +++++++----------
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |   6 +-
 .../runtime/rpc/messages/ControlMessage.java    |  26 -----
 .../flink/runtime/rpc/messages/Shutdown.java    |  36 ------
 .../runtime/dispatcher/DispatcherTest.java      |   3 +-
 12 files changed, 176 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 00cbb2f..8977415 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -69,7 +70,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 	private final RunningJobsRegistry runningJobsRegistry;
 
 	private final HighAvailabilityServices highAvailabilityServices;
-	private final BlobServer blobServer;
+	private final JobManagerServices jobManagerServices;
 	private final HeartbeatServices heartbeatServices;
 	private final MetricRegistry metricRegistry;
 
@@ -92,7 +93,9 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
-		this.blobServer = Preconditions.checkNotNull(blobServer);
+		this.jobManagerServices = JobManagerServices.fromConfiguration(
+			configuration,
+			Preconditions.checkNotNull(blobServer));
 		this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
 		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
 		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
@@ -111,11 +114,17 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 
 	@Override
 	public void postStop() throws Exception {
-		Exception exception = null;
+		Throwable exception = null;
 
 		clearState();
 
 		try {
+			jobManagerServices.shutdown();
+		} catch (Throwable t) {
+			exception = ExceptionUtils.firstOrSuppressed(t, exception);
+		}
+
+		try {
 			submittedJobGraphStore.stop();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
@@ -184,8 +193,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 					configuration,
 					getRpcService(),
 					highAvailabilityServices,
-					blobServer,
 					heartbeatServices,
+					jobManagerServices,
 					metricRegistry,
 					new DispatcherOnCompleteActions(jobGraph.getJobID()),
 					fatalErrorHandler);
@@ -247,13 +256,23 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 	 *
 	 * <p>The state are all currently running jobs.
 	 */
-	private void clearState() {
+	private void clearState() throws Exception {
+		Exception exception = null;
+
 		// stop all currently running JobManager since they run in the same process
 		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
-			jobManagerRunner.shutdown();
+			try {
+				jobManagerRunner.shutdown();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
 		}
 
 		jobManagerRunners.clear();
+
+		if (exception != null) {
+			throw exception;
+		}
 	}
 
 	/**
@@ -296,8 +315,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 		Configuration configuration,
 		RpcService rpcService,
 		HighAvailabilityServices highAvailabilityServices,
-		BlobServer blobServer,
 		HeartbeatServices heartbeatServices,
+		JobManagerServices jobManagerServices,
 		MetricRegistry metricRegistry,
 		OnCompletionActions onCompleteActions,
 		FatalErrorHandler fatalErrorHandler) throws Exception;
@@ -321,7 +340,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 
 				// clear the state if we've been the leader before
 				if (getFencingToken() != null) {
-					clearState();
+					try {
+						clearState();
+					} catch (Exception e) {
+						log.warn("Could not properly clear the Dispatcher state while granting leadership.",
e);
+					}
 				}
 
 				setFencingToken(dispatcherId);
@@ -342,7 +365,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
impleme
 		runAsyncWithoutFencing(
 			() -> {
 				log.info("Dispatcher {} was revoked leadership.", getAddress());
-				clearState();
+				try {
+					clearState();
+				} catch (Exception e) {
+					log.warn("Could not properly clear the Dispatcher state while revoking leadership.",
e);
+				}
 
 				setFencingToken(DispatcherId.generate());
 			});

http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index dfd6a8a..d6d82b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -64,8 +65,8 @@ public class StandaloneDispatcher extends Dispatcher {
 			Configuration configuration,
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
-			BlobServer blobServer,
 			HeartbeatServices heartbeatServices,
+			JobManagerServices jobManagerServices,
 			MetricRegistry metricRegistry,
 			OnCompletionActions onCompleteActions,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -76,8 +77,8 @@ public class StandaloneDispatcher extends Dispatcher {
 			configuration,
 			rpcService,
 			highAvailabilityServices,
-			blobServer,
 			heartbeatServices,
+			jobManagerServices,
 			metricRegistry,
 			onCompleteActions,
 			fatalErrorHandler);

http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index e70f6c8..124c6c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -43,6 +44,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 
 	private ResourceManager<?> resourceManager;
 
+	private JobManagerServices jobManagerServices;
+
 	private JobManagerRunner jobManagerRunner;
 
 	public JobClusterEntrypoint(Configuration configuration) {
@@ -67,12 +70,14 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			metricRegistry,
 			this);
 
+		jobManagerServices = JobManagerServices.fromConfiguration(configuration, blobServer);
+
 		jobManagerRunner = createJobManagerRunner(
 			configuration,
 			ResourceID.generate(),
 			rpcService,
 			highAvailabilityServices,
-			blobServer,
+			jobManagerServices,
 			heartbeatServices,
 			metricRegistry,
 			this);
@@ -89,7 +94,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			ResourceID resourceId,
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
-			BlobServer blobService,
+			JobManagerServices jobManagerServices,
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
@@ -102,8 +107,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			configuration,
 			rpcService,
 			highAvailabilityServices,
-			blobService,
 			heartbeatServices,
+			jobManagerServices,
 			metricRegistry,
 			new TerminatingOnCompleteActions(jobGraph.getJobID()),
 			fatalErrorHandler);
@@ -121,6 +126,14 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint
{
 			}
 		}
 
+		if (jobManagerServices != null) {
+			try {
+				jobManagerServices.shutdown();
+			} catch (Throwable t) {
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
+			}
+		}
+
 		if (resourceManager != null) {
 			try {
 				resourceManager.shutDown();

http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 8766fab..b5b4b82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -22,9 +22,9 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -36,10 +36,10 @@ import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,53 +92,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions,
F
 
 	// ------------------------------------------------------------------------
 
-	public JobManagerRunner(
-			final ResourceID resourceId,
-			final JobGraph jobGraph,
-			final Configuration configuration,
-			final RpcService rpcService,
-			final HighAvailabilityServices haServices,
-			final BlobServer blobService,
-			final HeartbeatServices heartbeatServices,
-			final OnCompletionActions toNotifyOnComplete,
-			final FatalErrorHandler errorHandler) throws Exception {
-		this(
-			resourceId,
-			jobGraph,
-			configuration,
-			rpcService,
-			haServices,
-			blobService,
-			heartbeatServices,
-			new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)),
-			toNotifyOnComplete,
-			errorHandler);
-	}
-
-	public JobManagerRunner(
-			final ResourceID resourceId,
-			final JobGraph jobGraph,
-			final Configuration configuration,
-			final RpcService rpcService,
-			final HighAvailabilityServices haServices,
-			final BlobServer blobService,
-			final HeartbeatServices heartbeatServices,
-			final MetricRegistry metricRegistry,
-			final OnCompletionActions toNotifyOnComplete,
-			final FatalErrorHandler errorHandler) throws Exception {
-		this(
-			resourceId,
-			jobGraph,
-			configuration,
-			rpcService,
-			haServices,
-			heartbeatServices,
-			JobManagerServices.fromConfiguration(configuration, blobService),
-			metricRegistry,
-			toNotifyOnComplete,
-			errorHandler);
-	}
-
 	/**
 	 * 
 	 * <p>Exceptions that occur while creating the JobManager or JobManagerRunner are
directly
@@ -217,12 +170,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions,
F
 		}
 		catch (Throwable t) {
 			// clean up everything
-			try {
-				jobManagerServices.shutdown();
-			} catch (Throwable tt) {
-				log.error("Error while shutting down JobManager services", tt);
-			}
-
 			if (jobManagerMetrics != null) {
 				jobManagerMetrics.close();
 			}
@@ -245,40 +192,37 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions,
F
 		}
 	}
 
-	public void shutdown() {
-		shutdownInternally();
+	public void shutdown() throws Exception {
+		shutdownInternally().get();
 	}
 
-	private void shutdownInternally() {
+	private CompletableFuture<Void> shutdownInternally() {
 		synchronized (lock) {
 			shutdown = true;
 
-			if (leaderElectionService != null) {
-				try {
-					leaderElectionService.stop();
-				} catch (Throwable t) {
-					log.error("Could not properly shutdown the leader election service", t);
-				}
-			}
-
-			try {
-				jobManager.shutDown();
-			} catch (Throwable t) {
-				log.error("Error shutting down JobManager", t);
-			}
-
-			try {
-				jobManagerServices.shutdown();
-			} catch (Throwable t) {
-				log.error("Error shutting down JobManager services", t);
-			}
-
-			// make all registered metrics go away
-			try {
-				jobManagerMetricGroup.close();
-			} catch (Throwable t) {
-				log.error("Error while unregistering metrics", t);
-			}
+			jobManager.shutDown();
+
+			return jobManager.getTerminationFuture()
+				.thenAccept(
+					ignored -> {
+						Throwable exception = null;
+						try {
+							leaderElectionService.stop();
+						} catch (Throwable t) {
+							exception = ExceptionUtils.firstOrSuppressed(t, exception);
+						}
+
+						// make all registered metrics go away
+						try {
+							jobManagerMetricGroup.close();
+						} catch (Throwable t) {
+							exception = ExceptionUtils.firstOrSuppressed(t, exception);
+						}
+
+						if (exception != null) {
+							throw new FlinkFutureException("Could not properly shut down the JobManagerRunner.",
exception);
+						}
+					});
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 95f430c..2fe0587 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -353,6 +353,8 @@ public class MiniCluster {
 				if (tm != null) {
 					try {
 						tm.shutDown();
+						// wait for the TaskManager to properly terminate
+						tm.getTerminationFuture().get();
 					} catch (Throwable t) {
 						exception = firstOrSuppressed(t, exception);
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 2bb94f2..60d9a66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -156,7 +158,7 @@ public class MiniClusterJobDispatcher {
 	 * Shuts down the mini cluster dispatcher. If a job is currently running, that job will
be
 	 * terminally failed.
 	 */
-	public void shutdown() {
+	public void shutdown() throws Exception {
 		synchronized (lock) {
 			if (!shutdown) {
 				shutdown = true;
@@ -166,14 +168,31 @@ public class MiniClusterJobDispatcher {
 				// in this shutdown code we copy the references to the stack first,
 				// to avoid concurrent modification
 
+				Throwable exception = null;
+
 				JobManagerRunner[] runners = this.runners;
 				if (runners != null) {
 					this.runners = null;
 
 					for (JobManagerRunner runner : runners) {
-						runner.shutdown();
+						try {
+							runner.shutdown();
+						} catch (Throwable e) {
+							exception = ExceptionUtils.firstOrSuppressed(e, exception);
+						}
 					}
 				}
+
+				// shut down the JobManagerServices
+				try {
+					jobManagerServices.shutdown();
+				} catch (Throwable throwable) {
+					exception = ExceptionUtils.firstOrSuppressed(throwable, exception);
+				}
+
+				if (exception != null) {
+					throw new FlinkException("Could not properly terminate all JobManagerRunners.", exception);
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index d0c411c..ed6e18c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -20,16 +20,18 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's
services
  * and handles fatal errors by shutting the resource manager down.
@@ -91,27 +93,23 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 	}
 
 	public void shutDown() throws Exception {
-		shutDownInternally();
+		// wait for the completion
+		shutDownInternally().get();
 	}
 
-	private void shutDownInternally() throws Exception {
-		Exception exception = null;
+	private CompletableFuture<Void> shutDownInternally() {
 		synchronized (lock) {
-			try {
-				resourceManager.shutDown();
-			} catch (Exception e) {
-				exception = ExceptionUtils.firstOrSuppressed(e, exception);
-			}
-
-			try {
-				resourceManagerRuntimeServices.shutDown();
-			} catch (Exception e) {
-				exception = ExceptionUtils.firstOrSuppressed(e, exception);
-			}
-
-			if (exception != null) {
-				ExceptionUtils.rethrow(exception, "Error while shutting down the resource manager runner.");
-			}
+			resourceManager.shutDown();
+
+			return resourceManager.getTerminationFuture()
+				.thenAccept(
+					ignored -> {
+						try {
+							resourceManagerRuntimeServices.shutDown();
+						} catch (Exception e) {
+							throw new FlinkFutureException("Could not properly shut down the resource manager
runtime services.", e);
+						}
+					});
 		}
 	}
 
@@ -123,10 +121,13 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 	public void onFatalError(Throwable exception) {
 		LOG.error("Encountered fatal error.", exception);
 
-		try {
-			shutDownInternally();
-		} catch (Exception e) {
-			LOG.error("Could not properly shut down the resource manager.", e);
-		}
+		CompletableFuture<Void> shutdownFuture = shutDownInternally();
+
+		shutdownFuture.whenComplete(
+			(Void ignored, Throwable throwable) -> {
+				if (throwable != null) {
+					LOG.error("Could not properly shut down the resource manager runner.", throwable);
+				}
+			});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 74c1509..f6c2e8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -18,31 +18,26 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import akka.actor.UntypedActor;
-import akka.pattern.Patterns;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
+import org.apache.flink.runtime.rpc.akka.messages.Processing;
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.runtime.rpc.messages.CallAsync;
-import org.apache.flink.runtime.rpc.messages.ControlMessage;
 import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
-import org.apache.flink.runtime.rpc.akka.messages.Processing;
 import org.apache.flink.runtime.rpc.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RunAsync;
-
-import org.apache.flink.runtime.rpc.messages.Shutdown;
-import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.ExceptionUtils;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import akka.actor.UntypedActor;
+import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
-
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -50,6 +45,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.impl.Promise;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -73,38 +71,47 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends
UntypedActor {
 	
 	protected final Logger log = LoggerFactory.getLogger(getClass());
 
-	/** the endpoint to invoke the methods on */
+	/** the endpoint to invoke the methods on. */
 	protected final T rpcEndpoint;
 
-	/** the helper that tracks whether calls come from the main thread */
+	/** the helper that tracks whether calls come from the main thread. */
 	private final MainThreadValidatorUtil mainThreadValidator;
 
 	private final CompletableFuture<Void> terminationFuture;
 
-	/** Throwable which might have been thrown by the postStop method */
-	private Throwable shutdownThrowable;
-
 	AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Void> terminationFuture)
{
 		this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
 		this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
 		this.terminationFuture = checkNotNull(terminationFuture);
-
-		this.shutdownThrowable = null;
 	}
 
 	@Override
 	public void postStop() throws Exception {
-		super.postStop();
+		mainThreadValidator.enterMainThread();
 
-		// IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise
-		// we would complete the future and let the actor system restart the actor with a completed
-		// future.
-		// Complete the termination future so that others know that we've stopped.
+		try {
+			Throwable shutdownThrowable = null;
 
-		if (shutdownThrowable != null) {
-			terminationFuture.completeExceptionally(shutdownThrowable);
-		} else {
-			terminationFuture.complete(null);
+			try {
+				rpcEndpoint.postStop();
+			} catch (Throwable throwable) {
+				shutdownThrowable = throwable;
+			}
+
+			super.postStop();
+
+			// IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise
+			// we would complete the future and let the actor system restart the actor with a completed
+			// future.
+			// Complete the termination future so that others know that we've stopped.
+
+			if (shutdownThrowable != null) {
+				terminationFuture.completeExceptionally(shutdownThrowable);
+			} else {
+				terminationFuture.complete(null);
+			}
+		} finally {
+			mainThreadValidator.exitMainThread();
 		}
 	}
 
@@ -119,11 +126,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends
UntypedActor {
 						mainThreadValidator.enterMainThread();
 
 						try {
-							if (msg instanceof ControlMessage) {
-								handleControlMessage(((ControlMessage) msg));
-							} else {
-								handleMessage(msg);
-							}
+							handleMessage(msg);
 						} finally {
 							mainThreadValidator.exitMainThread();
 						}
@@ -139,20 +142,6 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends
UntypedActor {
 		}
 	}
 
-	private void handleControlMessage(ControlMessage controlMessage) {
-		if (controlMessage instanceof Shutdown) {
-			triggerShutdown();
-		} else {
-			log.warn(
-				"Received control message of unknown type {} with value {}. Dropping this control message!",
-				controlMessage.getClass().getName(),
-				controlMessage);
-
-			sendErrorIfSender(new AkkaUnknownMessageException("Received unknown control message "
+ controlMessage +
-				" of type " + controlMessage.getClass().getSimpleName() + '.'));
-		}
-	}
-
 	protected void handleMessage(Object message) {
 		if (message instanceof RunAsync) {
 			handleRunAsync((RunAsync) message);
@@ -186,7 +175,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends
UntypedActor {
 			Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
 
 			rpcMethod = lookupRpcMethod(methodName, parameterTypes);
-		} catch(ClassNotFoundException e) {
+		} catch (ClassNotFoundException e) {
 			log.error("Could not load method arguments.", e);
 
 			RpcConnectionException rpcException = new RpcConnectionException("Could not load method
arguments.", e);
@@ -294,7 +283,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends
UntypedActor {
 				runAsync.getClass().getName());
 		}
 		else {
-			final long timeToRun = runAsync.getTimeNanos(); 
+			final long timeToRun = runAsync.getTimeNanos();
 			final long delayNanos;
 
 			if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime()) <= 0) {
@@ -307,7 +296,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends
UntypedActor {
 				}
 			}
 			else {
-				// schedule for later. send a new message after the delay, which will then be immediately
executed 
+				// schedule for later. send a new message after the delay, which will then be immediately
executed
 				FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
 				RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);
 
@@ -317,17 +306,6 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 		}
 	}
 
-	private void triggerShutdown() {
-		try {
-			rpcEndpoint.postStop();
-		} catch (Throwable throwable) {
-			shutdownThrowable = throwable;
-		}
-
-		// now stop the actor which will stop processing of any further messages
-		getContext().system().stop(getSelf());
-	}
-
 	/**
 	 * Look up the rpc method on the given {@link RpcEndpoint} instance.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 536a789..07b334d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcServer;
 import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.messages.Shutdown;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -297,7 +296,7 @@ public class AkkaRpcService implements RpcService {
 			if (fromThisService) {
 				ActorRef selfActorRef = akkaClient.getRpcEndpoint();
 				LOG.info("Trigger shut down of RPC endpoint {}.", selfActorRef.path());
-				selfActorRef.tell(Shutdown.getInstance(), ActorRef.noSender());
+				actorSystem.stop(selfActorRef);
 			} else {
 				LOG.debug("RPC endpoint {} already stopped or from different RPC service");
 			}
@@ -314,11 +313,14 @@ public class AkkaRpcService implements RpcService {
 			}
 
 			stopped = true;
+
 			actorSystem.shutdown();
 			actors.clear();
 		}
 
 		actorSystem.awaitTermination();
+
+		LOG.info("Stopped Akka RPC service.");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java
deleted file mode 100644
index c16bdd7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.messages;
-
-/**
- * Base interface for control messages which are treated separately by the RPC server
- * implementation.
- */
-public interface ControlMessage {
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java
deleted file mode 100644
index 50b076c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/Shutdown.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.messages;
-
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-
-/**
- * Shut down message used to trigger the shut down of an AkkaRpcActor. This
- * message is only intended for internal use by the {@link AkkaRpcService}.
- */
-public final class Shutdown implements ControlMessage {
-
-	private static Shutdown instance = new Shutdown();
-
-	public static Shutdown getInstance() {
-		return instance;
-	}
-
-	private Shutdown() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff166062/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 8846686..da76115 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -216,8 +217,8 @@ public class DispatcherTest extends TestLogger {
 				Configuration configuration,
 				RpcService rpcService,
 				HighAvailabilityServices highAvailabilityServices,
-				BlobServer blobServer,
 				HeartbeatServices heartbeatServices,
+				JobManagerServices jobManagerServices,
 				MetricRegistry metricRegistry,
 				OnCompletionActions onCompleteActions,
 				FatalErrorHandler fatalErrorHandler) throws Exception {


Mime
View raw message