flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7295] [rpc] Add postStop callback for proper shutdown of RpcEndpoints
Date Mon, 31 Jul 2017 09:23:49 GMT
Repository: flink
Updated Branches:
  refs/heads/master 49acd09ec -> 80468b15c


[FLINK-7295] [rpc] Add postStop callback for proper shutdown of RpcEndpoints

In order to execute a proper shutdown of RpcEndpoints it is necessary to have
a callback which is executed in the main thread context directly before stopping
processing of messages. This PR introduces the postStop method which acts as
this callback. All endpoint specific cleanup should be executed in this method.

The RpcEndpoint#shutDown method now only triggers the shut down of an RpcEndpoint.
In order to wait on the completion of the shut down, one has to wait on the
termination future which can be retrieved via RpcEndpoint#getTerminationFuture.

This PR also adapts the existing RpcEndpoints such that they execute their former
shutDown logic in the postStop method.

This closes #4420.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80468b15
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80468b15
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80468b15

Branch: refs/heads/master
Commit: 80468b15c0e5976f2b45160f9ed833a237cb6fcd
Parents: 49acd09
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Fri Jul 28 16:13:55 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Jul 31 09:23:31 2017 +0200

----------------------------------------------------------------------
 .../clusterframework/MesosResourceManager.java  |  4 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |  4 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  7 +-
 .../resourcemanager/ResourceManager.java        |  8 +--
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 27 ++++---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 26 ++++++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  6 +-
 .../runtime/rpc/akka/messages/Shutdown.java     | 36 ++++++++++
 .../runtime/taskexecutor/TaskExecutor.java      |  4 +-
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 75 ++++++++++++++++++++
 .../apache/flink/yarn/YarnResourceManager.java  | 18 +++--
 11 files changed, 181 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/80468b15/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 98e9b41..260d5bf 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -320,7 +320,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	}
 
 	@Override
