spark-issues mailing list archives

From "Liang-Chi Hsieh (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-17867) Dataset.dropDuplicates (i.e. distinct) should consider the columns with same column name
Date Thu, 18 May 2017 14:41:04 GMT

[ https://issues.apache.org/jira/browse/SPARK-17867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015852#comment-16015852 ]

Liang-Chi Hsieh commented on SPARK-17867:
-----------------------------------------

The above example code doesn't compile against the current codebase: there is no repartitionByColumns method, only repartition.

{code}
    // Assumes a spark-shell / SparkSession named `spark`
    import spark.implicits._
    import org.apache.spark.sql.functions.{asc, desc}

    val df = Seq((1, 2, 3, "hi"), (1, 2, 4, "hi"))
      .toDF("userid", "eventid", "vk", "del")
      .filter("userid is not null and eventid is not null and vk is not null")
      .repartition($"userid")
      .sortWithinPartitions(asc("userid"), asc("eventid"), desc("vk"))
      .dropDuplicates("eventid")
      .filter("userid is not null")
      .repartition($"userid")
      .sortWithinPartitions(asc("userid"))
      .filter("del <> 'hi'")
{code}
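
The optimized and physical plans below can be reproduced through the query execution API (a minimal sketch, assuming the df defined above):

{code}
// Print both the optimized logical plan and the physical plan
df.explain(true)

// Or access the individual plans directly
println(df.queryExecution.optimizedPlan)
println(df.queryExecution.sparkPlan)
{code}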

The optimized plan looks like:

{code}
Sort [userid#9 ASC NULLS FIRST], false
+- RepartitionByExpression [userid#9], 5
   +- Filter (isnotnull(del#12) && NOT (del#12 = hi))
      +- Aggregate [eventid#10], [first(userid#9, false) AS userid#9, eventid#10, first(vk#11, false) AS vk#11, first(del#12, false) AS del#12]
         +- Sort [userid#9 ASC NULLS FIRST, eventid#10 ASC NULLS FIRST, vk#11 DESC NULLS LAST], false
            +- RepartitionByExpression [userid#9], 5
               +- LocalRelation [userid#9, eventid#10, vk#11, del#12]
{code}

The spark plan looks like:

{code}
Sort [userid#9 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(userid#9, 5)
   +- Filter (isnotnull(del#12) && NOT (del#12 = hi))
      +- SortAggregate(key=[eventid#10], functions=[first(userid#9, false), first(vk#11, false), first(del#12, false)], output=[userid#9, eventid#10, vk#11, del#12])
         +- SortAggregate(key=[eventid#10], functions=[partial_first(userid#9, false), partial_first(vk#11, false), partial_first(del#12, false)], output=[eventid#10, first#35, valueSet#36, first#37, valueSet#38, first#39, valueSet#40])
            +- Sort [userid#9 ASC NULLS FIRST, eventid#10 ASC NULLS FIRST, vk#11 DESC NULLS LAST], false, 0
               +- Exchange hashpartitioning(userid#9, 5)
                  +- LocalTableScan [userid#9, eventid#10, vk#11, del#12]
{code}

Looks like the "del <> 'hi'" filter isn't pushed down?
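
One way to check whether this filter could even be applied before dropDuplicates without changing the result is to run a variant with the filter moved up and compare (a sketch, reusing the session and df defined above):

{code}
// Sketch: same pipeline as df, but with the del filter applied before dropDuplicates
val filteredFirst = Seq((1, 2, 3, "hi"), (1, 2, 4, "hi"))
  .toDF("userid", "eventid", "vk", "del")
  .filter("userid is not null and eventid is not null and vk is not null")
  .filter("del <> 'hi'")  // moved up, before dropDuplicates
  .repartition($"userid")
  .sortWithinPartitions(asc("userid"), asc("eventid"), desc("vk"))
  .dropDuplicates("eventid")
  .filter("userid is not null")
  .repartition($"userid")
  .sortWithinPartitions(asc("userid"))

// If the results differ, moving the filter below the aggregate produced by
// dropDuplicates would not be a semantics-preserving pushdown
println(df.count())
println(filteredFirst.count())
{code}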

> Dataset.dropDuplicates (i.e. distinct) should consider the columns with same column name
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-17867
>                 URL: https://issues.apache.org/jira/browse/SPARK-17867
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Liang-Chi Hsieh
>            Assignee: Liang-Chi Hsieh
>             Fix For: 2.1.0
>
>
> We find and get the first resolved attribute from the output with the given column name in
> Dataset.dropDuplicates. When we have more than one column with the same name, the other columns
> are put into aggregation columns instead of grouping columns. We should fix this.
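
For illustration, a minimal sketch of how more than one column with the same name can arise (e.g. from a join) and hit this code path; the data and column names here are made up:

{code}
// A join on an expression keeps both "key" columns in the output
val left = Seq((1, "a"), (1, "b")).toDF("key", "lval")
val right = Seq((1, "x")).toDF("key", "rval")
val joined = left.join(right, left("key") === right("key"))

joined.printSchema()  // two columns named "key"

// Before the fix, dropDuplicates("key") resolved only the first "key" column as a
// grouping column; the other "key" column was treated as an aggregation column.
joined.dropDuplicates("key").show()
{code}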


