flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators
Date Fri, 20 Jan 2017 13:46:30 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.2 6e85106b1 -> e2a4f3232


[FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2a4f323
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2a4f323
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2a4f323

Branch: refs/heads/release-1.2
Commit: e2a4f323292dfdee4c69c9261b265860cba6f6a0
Parents: 6e85106
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Jan 20 11:12:12 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Jan 20 14:45:59 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 23 ++++++++++----------
 1 file changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f323/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c5682e2..50a619c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1844,18 +1844,19 @@ class JobManager(
    *
    * @param accumulators list of accumulator snapshots
    */
-  private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
-    accumulators foreach {
-      case accumulatorEvent =>
-        currentJobs.get(accumulatorEvent.getJobID) match {
-          case Some((jobGraph, jobInfo)) =>
-            future {
-              jobGraph.updateAccumulators(accumulatorEvent)
-            }(context.dispatcher)
-          case None =>
-          // ignore accumulator values for old job
+  private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]): Unit = {
+    accumulators.foreach( snapshot => {
+        if (snapshot != null) {
+          currentJobs.get(snapshot.getJobID) match {
+            case Some((jobGraph, jobInfo)) =>
+              future {
+                jobGraph.updateAccumulators(snapshot)
+              }(context.dispatcher)
+            case None =>
+              // ignore accumulator values for old job
+          }
         }
-    }
+    })
   }
 
   /**


Mime
View raw message