flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [1/2] flink git commit: [FLINK-3000] Adds shutdown hook to clean up lingering yarn sessions
Date Wed, 25 Nov 2015 15:41:17 GMT
Repository: flink
Updated Branches:
  refs/heads/master 90c76ad1c -> 008060f16


[FLINK-3000] Adds shutdown hook to clean up lingering yarn sessions

This closes #1354


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aad99f25
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aad99f25
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aad99f25

Branch: refs/heads/master
Commit: aad99f25cd107a8eaa24a8d407680a8186ba6460
Parents: 90c76ad
Author: Sachin Goel <sachingoel0101@gmail.com>
Authored: Sun Nov 15 06:11:30 2015 +0530
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed Nov 25 16:20:37 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  8 ----
 .../apache/flink/yarn/FlinkYarnClientBase.java  | 39 +++++++++++++++++++-
 2 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aad99f25/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 6a79677..5a57bf4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -635,14 +635,6 @@ public class CliFrontend {
 	// --------------------------------------------------------------------------------------------
 
 	protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
-		// log message for detached yarn job
-		if (yarnCluster != null) {
-			logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
-					"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-					"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
-					"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
-		}
-
 		LOG.info("Starting execution of program");
 
 		JobSubmissionResult result;

http://git-wip-us.apache.org/repos/asf/flink/blob/aad99f25/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
index 8c4a53e..74ef5c3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
@@ -110,7 +110,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
 	private Configuration conf;
 	private YarnClient yarnClient;
 	private YarnClientApplication yarnApplication;
-
+	private Thread deploymentFailureHook = new DeploymentFailureHook();
 
 	/**
 	 * Files (usually in a distributed file system) used for the YARN session of Flink.
@@ -629,13 +629,20 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
 			appContext.setQueue(yarnQueue);
 		}
 
+		// add a hook to clean up in case deployment fails
+		Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
 		LOG.info("Submitting application master " + appId);
 		yarnClient.submitApplication(appContext);
 
 		LOG.info("Waiting for the cluster to be allocated");
 		int waittime = 0;
 		loop: while( true ) {
-			ApplicationReport report = yarnClient.getApplicationReport(appId);
+			ApplicationReport report;
+			try {
+				report = yarnClient.getApplicationReport(appId);
+			} catch (IOException e) {
+				throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
+			}
 			YarnApplicationState appState = report.getYarnApplicationState();
 			switch(appState) {
 				case FAILED:
@@ -660,6 +667,19 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
 			waittime += 1000;
 			Thread.sleep(1000);
 		}
+		// print the application id for user to cancel themselves.
+		if (isDetached()) {
+			LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
+					"Flink on YARN, use the following command or a YARN web interface to stop " +
+					"it:\nyarn application -kill " + appId + "\nPlease also note that the " +
+					"temporary files of the YARN session in the home directoy will not be removed.");
+		}
+		// since deployment was successful, remove the hook
+		try {
+			Runtime.getRuntime().removeShutdownHook(deploymentFailureHook);
+		} catch (IllegalStateException e) {
+			// we're already in the shut down hook.
+		}
 		// the Flink cluster is deployed in YARN. Represent cluster
 		return new FlinkYarnCluster(yarnClient, appId, conf, flinkConfiguration, sessionFilesDir, detached);
 	}
@@ -869,5 +889,20 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
 		}
 	}
 
+	private class DeploymentFailureHook extends Thread {
+		@Override
+		public void run() {
+			LOG.info("Cancelling deployment from Deployment Failure Hook");
+			failSessionDuringDeployment();
+			LOG.info("Deleting files in " + sessionFilesDir);
+			try {
+				FileSystem fs = FileSystem.get(conf);
+				fs.delete(sessionFilesDir, true);
+				fs.close();
+			} catch (IOException e) {
+				LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);
+			}
+		}
+	}
 }
 


Mime
View raw message