spark-issues mailing list archives

From "Kazuaki Ishizaki (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-20254) SPARK-19716 generates inefficient Java code from a primitive array of Dataset
Date Fri, 07 Apr 2017 16:03:41 GMT

     [ https://issues.apache.org/jira/browse/SPARK-20254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kazuaki Ishizaki updated SPARK-20254:
-------------------------------------
    Summary: SPARK-19716 generates inefficient Java code from a primitive array of Dataset
 (was: SPARK-19716 generate inefficient Java code from a primitive array of Dataset)

> SPARK-19716 generates inefficient Java code from a primitive array of Dataset
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-20254
>                 URL: https://issues.apache.org/jira/browse/SPARK-20254
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Kazuaki Ishizaki
>
> Since SPARK-19716 introduced {{unresolvedmapobjects}}, the current implementation resolves {{DeserializeToObject}} to a {{mapobjects()}} expression. That {{mapobjects()}} generates Java code which copies the array, element by element, into a {{GenericArrayData}}, even when the source is a primitive array.
> cc: [~cloud_fan]
>  
> {code}
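> // Note: assumes a SparkContext in scope as sparkContext (e.g. spark.sparkContext
> // in spark-shell) and spark.implicits._ imported for toDS.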
> val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache
> ds.count
> val ds2 = ds.map(e => e)
> ds2.explain(true)
> ds2.show
> {code}
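> In the first set of plans below (without {{unresolvedmapobjects}}), {{DeserializeToObject}} resolves to a plain {{value#2.toDoubleArray}} call: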
> {code}
> == Parsed Logical Plan ==
> 'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- 'DeserializeToObject unresolveddeserializer(upcast(getcolumnbyordinal(0, ArrayType(DoubleType,false)), ArrayType(DoubleType,false), - root class: "scala.Array").toDoubleArray), obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Analyzed Logical Plan ==
> value: array<double>
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject cast(value#2 as array<double>).toDoubleArray, obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Optimized Logical Plan ==
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject value#2.toDoubleArray, obj#23: [D
>       +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>             +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                +- Scan ExternalRDDScan[obj#1]
> == Physical Plan ==
> *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- *MapElements <function1>, obj#24: [D
>    +- *DeserializeToObject value#2.toDoubleArray, obj#23: [D
>       +- *InMemoryTableScan [value#2]
>             +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>                   +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                      +- Scan ExternalRDDScan[obj#1]
> {code}
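> With SPARK-19716's {{unresolvedmapobjects}}, the same program instead produces these plans, where {{DeserializeToObject}} contains a {{mapobjects(...)}} expression: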
> {code}
> == Parsed Logical Plan ==
> 'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- 'DeserializeToObject unresolveddeserializer(unresolvedmapobjects(<function1>, getcolumnbyordinal(0, ArrayType(DoubleType,false)), None).toDoubleArray), obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Analyzed Logical Plan ==
> value: array<double>
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Optimized Logical Plan ==
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
>       +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>             +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                +- Scan ExternalRDDScan[obj#1]
> == Physical Plan ==
> *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- *MapElements <function1>, obj#24: [D
>    +- *DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
>       +- InMemoryTableScan [value#2]
>             +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>                   +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                      +- Scan ExternalRDDScan[obj#1]
> {code}
> The generated Java code for the {{mapobjects()}} deserializer follows. It boxes every element into a {{Double[]}} and then copies the result into a {{GenericArrayData}} (the line marked {{/*###*/}}):
> {code:java}
> ...
> /* 056 */       ArrayData deserializetoobject_value1 = null;
> /* 057 */
> /* 058 */       if (!inputadapter_isNull) {
> /* 059 */         int deserializetoobject_dataLength = inputadapter_value.numElements();
> /* 060 */
> /* 061 */         Double[] deserializetoobject_convertedArray = null;
> /* 062 */         deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength];
> /* 063 */
> /* 064 */         int deserializetoobject_loopIndex = 0;
> /* 065 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
> /* 066 */           MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex));
> /* 067 */           MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
> /* 068 */
> /* 069 */           if (MapObjects_loopIsNull2) {
> /* 070 */             throw new RuntimeException(((java.lang.String) references[0]));
> /* 071 */           }
> /* 072 */           if (false) {
> /* 073 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
> /* 074 */           } else {
> /* 075 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2;
> /* 076 */           }
> /* 077 */
> /* 078 */           deserializetoobject_loopIndex += 1;
> /* 079 */         }
> /* 080 */
> /* 081 */         deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/
> /* 082 */       }
> ...
> {code}
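>
> For comparison, the first plan's {{value#2.toDoubleArray}} performs one bulk conversion on {{ArrayData}} (for {{UnsafeArrayData}} a single memory copy), while the code above boxes every element and routes it through {{GenericArrayData}}. A minimal Scala sketch of the two strategies, assuming only the {{org.apache.spark.sql.catalyst.util}} classes that appear in the plans above (the slow path paraphrases the generated loop):
> {code}
> import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
>
> // Fast path (first plan): one bulk conversion, no per-element boxing.
> def deserializeFast(input: ArrayData): Array[Double] =
>   input.toDoubleArray()
>
> // Slow path (second plan): box each element into an Object array,
> // wrap it in GenericArrayData, then convert back to a primitive array.
> def deserializeSlow(input: ArrayData): Array[Double] = {
>   val n = input.numElements()
>   val boxed = new Array[Any](n) // elements stored as boxed java.lang.Double
>   var i = 0
>   while (i < n) {
>     boxed(i) = input.getDouble(i) // autoboxing on every element
>     i += 1
>   }
>   new GenericArrayData(boxed).toDoubleArray() // extra copy plus unboxing
> }
> {code}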