-	public void shutDown() throws Exception {
+	public void postStop() throws Exception {
 		Exception exception = null;
 		FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);
 
@@ -351,7 +351,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		}
 
 		try {
-			super.shutDown();
+			super.postStop();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/80468b15/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e0ec049..2eb0e36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -101,7 +101,7 @@ public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway>
{
 	//------------------------------------------------------
 
 	@Override
-	public void shutDown() throws Exception {
+	public void postStop() throws Exception {
 		Exception exception = null;
 		// stop all currently running JobManagerRunners
 		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
@@ -117,7 +117,7 @@ public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway>
{
 		}
 
 		try {
-			super.shutDown();
+			super.postStop();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/80468b15/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 3a55f2e..947a914 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -320,13 +320,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 * Suspend the job and shutdown all other services including rpc.
 	 */
 	@Override
-	public void shutDown() throws Exception {
+	public void postStop() throws Exception {
 		taskManagerHeartbeatManager.stop();
 		resourceManagerHeartbeatManager.stop();
 
 		// make sure there is a graceful exit
-		getSelf().suspendExecution(new Exception("JobManager is shutting down."));
-		super.shutDown();
+		suspendExecution(new Exception("JobManager is shutting down."));
+
+		super.postStop();
 	}
 
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/80468b15/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index ebba9ca..0dfbbcd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -204,7 +204,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	@Override
-	public void shutDown() throws Exception {
+	public void postStop() throws Exception {
 		Exception exception = null;
 
 		taskManagerHeartbeatManager.stop();
@@ -229,14 +229,14 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
 
+		clearState();
+
 		try {
-			super.shutDown();
+			super.postStop();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
 
-		clearState();
-
 		if (exception != null) {
 			ExceptionUtils.rethrowException(exception, "Error while shutting the ResourceManager down.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/80468b15/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 311fa49..331f3a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -126,7 +126,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Start & Shutdown
+	//  Start & shutdown & lifecycle callbacks
 	// ------------------------------------------------------------------------
 
 	/**
@@ -143,17 +143,24 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	}
 
 	/**
-	 * Shuts down the underlying RPC endpoint via the RPC service.
-	 * After this method was called, the RPC endpoint will no longer be reachable, neither remotely,
-	 * not via its {@link #getSelf() self gateway}. It will also not accepts executions in main
thread
-	 * any more (via {@link #callAsync(Callable, Time)} and {@link #runAsync(Runnable)}).
-	 * 
-	 * <p>This method can be overridden to add RPC endpoint specific shut down code.
-	 * The overridden method should always call the parent shut down method.
+	 * User overridable callback.
+	 *
+	 * <p>This method is called when the RpcEndpoint is being shut down. The method is
guaranteed
+	 * to be executed in the main thread context and can be used to clean up internal state.
+	 *
+	 * IMPORTANT: This method should never be called directly by the user.
+	 *
+	 * @throws Exception if an error occurs. The exception is returned as result of the termination
future.
+	 */
+	public void postStop() throws Exception {}
+
+	/**
+	 * Triggers the shut down of the rpc endpoint. The shut down is executed asynchronously.
 	 *
-	 * @throws Exception indicating that the something went wrong while shutting the RPC endpoint
down
+	 * <p>In order to wait on the completion of the shut down, obtain the termination
future
+	 * via {@link #getTerminationFuture()}} and wait on its completion.
 	 */
-	public void shutDown() throws Exception {
+	public final void shutDown() {
 		rpcService.stopServer(self);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/80468b15/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 86cd83e..fe3fcc9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.rpc.akka.messages.Processing;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 
+import org.apache.flink.runtime.rpc.akka.messages.Shutdown;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
@@ -82,10 +83,15 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>>
extends Untyp
 
 	private final CompletableFuture<Void> terminationFuture;
 
+	/** Throwable which might have been thrown by the postStop method */
+	private Throwable shutdownThrowable;
+
 	AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Void> terminationFuture)
{
 		this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
 		this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
 		this.terminationFuture = checkNotNull(terminationFuture);
+
+		this.shutdownThrowable = null;
 	}
 
 	@Override
@@ -96,7 +102,12 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>>
extends Untyp
 		// we would complete the future and let the actor system restart the actor with a completed
 		// future.
 		// Complete the termination future so that others know that we've stopped.
-		terminationFuture.complete(null);
+
+		if (shutdownThrowable != null) {
+			terminationFuture.completeExceptionally(shutdownThrowable);
+		} else {
+			terminationFuture.complete(null);
+		}
 	}
 
 	@Override
@@ -134,6 +145,8 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>>
extends Untyp
 				handleCallAsync((CallAsync) message);
 			} else if (message instanceof RpcInvocation) {
 				handleRpcInvocation((RpcInvocation) message);
+			} else if (message instanceof Shutdown) {
+				triggerShutdown();
 			} else {
 				LOG.warn(
 					"Received message of unknown type {} with value {}. Dropping this message!",
@@ -292,6 +305,17 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>>
extends Untyp
 		}
 	}
 
+	private void triggerShutdown() {
+		try {
+			rpcEndpoint.postStop();
+		} catch (Throwable throwable) {
+			shutdownThrowable = throwable;
+		}
+
+		// now stop the actor which will stop processing of any further messages
+		getContext().system().stop(getSelf());
+	}
+
 	/**
 	 * Look up the rpc method on the given {@link RpcEndpoint} instance.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/80468b15/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index e17364f..2f02e8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -25,7 +25,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Cancellable;
 import akka.actor.Identify;
-import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
@@ -43,6 +42,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.SelfGateway;
 import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.rpc.akka.messages.Shutdown;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -257,8 +257,8 @@ public class AkkaRpcService implements RpcService {
 
 			if (fromThisService) {
 				ActorRef selfActorRef = akkaClient.getRpcEndpoint();
-				LOG.info("Stopping RPC endpoint {}.", selfActorRef.path());
-				selfActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+				LOG.info("Trigger shut down of RPC endpoint {}.", selfActorRef.path());
+				selfActorRef.tell(Shutdown.getInstance(), ActorRef.noSender());
 			} else {
 				LOG.debug("RPC endpoint {} already stopped or from different RPC service");
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/80468b15/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java
new file mode 100644
index 0000000..c596d12
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+
+/**
+ * Shut down message used to trigger the shut down of an AkkaRpcActor. This
+ * message is only intended for internal use by the {@link AkkaRpcService}.
+ */
+public final class Shutdown {
+
+	private static Shutdown instance = new Shutdown();
+
+	public static Shutdown getInstance() {
+		return instance;
+	}
+
+	private Shutdown() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/80468b15/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8038228..cdec08e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -248,7 +248,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway>
{
 	 * Called to shut down the TaskManager. The method closes all TaskManager services.
 	 */
 	@Override
-	public void shutDown() throws Exception {
+	public void postStop() throws Exception {
 		log.info("Stopping TaskManager {}.", getAddress());
 
 		Exception exception = null;
@@ -272,7 +272,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway>
{
 		fileCache.shutdown();
 
 		try {
-			super.shutDown();
+			super.postStop();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/80468b15/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 3c40bc2..0b06267 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.core.Is;
@@ -232,6 +233,43 @@ public class AkkaRpcActorTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that exception thrown in the postStop method are returned by the termination
+	 * future.
+	 */
+	@Test
+	public void testPostStopExceptionPropagation() throws Exception {
+		FailingPostStopEndpoint rpcEndpoint = new FailingPostStopEndpoint(akkaRpcService, "FailingPostStopEndpoint");
+		rpcEndpoint.start();
+
+		rpcEndpoint.shutDown();
+
+		Future<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
+
+		try {
+			terminationFuture.get();
+		} catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof FailingPostStopEndpoint.PostStopException);
+		}
+	}
+
+	/**
+	 * Checks that the postStop callback is executed within the main thread.
+	 */
+	@Test
+	public void testPostStopExecutedByMainThread() throws Exception {
+		SimpleRpcEndpoint simpleRpcEndpoint = new SimpleRpcEndpoint(akkaRpcService, "SimpleRpcEndpoint");
+		simpleRpcEndpoint.start();
+
+		simpleRpcEndpoint.shutDown();
+
+		Future<Void> terminationFuture = simpleRpcEndpoint.getTerminationFuture();
+
+		// check that we executed the postStop method in the main thread, otherwise an exception
+		// would be thrown here.
+		terminationFuture.get();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Test Actors and Interfaces
 	// ------------------------------------------------------------------------
@@ -305,4 +343,41 @@ public class AkkaRpcActorTest extends TestLogger {
 			return future;
 		}
 	}
+
+	// ------------------------------------------------------------------------
+
+	private static class SimpleRpcEndpoint extends RpcEndpoint<RpcGateway> {
+
+		protected SimpleRpcEndpoint(RpcService rpcService, String endpointId) {
+			super(rpcService, endpointId);
+		}
+
+		@Override
+		public void postStop() {
+			validateRunsInMainThread();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class FailingPostStopEndpoint extends RpcEndpoint<RpcGateway> {
+
+		protected FailingPostStopEndpoint(RpcService rpcService, String endpointId) {
+			super(rpcService, endpointId);
+		}
+
+		@Override
+		public void postStop() throws Exception {
+			throw new PostStopException("Test exception.");
+		}
+
+		private static class PostStopException extends FlinkException {
+
+			private static final long serialVersionUID = 6701096588415871592L;
+
+			public PostStopException(String message) {
+				super(message);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/80468b15/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6099d18..8327b6a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -171,9 +171,10 @@ public class YarnResourceManager extends ResourceManager<ResourceID>
implements
 	}
 
 	@Override
-	public void shutDown() throws Exception {
+	public void postStop() throws Exception {
 		// shut down all components
 		Throwable firstException = null;
+
 		if (resourceManagerClient != null) {
 			try {
 				resourceManagerClient.stop();
@@ -181,21 +182,24 @@ public class YarnResourceManager extends ResourceManager<ResourceID>
implements
 				firstException = t;
 			}
 		}
+
 		if (nodeManagerClient != null) {
 			try {
 				nodeManagerClient.stop();
 			} catch (Throwable t) {
-				if (firstException == null) {
-					firstException = t;
-				} else {
-					firstException.addSuppressed(t);
-				}
+				firstException = ExceptionUtils.firstOrSuppressed(t, firstException);
 			}
 		}
+
+		try {
+			super.postStop();
+		} catch (Throwable t) {
+			firstException = ExceptionUtils.firstOrSuppressed(t, firstException);
+		}
+
 		if (firstException != null) {
 			ExceptionUtils.rethrowException(firstException, "Error while shutting down YARN resource
manager");
 		}
-		super.shutDown();
 	}
 
 	@Override


Mime
View raw message