flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [02/16] flink git commit: [FLINK-5488] Close YarnClient on error in AbstractYarnClusterDescriptor
Date Sat, 01 Jul 2017 10:06:35 GMT
[FLINK-5488] Close YarnClient on error in AbstractYarnClusterDescriptor

This closes #4022.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d2f3f65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d2f3f65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d2f3f65

Branch: refs/heads/master
Commit: 3d2f3f65f6d0722a3b93565846f31a5dd28218d1
Parents: acc2e34
Author: zjureel <zjureel@gmail.com>
Authored: Wed May 31 13:13:34 2017 +0800
Committer: zentol <chesnay@apache.org>
Committed: Sat Jul 1 10:02:07 2017 +0200

----------------------------------------------------------------------
 .../flink/yarn/AbstractYarnClusterDescriptor.java    | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d2f3f65/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index c4df92a..db67e9a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -392,6 +392,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	@Override
 	public YarnClusterClient retrieve(String applicationID) {
 
+		YarnClient yarnClient = null;
 		try {
 			// check if required Hadoop environment variables are set. If not, warn user
 			if (System.getenv("HADOOP_CONF_DIR") == null &&
@@ -402,7 +403,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 
 			final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
-			final YarnClient yarnClient = getYarnClient();
+			yarnClient = getYarnClient();
 			final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);
 
 			if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
@@ -420,6 +421,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 			return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
 		} catch (Exception e) {
+			if (null != yarnClient) {
+				yarnClient.stop();
+			}
 			throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
 		}
 	}
@@ -539,7 +543,14 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
 			"the resources become available.";
 		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
-		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		ClusterResourceDescription freeClusterMem;
+		try {
+			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		} catch (YarnException | IOException e) {
+			failSessionDuringDeployment(yarnClient, yarnApplication);
+			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
+		}
+
 		if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
 			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
 				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);


Mime
View raw message