flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [4/8] flink git commit: [FLINK-6708] [yarn] Minor improvements to YARN session HA fixes
Date Fri, 26 May 2017 08:42:03 GMT
[FLINK-6708] [yarn] Minor improvements to YARN session HA fixes

This closes #3981.
This closes #3982.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e138f10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e138f10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e138f10

Branch: refs/heads/release-1.3
Commit: 2e138f1009b30c91aa73a704cc175f9e61ca52ea
Parents: 9bc34bf
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Fri May 26 15:00:47 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Fri May 26 16:33:20 2017 +0800

----------------------------------------------------------------------
 .../apache/flink/yarn/AbstractYarnClusterDescriptor.java  |  6 +++++-
 .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java    |  2 +-
 .../main/scala/org/apache/flink/yarn/YarnJobManager.scala | 10 +++++++---
 3 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2e138f10/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 818a3e8..044d1e7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -1235,7 +1235,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			LOG.info("Deleting files in {}.", yarnFilesDir);
 			try {
 				FileSystem fs = FileSystem.get(conf);
-				fs.delete(yarnFilesDir, true);
+
+				if (!fs.delete(yarnFilesDir, true)) {
+					throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
+				}
+
 				fs.close();
 			} catch (IOException e) {
 				LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/2e138f10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index d2a4340..53253d6 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -424,7 +424,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 						numTaskmanagers = status.numRegisteredTaskManagers();
 					}
 				} catch (Exception e) {
-					LOG.warn("Could not retrieve the current cluster status. Retrying...", e);
+					LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e);
 				}
 
 				List<String> messages = yarnCluster.getNewMessages();

http://git-wip-us.apache.org/repos/asf/flink/blob/2e138f10/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 902553f..e094bb7 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -100,8 +100,8 @@ class YarnJobManager(
     handleYarnShutdown orElse super.handleMessage
   }
 
-  def handleYarnShutdown: Receive = {
-    case msg:StopCluster =>
+  private def handleYarnShutdown: Receive = {
+    case msg: StopCluster =>
       super.handleMessage(msg)
 
       // do global cleanup if the yarn files path has been set
@@ -113,7 +113,11 @@ class YarnJobManager(
 
           try {
             val fs = path.getFileSystem
-            fs.delete(path, true)
+
+            if (!fs.delete(path, true)) {
+              throw new IOException(s"Deleting yarn application files under $filePath " +
+                s"was unsuccessful.")
+            }
           } catch {
             case ioe: IOException =>
               log.warn(


Mime
View raw message