spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-18462] Fix ClassCastException in SparkListenerDriverAccumUpdates event
Date Fri, 18 Nov 2016 02:45:37 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 37e6d9930 -> da9d51661


[SPARK-18462] Fix ClassCastException in SparkListenerDriverAccumUpdates event

## What changes were proposed in this pull request?

This patch fixes a `ClassCastException: java.lang.Integer cannot be cast to java.lang.Long`
error which could occur in the HistoryServer while trying to process a deserialized `SparkListenerDriverAccumUpdates`
event.

The problem stems from how `jackson-module-scala` handles primitive type parameters (see https://github.com/FasterXML/jackson-module-scala/wiki/FAQ#deserializing-optionint-and-other-primitive-challenges
for more details). This was causing a problem where our code expected a field to be deserialized
as a `(Long, Long)` tuple but we got an `(Int, Int)` tuple instead.

This patch hacks around this issue by registering a custom `Converter` with Jackson in order
to deserialize the tuples as `(Object, Object)` and perform the appropriate casting.

## How was this patch tested?

New regression tests in `SQLListenerSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15922 from JoshRosen/SPARK-18462.

(cherry picked from commit d9dd979d170f44383a9a87f892f2486ddb3cca7d)
Signed-off-by: Reynold Xin <rxin@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da9d5166
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da9d5166
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da9d5166

Branch: refs/heads/branch-2.0
Commit: da9d516610b9b0e0d11eb485b6e56442d9194729
Parents: 37e6d99
Author: Josh Rosen <joshrosen@databricks.com>
Authored: Thu Nov 17 18:45:15 2016 -0800
Committer: Reynold Xin <rxin@databricks.com>
Committed: Thu Nov 17 18:45:33 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/ui/SQLListener.scala    | 39 ++++++++++++++++-
 .../sql/execution/ui/SQLListenerSuite.scala     | 44 +++++++++++++++++++-
 2 files changed, 80 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/da9d5166/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 60f1343..5daf215 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -19,6 +19,11 @@ package org.apache.spark.sql.execution.ui
 
 import scala.collection.mutable
 
+import com.fasterxml.jackson.databind.JavaType
+import com.fasterxml.jackson.databind.`type`.TypeFactory
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.fasterxml.jackson.databind.util.Converter
+
 import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
@@ -43,9 +48,41 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)])
+case class SparkListenerDriverAccumUpdates(
+    executionId: Long,
+    @JsonDeserialize(contentConverter = classOf[LongLongTupleConverter])
+    accumUpdates: Seq[(Long, Long)])
   extends SparkListenerEvent
 
+/**
+ * Jackson [[Converter]] for converting an (Int, Int) tuple into a (Long, Long) tuple.
+ *
+ * This is necessary due to limitations in how Jackson's scala module deserializes primitives;
+ * see the "Deserializing Option[Int] and other primitive challenges" section in
+ * https://github.com/FasterXML/jackson-module-scala/wiki/FAQ for a discussion of this issue and
+ * SPARK-18462 for the specific problem that motivated this conversion.
+ */
+private class LongLongTupleConverter extends Converter[(Object, Object), (Long, Long)] {
+
+  override def convert(in: (Object, Object)): (Long, Long) = {
+    def toLong(a: Object): Long = a match {
+      case i: java.lang.Integer => i.intValue()
+      case l: java.lang.Long => l.longValue()
+    }
+    (toLong(in._1), toLong(in._2))
+  }
+
+  override def getInputType(typeFactory: TypeFactory): JavaType = {
+    val objectType = typeFactory.uncheckedSimpleType(classOf[Object])
+    typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(objectType, objectType))
+  }
+
+  override def getOutputType(typeFactory: TypeFactory): JavaType = {
+    val longType = typeFactory.uncheckedSimpleType(classOf[Long])
+    typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType))
+  }
+}
+
 class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
 
   override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/da9d5166/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 6e60b0e..044fcb4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.ui
 
 import java.util.Properties
 
+import org.json4s.jackson.JsonMethods._
 import org.mockito.Mockito.mock
 
 import org.apache.spark._
@@ -34,10 +35,10 @@ import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanIn
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator}
+import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
 
 
-class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
+class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
   import testImplicits._
   import org.apache.spark.AccumulatorSuite.makeInfo
 
@@ -415,6 +416,45 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     assert(driverUpdates(physicalPlan.longMetric("dummy").id) == expectedAccumValue)
   }
 
+  test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") {
+    val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L)))
+    val json = JsonProtocol.sparkEventToJson(event)
+    assertValidDataInJson(json,
+      parse("""
+        |{
+        |  "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
+        |  "executionId": 1,
+        |  "accumUpdates": [[2,3]]
+        |}
+      """.stripMargin))
+    JsonProtocol.sparkEventFromJson(json) match {
+      case SparkListenerDriverAccumUpdates(executionId, accums) =>
+        assert(executionId == 1L)
+        accums.foreach { case (a, b) =>
+          assert(a == 2L)
+          assert(b == 3L)
+        }
+    }
+
+    // Test a case where the numbers in the JSON can only fit in longs:
+    val longJson = parse(
+      """
+        |{
+        |  "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
+        |  "executionId": 4294967294,
+        |  "accumUpdates": [[4294967294,3]]
+        |}
+      """.stripMargin)
+    JsonProtocol.sparkEventFromJson(longJson) match {
+      case SparkListenerDriverAccumUpdates(executionId, accums) =>
+        assert(executionId == 4294967294L)
+        accums.foreach { case (a, b) =>
+          assert(a == 4294967294L)
+          assert(b == 3L)
+        }
+    }
+  }
+
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message