spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-19674][SQL] Ignore driver accumulator updates don't belong to the execution when merging all accumulator updates
Date Thu, 23 Feb 2017 22:31:21 GMT
Repository: spark
Updated Branches:
  refs/heads/master f87a6a59a -> eff7b4089


[SPARK-19674][SQL] Ignore driver accumulator updates don't belong to the execution when merging
all accumulator updates

## What changes were proposed in this pull request?
In SQLListener.getExecutionMetrics, driver accumulator updates that don't belong to the execution
should be ignored when merging all accumulator updates, to prevent a NoSuchElementException.

## How was this patch tested?
Updated unit test.

Author: Carson Wang <carson.wang@intel.com>

Closes #17009 from carsonwang/FixSQLMetrics.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eff7b408
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eff7b408
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eff7b408

Branch: refs/heads/master
Commit: eff7b40890f39617538d300df747277781a6f014
Parents: f87a6a5
Author: Carson Wang <carson.wang@intel.com>
Authored: Thu Feb 23 14:31:16 2017 -0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Thu Feb 23 14:31:16 2017 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/execution/ui/SQLListener.scala | 7 +++++--
 .../org/apache/spark/sql/execution/ui/SQLListenerSuite.scala  | 5 +++++
 2 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eff7b408/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 5daf215..12d3bc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -343,10 +343,13 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
                accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
             (accumulatorUpdate._1, accumulatorUpdate._2)
           }
-        }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
+        }
 
         val driverUpdates = executionUIData.driverAccumUpdates.toSeq
-        mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId =>
+        val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter {
+          case (id, _) => executionUIData.accumulatorMetrics.contains(id)
+        }
+        mergeAccumulatorUpdates(totalUpdates, accumulatorId =>
           executionUIData.accumulatorMetrics(accumulatorId).metricType)
       case None =>
         // This execution has been dropped

http://git-wip-us.apache.org/repos/asf/spark/blob/eff7b408/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 8aea112..e41c00e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -147,6 +147,11 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
 
     checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
 
+    // Driver accumulator updates don't belong to this execution should be filtered and no
+    // exception will be thrown.
+    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
+    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
       // (task id, stage id, stage attempt, accum updates)
       (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message