spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-22316) Cannot Select ReducedAggregator Column
Date Thu, 19 Oct 2017 21:04:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16211733#comment-16211733 ]

Sean Owen commented on SPARK-22316:
-----------------------------------

OK, so this is just about being able to select the column in general, not about achieving a
particular result.

You're really operating on Datasets here (right? the example doesn't compile without createDataset)
rather than DataFrames. DataFrames are the columnar abstraction; Datasets notionally have a schema
too, but I wouldn't generally try to manipulate the "columns" of a Dataset. I'd explicitly turn it
back into a DataFrame, and, as you say, that also lets you control the naming however you like.
I think that's the right way to approach this; I'd avoid relying on generated column names anyway.
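
For example, something like this (a minimal sketch using the classes from your example, assuming a
SparkSession named spark with its implicits imported):

{code}
// Sketch: do the aggregation on the Dataset, then go back to a DataFrame
// with explicit column names and select by the name you chose.
import spark.implicits._

case class Person(name: String, age: Int)
case class Customer(id: Int, person: Person)

val ds = spark.createDataset(Seq(Customer(1, Person("russ", 85))))
val grouped = ds.groupByKey(_.id).reduceGroups((x, y) => x)

// Naming the columns yourself sidesteps the generated
// "ReduceAggregator(Customer)" name entirely.
grouped.toDF("key", "reduced").select("reduced")
{code}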

I don't know whether it's intentional that this 'column' of the Dataset is un-selectable, because
it's an implementation detail or something, or whether it's just a quirk. Even after toDF() I can't
seem to select it by what its name appears to be.
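
For the record, this is the sort of selection that fails even after toDF(); the backticked form is
just my guess at escaping the parentheses in the generated name:

{code}
// Sketch: selecting the generated column by its apparent name after toDF().
val df = grouped.toDF()
df.select("ReduceAggregator(Customer)")     // cannot resolve, per the report
df.select("`ReduceAggregator(Customer)`")   // guess: backtick-escaping doesn't resolve it either
{code}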

> Cannot Select ReducedAggregator Column
> --------------------------------------
>
>                 Key: SPARK-22316
>                 URL: https://issues.apache.org/jira/browse/SPARK-22316
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Russell Spitzer
>            Priority: Minor
>
> Given a Dataset which has been run through reduceGroups like this:
> {code}
> case class Person(name: String, age: Int)
> case class Customer(id: Int, person: Person)
> val ds = spark.createDataset(Seq(Customer(1, Person("russ", 85))))
> val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
> {code}
> We end up with a Dataset with the schema
> {code}
> org.apache.spark.sql.types.StructType =
> StructType(
>   StructField(value,IntegerType,false),
>   StructField(ReduceAggregator(Customer),
>     StructType(
>       StructField(id,IntegerType,false),
>       StructField(person,
>         StructType(
>           StructField(name,StringType,true),
>           StructField(age,IntegerType,false)),
>         true)),
>     true))
> {code}
> The column names are 
> {code}
> Array(value, ReduceAggregator(Customer))
> {code}
> But you cannot select the "ReduceAggregator(Customer)" column
> {code}
> grouped.select(grouped.columns(1))
> org.apache.spark.sql.AnalysisException: cannot resolve '`ReduceAggregator(Customer)`' given input columns: [value, ReduceAggregator(Customer)];;
> 'Project ['ReduceAggregator(Customer)]
> +- Aggregate [value#338], [value#338, reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, Some(newInstance(class Customer)), Some(class Customer), Some(StructType(StructField(id,IntegerType,false), StructField(person,StructType(StructField(name,StringType,true), StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 AS value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).id AS id#195, person, if (isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).person)) null else named_struct(name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).person).name, true), age, assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).person).age) AS person#196) AS _2#341, newInstance(class scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, true])).person)) null else named_struct(name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(assertnotnull(input[0, Customer, true])).person).name, true), age, assertnotnull(assertnotnull(assertnotnull(input[0, Customer, true])).person).age) AS person#196, StructField(id,IntegerType,false), StructField(person,StructType(StructField(name,StringType,true), StructField(age,IntegerType,false)),true), true, 0, 0) AS ReduceAggregator(Customer)#346]
>    +- AppendColumns <function1>, class Customer, [StructField(id,IntegerType,false), StructField(person,StructType(StructField(name,StringType,true), StructField(age,IntegerType,false)),true)], newInstance(class Customer), [input[0, int, false] AS value#338]
>       +- LocalRelation [id#197, person#198]
> {code}
> You can work around this by using "toDF" to rename the column
> {code}
> scala> grouped.toDF("key", "reduced").select("reduced")
> res55: org.apache.spark.sql.DataFrame = [reduced: struct<id: int, person: struct<name: string, age: int>>]
> {code}
> I think that any invocation of
> {code}
> ds.select(ds.columns(i))
> {code}
> should work, for all valid i < the number of columns.
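> Concretely (a minimal sketch of that expected invariant, using the grouped Dataset from above):
> {code}
> // Every name reported by columns should be selectable by name.
> grouped.columns.foreach { name =>
>   grouped.select(name)  // currently throws for "ReduceAggregator(Customer)"
> }
> {code}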


