flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/2] flink git commit: [FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators
Date Fri, 20 Jan 2017 18:07:16 GMT
[FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24ff9eba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24ff9eba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24ff9eba

Branch: refs/heads/master
Commit: 24ff9ebac926dda13c029c79c44c40580a5a1a2f
Parents: 6dffe28
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Jan 20 11:12:12 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Jan 20 18:54:30 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 23 ++++++++++----------
 1 file changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24ff9eba/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4938cfc..d175c46 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1849,18 +1849,19 @@ class JobManager(
    *
    * @param accumulators list of accumulator snapshots
    */
-  private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
-    accumulators foreach {
-      case accumulatorEvent =>
-        currentJobs.get(accumulatorEvent.getJobID) match {
-          case Some((jobGraph, jobInfo)) =>
-            future {
-              jobGraph.updateAccumulators(accumulatorEvent)
-            }(context.dispatcher)
-          case None =>
-          // ignore accumulator values for old job
+  private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]): Unit = {
+    accumulators.foreach( snapshot => {
+        if (snapshot != null) {
+          currentJobs.get(snapshot.getJobID) match {
+            case Some((jobGraph, jobInfo)) =>
+              future {
+                jobGraph.updateAccumulators(snapshot)
+              }(context.dispatcher)
+            case None =>
+              // ignore accumulator values for old job
+          }
         }
-    }
+    })
   }
 
   /**


Mime
View raw message