spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Paul Zaczkieiwcz (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-12076) countDistinct behaves inconsistently
Date Tue, 10 Jan 2017 15:24:58 GMT

    [ https://issues.apache.org/jira/browse/SPARK-12076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15815268#comment-15815268
] 

Paul Zaczkieiwcz commented on SPARK-12076:
------------------------------------------

Actually, the counts were wrong with one of the plans. What I saw was that if I added Columns
to the agg method I got inaccurate totals for the countDistinct Column. I couldn't reproduce
without uploading proprietary data to the JIRA ticket, so I tried to document the query plan
as best I could.  The original source of the data were a directory of parquet files.  My guess
is that the data to aggregate was sourced from different parquet files, and hence different
partitions.  The query planner should have been smart enough to redistribute the data to the
same partition in order to do a countDistinct, but it apparently loses track if there are
too many Columns in the agg.

I'm sorry I can't generate a self-contained reproducer. I don't even know if it's still a
problem in Spark 2.1. 

> countDistinct behaves inconsistently
> ------------------------------------
>
>                 Key: SPARK-12076
>                 URL: https://issues.apache.org/jira/browse/SPARK-12076
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.1
>            Reporter: Paul Zaczkieiwcz
>            Priority: Minor
>
> Assume:
> {code:java}
> val slicePlayed:DataFrame = _
> val joinKeys:DataFrame = _
> {code}
> Also assume that all columns beginning with "cdnt_" are from {{slicePlayed}} and all
columns beginning with "join_" are from {{joinKeys}}.  The following queries can return different
values for slice_count_distinct:
> {code:java}
> slicePlayed.join(
>   joinKeys,
>   ( 
>     $"join_session_id" === $"cdnt_session_id" &&
>     $"join_asset_id" === $"cdnt_asset_id" &&
>     $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number")
> ).show(false)
> {code}
> {code:java}
> slicePlayed.join(
>   joinKeys,
>   ( 
>     $"join_session_id" === $"cdnt_session_id" &&
>     $"join_asset_id" === $"cdnt_asset_id" &&
>     $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   min($"cdnt_event_time").as("slice_start_time"),
>   min($"cdnt_playing_owner_id").as("slice_played_playing_owner_id"),
>   min($"cdnt_user_ip").as("slice_played_user_ip"),
>   min($"cdnt_user_agent").as("slice_played_user_agent"),
>   min($"cdnt_referer").as("slice_played_referer"),
>   max($"cdnt_event_time").as("slice_end_time"),
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number"),
>   min($"cdnt_is_live").as("is_live")
> ).show(false)
> {code}
> The +only+ difference between the two queries are that I'm adding more columns to the
{{agg}} method.
> I can't reproduce by manually creating a dataFrame from {{DataFrame.parallelize}}. The
original sources of the dataFrames are parquet files.
> The explain plans for the two queries are slightly different.
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], functions=[(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)],
output=[slice_played_session_id#780,slice_played_asset_id#781,slice_played_euid#782,slice_count_distinct#783L,slice_count_total#784L,min_slice_number#785L,max_slice_number#786L])
>  TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
functions=[(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false)],
output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
>   TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
functions=[(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false)],
output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
>    TungstenProject [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L]
>     SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], [join_session_id#41,join_asset_id#42,join_euid#43]
>      TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 ASC], false,
0
>       TungstenExchange hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
>        ConvertToUnsafe
>         Scan ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_slice_number#24L,cdnt_euid#13,cdnt_asset_id#5,cdnt_session_id#23]
>      TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 ASC], false,
0
>       TungstenExchange hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
>        ConvertToUnsafe
>         Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
> {code}
> {code}
> == Physical Plan ==
> SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], functions=[(max(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_event_time#4),mode=Final,isDistinct=false),(min(cdnt_is_live#18),mode=Final,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_user_ip#31),mode=Final,isDistinct=false),(min(cdnt_user_agent#30),mode=Final,isDistinct=false),(min(cdnt_referer#22),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)],
output=[slice_played_session_id#721,slice_played_asset_id#722,slice_played_euid#723,slice_start_time#724,slice_played_playing_owner_id#725,slice_played_user_ip#726,slice_played_user_agent#727,slice_played_referer#728,slice_end_time#729,slice_count_distinct#730L,slice_count_total#731L,min_slice_number#732L,max_slice_number#733L,is_live#734])
>  SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
functions=[(max(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_event_time#4),mode=PartialMerge,isDistinct=false),(min(cdnt_is_live#18),mode=PartialMerge,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_user_ip#31),mode=PartialMerge,isDistinct=false),(min(cdnt_user_agent#30),mode=PartialMerge,isDistinct=false),(min(cdnt_referer#22),mode=PartialMerge,isDistinct=false)],
output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
>   SortBasedAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
functions=[(max(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_event_time#4),mode=Partial,isDistinct=false),(min(cdnt_is_live#18),mode=Partial,isDistinct=false),(min(cdnt_playing_owner_id#21),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_user_ip#31),mode=Partial,isDistinct=false),(min(cdnt_user_agent#30),mode=Partial,isDistinct=false),(min(cdnt_referer#22),mode=Partial,isDistinct=false)],
output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,max#758,min#748,min#768,min#750,max#766L,min#764L,currentCount#762L,min#752,min#754,min#756])
>    ConvertToSafe
>     TungstenProject [cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
>      SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], [join_session_id#41,join_asset_id#42,join_euid#43]
>       TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 ASC], false,
0
>        TungstenExchange hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
>         ConvertToUnsafe
>          Scan ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_playing_owner_id#21,cdnt_session_id#23,cdnt_slice_number#24L,cdnt_euid#13,cdnt_event_time#4,cdnt_is_live#18,cdnt_user_ip#31,cdnt_user_agent#30,cdnt_referer#22,cdnt_asset_id#5]
>       TungstenSort [join_session_id#41 ASC,join_asset_id#42 ASC,join_euid#43 ASC], false,
0
>        TungstenExchange hashpartitioning(join_session_id#41,join_asset_id#42,join_euid#43)
>         ConvertToUnsafe
>          Scan PhysicalRDD[join_session_id#41,join_asset_id#42,join_euid#43]
> {code}
> The biggest difference betwen the two plans is whether TungstenAggregate is used or whether
SortBasedAggregate+ConvertToSafe is used. The SortBasedAggregate+ConvertToSafe method gives
the inaccurate results. I've been able to get around this issue by adding a {{sortBy}} call
before the {{groupBy}} clause, but it strikes me that this particular calculation shouldn't
change by adding a manual sort in an intermediate step.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message