spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-19104][BACKPORT-2.1][SQL] Lambda variables in ExternalMapToCatalyst should be global
Date Tue, 18 Jul 2017 01:18:39 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 ca4d2aa39 -> a9efce46b


[SPARK-19104][BACKPORT-2.1][SQL] Lambda variables in ExternalMapToCatalyst should be global

## What changes were proposed in this pull request?

This PR is a backport of #18418 to Spark 2.1. [SPARK-21391](https://issues.apache.org/jira/browse/SPARK-21391)
reported this problem in Spark 2.1.

The issue happens in `ExternalMapToCatalyst`. For example, the following code creates an `ExternalMapToCatalyst`
to convert a Scala Map to the catalyst map format.

```
val data = Seq.tabulate(10)(i => NestedData(1, Map("key" -> InnerData("name", i + 100))))
val ds = spark.createDataset(data)
```
The `valueConverter` in `ExternalMapToCatalyst` looks like:

```
if (isnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52,
ObjectType(class org.apache.spark.sql.InnerData), true))) null else named_struct(name, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(ExternalMapToCatalyst_value52,
ExternalMapToCatalyst_value_isNull52, ObjectType(class org.apache.spark.sql.InnerData), true)).name,
true), value, assertnotnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52,
ObjectType(class org.apache.spark.sql.InnerData), true)).value)
```
There is a `CreateNamedStruct` expression (`named_struct`) to create a row of `InnerData.name`
and `InnerData.value` that are referred to by `ExternalMapToCatalyst_value52`.

Because `ExternalMapToCatalyst_value52` is a local variable, when `CreateNamedStruct` splits
expressions into individual functions, the local variable can't be accessed anymore.

## How was this patch tested?

Added a new test case to `DatasetPrimitiveSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #18627 from kiszk/SPARK-21391.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9efce46
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9efce46
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9efce46

Branch: refs/heads/branch-2.1
Commit: a9efce46b7c9af5fc01dc55a8731a7ae02919a93
Parents: ca4d2aa
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Authored: Tue Jul 18 09:18:32 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Tue Jul 18 09:18:32 2017 +0800

----------------------------------------------------------------------
 .../catalyst/expressions/objects/objects.scala    | 18 ++++++++++++------
 .../apache/spark/sql/DatasetPrimitiveSuite.scala  |  8 ++++++++
 2 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a9efce46/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 256de74..127a8c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -660,6 +660,12 @@ case class ExternalMapToCatalyst private(
     val entry = ctx.freshName("entry")
     val entries = ctx.freshName("entries")
 
+    val keyElementJavaType = ctx.javaType(keyType)
+    val valueElementJavaType = ctx.javaType(valueType)
+    ctx.addMutableState(keyElementJavaType, key, "")
+    ctx.addMutableState("boolean", valueIsNull, "")
+    ctx.addMutableState(valueElementJavaType, value, "")
+
     val (defineEntries, defineKeyValue) = child.dataType match {
       case ObjectType(cls) if classOf[java.util.Map[_, _]].isAssignableFrom(cls) =>
         val javaIteratorCls = classOf[java.util.Iterator[_]].getName
@@ -671,8 +677,8 @@ case class ExternalMapToCatalyst private(
         val defineKeyValue =
           s"""
             final $javaMapEntryCls $entry = ($javaMapEntryCls) $entries.next();
-            ${ctx.javaType(keyType)} $key = (${ctx.boxedType(keyType)}) $entry.getKey();
-            ${ctx.javaType(valueType)} $value = (${ctx.boxedType(valueType)}) $entry.getValue();
+            $key = (${ctx.boxedType(keyType)}) $entry.getKey();
+            $value = (${ctx.boxedType(valueType)}) $entry.getValue();
           """
 
         defineEntries -> defineKeyValue
@@ -686,17 +692,17 @@ case class ExternalMapToCatalyst private(
         val defineKeyValue =
           s"""
             final $scalaMapEntryCls $entry = ($scalaMapEntryCls) $entries.next();
-            ${ctx.javaType(keyType)} $key = (${ctx.boxedType(keyType)}) $entry._1();
-            ${ctx.javaType(valueType)} $value = (${ctx.boxedType(valueType)}) $entry._2();
+            $key = (${ctx.boxedType(keyType)}) $entry._1();
+            $value = (${ctx.boxedType(valueType)}) $entry._2();
           """
 
         defineEntries -> defineKeyValue
     }
 
     val valueNullCheck = if (ctx.isPrimitiveType(valueType)) {
-      s"boolean $valueIsNull = false;"
+      s"$valueIsNull = false;"
     } else {
-      s"boolean $valueIsNull = $value == null;"
+      s"$valueIsNull = $value == null;"
     }
 
     val arrayCls = classOf[GenericArrayData].getName

http://git-wip-us.apache.org/repos/asf/spark/blob/a9efce46/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
index f8d4c61..deb76ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
@@ -21,6 +21,9 @@ import org.apache.spark.sql.test.SharedSQLContext
 
 case class IntClass(value: Int)
 
+case class InnerData(name: String, value: Int)
+case class NestedData(id: Int, param: Map[String, InnerData])
+
 package object packageobject {
   case class PackageClass(value: Int)
 }
@@ -135,4 +138,9 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
     checkDataset(Seq(PackageClass(1)).toDS(), PackageClass(1))
   }
 
+  test("SPARK-19104: Lambda variables in ExternalMapToCatalyst should be global") {
+    val data = Seq.tabulate(10)(i => NestedData(1, Map("key" -> InnerData("name", i + 100))))
+    val ds = spark.createDataset(data)
+    checkDataset(ds, data: _*)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message