flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [9/9] flink git commit: [FLINK-8900] [yarn] Properly unregister application from Yarn RM
Date Fri, 23 Mar 2018 11:40:22 GMT
[FLINK-8900] [yarn] Properly unregister application from Yarn RM

This closes #5741.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c94b88aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c94b88aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c94b88aa

Branch: refs/heads/release-1.5
Commit: c94b88aa401f4561a841aa9a986a8eeb692c7ccf
Parents: 2afb70d
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Mar 21 22:19:28 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Mar 23 12:39:32 2018 +0100

----------------------------------------------------------------------
 .../clusterframework/MesosResourceManager.java  |  4 +-
 .../MesosResourceManagerTest.java               |  2 +-
 .../runtime/entrypoint/ClusterEntrypoint.java   | 81 ++++++++++++--------
 .../resourcemanager/ResourceManager.java        | 16 ++--
 .../resourcemanager/ResourceManagerGateway.java | 11 ++-
 .../StandaloneResourceManager.java              |  2 +-
 .../resourcemanager/TestingResourceManager.java |  2 +-
 .../utils/TestingResourceManagerGateway.java    |  4 +-
 .../apache/flink/yarn/YarnResourceManager.java  |  6 +-
 9 files changed, 77 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c94b88aa/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 1f58b11..4f4a6d1 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -362,9 +362,9 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	}
 
 	@Override
-	protected void shutDownApplication(
+	protected void internalDeregisterApplication(
 			ApplicationStatus finalStatus,
-			@Nullable String optionalDiagnostics) throws ResourceManagerException {
+			@Nullable String diagnostics) throws ResourceManagerException {
 		LOG.info("Shutting down and unregistering as a Mesos framework.");
 
 		Exception exception = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/c94b88aa/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 412e18d..5d9a6cf 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -737,7 +737,7 @@ public class MesosResourceManagerTest extends TestLogger {
 	public void testShutdownApplication() throws Exception {
 		new Context() {{
 			startResourceManager();
-			resourceManager.shutDownCluster(ApplicationStatus.SUCCEEDED, "");
+			resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, "");
 
 			// verify that the Mesos framework is shutdown
 			verify(rmServices.schedulerDriver).stop(false);

http://git-wip-us.apache.org/repos/asf/flink/blob/c94b88aa/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 50d0db3..b25729b 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -197,6 +197,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			shutDownAndTerminate(
 				STARTUP_FAILURE_RETURN_CODE,
 				ApplicationStatus.FAILED,
+				t.getMessage(),
 				false);
 		}
 	}
@@ -245,6 +246,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 					shutDownAndTerminate(
 						SUCCESS_RETURN_CODE,
 						ApplicationStatus.SUCCEEDED,
+						throwable != null ? throwable.getMessage() : null,
 						true);
 				});
 		}
@@ -544,38 +546,34 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return resultConfiguration;
 	}
 
