spark-issues mailing list archives

From "Russell Spitzer (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-22316) Cannot Select ReducedAggregator Column
Date Thu, 19 Oct 2017 22:17:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16211846#comment-16211846 ]

Russell Spitzer commented on SPARK-22316:
-----------------------------------------

You are right, I meant to have createDataset up there. I'll modify the example.

This shouldn't be Dataset-specific, since the underlying issue doesn't really depend on the
encoders; it's just that the automatically generated name cannot be used to select the
column. After all, a DataFrame is just a Dataset[Row] :). 

I haven't really looked into this, but I'm guessing it's the "(" characters in the auto-generated
column name.
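One quick way to test that guess would be something like the following (a hypothetical probe, not run against this Spark version): give a plain DataFrame a column whose name contains parentheses and see whether plain and backtick-quoted selection resolve it.

```scala
// Hypothetical probe: does a "(" in a column name alone break resolution,
// or is the problem specific to the reduceGroups output? Untested sketch.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Mimic the generated name on an ordinary DataFrame.
val df = Seq((1, "a")).toDF("value", "ReduceAggregator(Customer)")

// If this succeeds, parentheses by themselves are not the culprit...
df.select("ReduceAggregator(Customer)").show()

// ...and backtick quoting is the usual escape hatch for odd names anyway.
df.select("`ReduceAggregator(Customer)`").show()
```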

Here is an example with DataFrames for good measure:
{code}
case class Person(name: String, age: Int)
case class Customer(id: Int, person: Person)
val ds = spark.createDataFrame(Seq(Customer(1, Person("russ", 85))))
val grouped = ds.groupByKey(c => c.getInt(0)).reduceGroups((x, y) => x)
grouped(grouped.columns(1))
/** org.apache.spark.sql.AnalysisException: Cannot resolve column name
  * "ReduceAggregator(org.apache.spark.sql.Row)"
  * among (value, ReduceAggregator(org.apache.spark.sql.Row));
  */
{code}
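A possible workaround until this is fixed, continuing the example above (untested sketch): rename the generated column to something selectable before touching it. The assumption here is that withColumnRenamed matches the existing name by plain string comparison rather than analysis-time resolution, so it may sidestep the resolver even when select(...) cannot.

```scala
// Untested sketch: rename the auto-generated column, then select it normally.
val renamed = grouped.withColumnRenamed(grouped.columns(1), "reduced")
renamed.select("reduced").show()
```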






> Cannot Select ReducedAggregator Column
> --------------------------------------
>
>                 Key: SPARK-22316
>                 URL: https://issues.apache.org/jira/browse/SPARK-22316
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Russell Spitzer
>            Priority: Minor
>
> Given a dataset which has been run through reduceGroups like this
> {code}
> case class Person(name: String, age: Int)
> case class Customer(id: Int, person: Person)
> val ds = spark.createDataset(Seq(Customer(1, Person("russ", 85))))
> val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
> {code}
> We end up with a Dataset with the schema
> {code}
>  org.apache.spark.sql.types.StructType = 
> StructType(
>   StructField(value,IntegerType,false), 
>   StructField(ReduceAggregator(Customer),
>     StructType(StructField(id,IntegerType,false),
>     StructField(person,
>       StructType(StructField(name,StringType,true),
>       StructField(age,IntegerType,false))
>    ,true))
> ,true))
> {code}
> The column names are 
> {code}
> Array(value, ReduceAggregator(Customer))
> {code}
> But you cannot select the "ReduceAggregator(Customer)" column:
> {code}
> grouped.select(grouped.columns(1))
> org.apache.spark.sql.AnalysisException: cannot resolve '`ReduceAggregator(Customer)`'
> given input columns: [value, ReduceAggregator(Customer)];;
> 'Project ['ReduceAggregator(Customer)]
> +- Aggregate [value#338], [value#338, reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573,
> Some(newInstance(class Customer)), Some(class Customer), Some(StructType(StructField(id,IntegerType,false),
> StructField(person,StructType(StructField(name,StringType,true), StructField(age,IntegerType,false)),true))),
> input[0, scala.Tuple2, true]._1 AS value#340, if ((isnull(input[0, scala.Tuple2, true]._2)
> || None.equals)) null else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2,
> true]._2)).id AS id#195, person, if (isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2,
> true]._2)).person)) null else named_struct(name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
> StringType, fromString, assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).person).name,
> true), age, assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).person).age)
> AS person#196) AS _2#341, newInstance(class scala.Tuple2), assertnotnull(assertnotnull(input[0,
> Customer, true])).id AS id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer,
> true])).person)) null else named_struct(name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
> StringType, fromString, assertnotnull(assertnotnull(assertnotnull(input[0, Customer, true])).person).name,
> true), age, assertnotnull(assertnotnull(assertnotnull(input[0, Customer, true])).person).age)
> AS person#196, StructField(id,IntegerType,false), StructField(person,StructType(StructField(name,StringType,true),
> StructField(age,IntegerType,false)),true), true, 0, 0) AS ReduceAggregator(Customer)#346]
>    +- AppendColumns <function1>, class Customer, [StructField(id,IntegerType,false),
> StructField(person,StructType(StructField(name,StringType,true), StructField(age,IntegerType,false)),true)],
> newInstance(class Customer), [input[0, int, false] AS value#338]
>       +- LocalRelation [id#197, person#198]
> {code}
> You can work around this by using "toDF" to rename the column:
> {code}
> scala> grouped.toDF("key", "reduced").select("reduced")
> res55: org.apache.spark.sql.DataFrame = [reduced: struct<id: int, person: struct<name: string, age: int>>]
> {code}
> I think that any invocation of 
> {code}
> ds.select(ds.columns(i))
> {code}
> should work for all valid i less than the number of columns.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

