flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [hotfix] [runtime] Guard async recovery operation in try-catch
Date Fri, 11 Mar 2016 14:31:24 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.0 5f35d13be -> a405b55b0


[hotfix] [runtime] Guard async recovery operation in try-catch

If something fails during a RecoverJobGraph, the logs don't
show anything.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a405b55b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a405b55b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a405b55b

Branch: refs/heads/release-1.0
Commit: a405b55b0edc13487e2bd99ede59651a1782d0da
Parents: 5f35d13
Author: Ufuk Celebi <uce@apache.org>
Authored: Fri Mar 11 15:05:53 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Fri Mar 11 15:31:07 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 40 ++++++++++----------
 1 file changed, 21 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a405b55b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1c6fce8..5858171 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -393,26 +393,29 @@ class JobManager(
 
     case RecoverJob(jobId) =>
       future {
-        // The ActorRef, which is part of the submitted job graph can only be de-serialized in the
-        // scope of an actor system.
-        akka.serialization.JavaSerializer.currentSystem.withValue(
-          context.system.asInstanceOf[ExtendedActorSystem]) {
+        try {
+          // The ActorRef, which is part of the submitted job graph can only be
+          // de-serialized in the scope of an actor system.
+          akka.serialization.JavaSerializer.currentSystem.withValue(
+            context.system.asInstanceOf[ExtendedActorSystem]) {
 
-          log.info(s"Attempting to recover job $jobId.")
+            log.info(s"Attempting to recover job $jobId.")
+            val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId)
 
-          val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId)
+            submittedJobGraphOption match {
+              case Some(submittedJobGraph) =>
+                if (!leaderElectionService.hasLeadership()) {
+                  // we've lost leadership. mission: abort.
+                  log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.")
+                } else {
+                  self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
+                }
 
-          submittedJobGraphOption match {
-            case Some(submittedJobGraph) =>
-              if (!leaderElectionService.hasLeadership()) {
-                // we've lost leadership. mission: abort.
-                log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.")
-              }
-              else {
-                self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
-              }
-            case None => log.warn(s"Failed to recover job graph $jobId.")
+              case None => log.info(s"Attempted to recover job $jobId, but no job graph found.")
+            }
           }
+        } catch {
+          case t: Throwable => log.error(s"Failed to recover job $jobId.", t)
         }
       }(context.dispatcher)
 
@@ -432,8 +435,7 @@ class JobManager(
               // we've lost leadership. mission: abort.
               log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
                 s"jobs.")
-            }
-            else {
+            } else {
               log.info(s"Re-submitting ${jobGraphs.size} job graphs.")
 
               jobGraphs.foreach{
@@ -443,7 +445,7 @@ class JobManager(
             }
           }
         } catch {
-          case e: Exception => log.error("Fatal error: Failed to recover jobs.", e)
+          case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t)
         }
       }(context.dispatcher)
 


Mime
View raw message