-	private CompletableFuture<Void> shutDownAsync(boolean cleanupHaData) {
+	private CompletableFuture<Void> shutDownAsync(
+			boolean cleanupHaData,
+			ApplicationStatus applicationStatus,
+			@Nullable String diagnostics) {
 		if (isShutDown.compareAndSet(false, true)) {
 			LOG.info("Stopping {}.", getClass().getSimpleName());
 
-			final CompletableFuture<Void> componentShutdownFuture = stopClusterComponents();
-
-			componentShutdownFuture.whenComplete(
-				(Void ignored1, Throwable componentThrowable) -> {
-					final CompletableFuture<Void> serviceShutdownFuture = stopClusterServices(cleanupHaData);
-
-					serviceShutdownFuture.whenComplete(
-						(Void ignored2, Throwable serviceThrowable) -> {
-							Throwable finalException = null;
-
-							if (serviceThrowable != null) {
-								finalException = ExceptionUtils.firstOrSuppressed(serviceThrowable, componentThrowable);
-							} else if (componentThrowable != null) {
-								finalException = componentThrowable;
-							}
-
-							try {
-								cleanupDirectories();
-							} catch (IOException e) {
-								finalException = ExceptionUtils.firstOrSuppressed(e, finalException);
-							}
-
-							if (finalException != null) {
-								terminationFuture.completeExceptionally(finalException);
-							} else {
-								terminationFuture.complete(null);
-							}
-						});
+			final CompletableFuture<Void> shutDownApplicationFuture = deregisterApplication(applicationStatus, diagnostics);
+
+			final CompletableFuture<Void> componentShutdownFuture = FutureUtils.composeAfterwards(
+				shutDownApplicationFuture,
+				this::stopClusterComponents);
+
+			final CompletableFuture<Void> serviceShutdownFuture = FutureUtils.composeAfterwards(
+				componentShutdownFuture,
+				() -> stopClusterServices(cleanupHaData));
+
+			final CompletableFuture<Void> cleanupDirectoriesFuture = FutureUtils.runAfterwards(
+				serviceShutdownFuture,
+				this::cleanupDirectories);
+
+			cleanupDirectoriesFuture.whenComplete(
+				(Void ignored2, Throwable serviceThrowable) -> {
+					if (serviceThrowable != null) {
+						terminationFuture.completeExceptionally(serviceThrowable);
+					} else {
+						terminationFuture.complete(null);
+					}
 				});
 		}
 
@@ -585,6 +583,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	private void shutDownAndTerminate(
 		int returnCode,
 		ApplicationStatus applicationStatus,
+		@Nullable String diagnostics,
 		boolean cleanupHaData) {
 
 		if (isTerminating.compareAndSet(false, true)) {
@@ -593,7 +592,10 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 				returnCode,
 				applicationStatus);
 
-			shutDownAsync(cleanupHaData).whenComplete(
+			shutDownAsync(
+				cleanupHaData,
+				applicationStatus,
+				diagnostics).whenComplete(
 				(Void ignored, Throwable t) -> {
 					if (t != null) {
 						LOG.info("Could not properly shut down cluster entrypoint.", t);
@@ -609,6 +611,25 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	}
 
 	/**
+	 * Deregister the Flink application from the resource management system by signalling
+	 * the {@link ResourceManager}.
+	 *
+	 * @param applicationStatus to terminate the application with
+	 * @param diagnostics additional information about the shut down, can be {@code null}
+	 * @return Future which is completed once the shut down
+	 */
+	private CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
+		synchronized (lock) {
+			if (resourceManager != null) {
+				final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+				return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+			} else {
+				return CompletableFuture.completedFuture(null);
+			}
+		}
+	}
+
+	/**
 	 * Clean up of temporary directories created by the {@link ClusterEntrypoint}.
 	 *
 	 * @throws IOException if the temporary directories could not be cleaned up

http://git-wip-us.apache.org/repos/asf/flink/blob/c94b88aa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index cae9c6c..c753469 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -486,19 +486,21 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	 * Cleanup application and shut down cluster.
 	 *
 	 * @param finalStatus of the Flink application
-	 * @param optionalDiagnostics diagnostics message for the Flink application or {@code null}
+	 * @param diagnostics diagnostics message for the Flink application or {@code null}
 	 */
 	@Override
-	public void shutDownCluster(
+	public CompletableFuture<Acknowledge> deregisterApplication(
 			final ApplicationStatus finalStatus,
-			@Nullable final String optionalDiagnostics) {
-		log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, optionalDiagnostics);
+			@Nullable final String diagnostics) {
+		log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, diagnostics);
 
 		try {
-			shutDownApplication(finalStatus, optionalDiagnostics);
+			internalDeregisterApplication(finalStatus, diagnostics);
 		} catch (ResourceManagerException e) {
 			log.warn("Could not properly shutdown the application.", e);
 		}
+
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	@Override
@@ -946,7 +948,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	protected abstract void initialize() throws ResourceManagerException;
 
 	/**
-	 * The framework specific code for shutting down the application. This should report the
+	 * The framework specific code to deregister the application. This should report the
 	 * application's final status and shut down the resource manager cleanly.
 	 *
 	 * <p>This method also needs to make sure all pending containers that are not registered
@@ -956,7 +958,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	 * @param optionalDiagnostics A diagnostics message or {@code null}.
 	 * @throws ResourceManagerException if the application could not be shut down.
 	 */
-	protected abstract void shutDownApplication(
+	protected abstract void internalDeregisterApplication(
 		ApplicationStatus finalStatus,
 		@Nullable String optionalDiagnostics) throws ResourceManagerException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c94b88aa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 836bc0b..bd282d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -41,6 +41,8 @@ import org.apache.flink.runtime.taskexecutor.FileType;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
@@ -133,11 +135,12 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 	void unRegisterInfoMessageListener(String infoMessageListenerAddress);
 
 	/**
-	 * shutdown cluster
-	 * @param finalStatus
-	 * @param optionalDiagnostics
+	 * Deregister Flink from the underlying resource management system.
+	 *
+	 * @param finalStatus final status with which to deregister the Flink application
+	 * @param diagnostics additional information for the resource management system, can be {@code null}
 	 */
-	void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
+	CompletableFuture<Acknowledge> deregisterApplication(final ApplicationStatus finalStatus, @Nullable final String diagnostics);
 
 	/**
 	 * Gets the currently registered number of TaskManagers.

http://git-wip-us.apache.org/repos/asf/flink/blob/c94b88aa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 7226d29..d8e0e48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -72,7 +72,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 	}
 
 	@Override
-	protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) {
+	protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c94b88aa/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index 3db9be0..2bd976b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -68,7 +68,7 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
 	}
 
 	@Override
-	protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws ResourceManagerException {
+	protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) throws ResourceManagerException {
 		// noop
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c94b88aa/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 33c6c08..9b40414 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -214,8 +214,8 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 	}
 
 	@Override
-	public void shutDownCluster(ApplicationStatus finalStatus, String optionalDiagnostics) {
-
+	public CompletableFuture<Acknowledge> deregisterApplication(ApplicationStatus finalStatus, String diagnostics) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c94b88aa/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 97db2ad..bfe7d65 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -266,16 +266,16 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 	}
 
 	@Override
-	protected void shutDownApplication(
+	protected void internalDeregisterApplication(
 		ApplicationStatus finalStatus,
-		@Nullable String optionalDiagnostics) {
+		@Nullable String diagnostics) {
 
 		// first, de-register from YARN
 		FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
 		log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus);
 
 		try {
-			resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");
+			resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, "");
 		} catch (Throwable t) {
 			log.error("Could not unregister the application master.", t);
 		}


Mime
View raw message