flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [10/10] flink git commit: [FLINK-3863] Yarn Cluster shutdown may fail if leader changed recently
Date Fri, 17 Jun 2016 08:45:24 GMT
[FLINK-3863] Yarn Cluster shutdown may fail if leader changed recently


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e984241
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e984241
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e984241

Branch: refs/heads/master
Commit: 9e9842410a635d183a002d1f25a6f489ce9d6a2f
Parents: f9b52a3
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed Jun 1 12:45:52 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Fri Jun 17 10:37:58 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/yarn/ApplicationClient.scala   | 31 ++++++++------------
 1 file changed, 12 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e984241/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index aea1aac..e701269 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -176,29 +176,22 @@ class ApplicationClient(
           }
       }
 
-    case LocalStopYarnSession(status, diagnostics) =>
+    case msg @ LocalStopYarnSession(status, diagnostics) =>
       log.info("Sending StopCluster request to JobManager.")
 
-      val clusterStatus =
-        status match {
-          case FinalApplicationStatus.SUCCEEDED => ApplicationStatus.SUCCEEDED
-          case FinalApplicationStatus.KILLED => ApplicationStatus.CANCELED
-          case FinalApplicationStatus.FAILED => ApplicationStatus.FAILED
-          case _ => ApplicationStatus.UNKNOWN
-        }
-
-      yarnJobManager foreach {
-        // forward to preserve the sender's address
-        _ forward decorateMessage(new StopCluster(clusterStatus, diagnostics))
+      // preserve the original sender so we can reply
+      val originalSender = sender()
+
+      yarnJobManager match {
+        case Some(jm) =>
+          jm.tell(decorateMessage(new StopCluster(status, diagnostics)), originalSender)
+        case None =>
+          context.system.scheduler.scheduleOnce(1 second) {
+            // try once more; we might have been connected in the meantime
+            self.tell(msg, originalSender)
+          }(context.dispatcher)
       }
 
-    case msg: StopClusterSuccessful =>
-      log.info("Remote JobManager has been stopped successfully. " +
-        "Stopping local application client")
-
-      // poison ourselves
-      self ! decorateMessage(PoisonPill)
-
     // handle the responses from the PollYarnClusterStatus messages to the yarn job mgr
     case status: GetClusterStatusResponse =>
       latestClusterStatus = Some(status)


Mime
View raw message