hive-issues mailing list archives

From "liyunzhang_intel (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HIVE-17087) Remove unnecessary HoS DPP trees during map-join conversion
Date Fri, 21 Jul 2017 07:17:00 GMT

    [ https://issues.apache.org/jira/browse/HIVE-17087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095886#comment-16095886 ]

liyunzhang_intel commented on HIVE-17087:
-----------------------------------------

[~stakiar]: I have one question about the patch:
{noformat}
  /* Two of the optimization rules, ConvertJoinMapJoin and RemoveDynamicPruningBySize, are put into
     stats dependent optimizations and run together in TezCompiler. There's no guarantee which one
     runs first, but in either case, the prior one may have removed a chain which the latter one is
     not aware of. So we need to remember the leaf node(s) of that chain so it can be skipped.

     For example, as ConvertJoinMapJoin is removing the reduce sink, it may also have removed a
     dynamic partition pruning operator chain. However, RemoveDynamicPruningBySize doesn't know this
     and still tries to traverse that removed chain which will cause NPE.

     This may also happen when RemoveDynamicPruningBySize happens first.
    */
  public HashSet<SparkPartitionPruningSinkOperator> pruningOpsRemovedByPriorOpt;
{noformat}

[ConvertJoinMapJoin|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java#L155] is inserted into opRules first, followed by [RemoveDynamicPruningBySize|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java#L165]. Since opRules is a LinkedHashMap, which is insertion-ordered, why does the comment say "There's no guarantee which one runs first"?
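
To make the premise of the question concrete, here is a minimal sketch of the insertion-order behaviour of java.util.LinkedHashMap. The class name RuleOrderDemo, the method ruleOrder, and the placeholder string values are hypothetical and are not taken from TezCompiler; the real opRules map holds rule and processor objects. The sketch only shows the order in which the map itself iterates. Whether that order decides which rule fires first during the operator-tree walk is a separate question that this example does not address.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RuleOrderDemo {
    // Returns the rule names in the order the map iterates over them.
    static List<String> ruleOrder() {
        // Hypothetical stand-in for opRules: keys are rule names,
        // values are placeholder strings instead of real processors.
        Map<String, String> opRules = new LinkedHashMap<>();
        opRules.put("ConvertJoinMapJoin", "join-processor");
        opRules.put("RemoveDynamicPruningBySize", "pruning-processor");
        // LinkedHashMap iterates keys in insertion order.
        return new ArrayList<>(opRules.keySet());
    }

    public static void main(String[] args) {
        // Prints: [ConvertJoinMapJoin, RemoveDynamicPruningBySize]
        System.out.println(ruleOrder());
    }
}
```

A plain HashMap would make no such promise about iteration order, which is why the map type matters for the question above.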


> Remove unnecessary HoS DPP trees during map-join conversion
> -----------------------------------------------------------
>
>                 Key: HIVE-17087
>                 URL: https://issues.apache.org/jira/browse/HIVE-17087
>             Project: Hive
>          Issue Type: Sub-task
>          Components: Spark
>            Reporter: Sahil Takiar
>            Assignee: Sahil Takiar
>         Attachments: HIVE-17087.1.patch
>
>
> Ran the following query in the {{TestSparkCliDriver}}:
> {code:sql}
> set hive.spark.dynamic.partition.pruning=true;
> set hive.auto.convert.join=true;
> create table partitioned_table1 (col int) partitioned by (part_col int);
> create table partitioned_table2 (col int) partitioned by (part_col int);
> create table regular_table (col int);
> insert into table regular_table values (1);
> alter table partitioned_table1 add partition (part_col = 1);
> insert into table partitioned_table1 partition (part_col = 1) values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
> alter table partitioned_table2 add partition (part_col = 1);
> insert into table partitioned_table2 partition (part_col = 1) values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
> explain select * from partitioned_table1 where partitioned_table1.part_col in (select regular_table.col from regular_table join partitioned_table2 on regular_table.col = partitioned_table2.part_col);
> {code}
> and got the following explain plan:
> {code}
> STAGE DEPENDENCIES:
>   Stage-2 is a root stage
>   Stage-4 depends on stages: Stage-2
>   Stage-5 depends on stages: Stage-4
>   Stage-3 depends on stages: Stage-5
>   Stage-1 depends on stages: Stage-3
>   Stage-0 depends on stages: Stage-1
> STAGE PLANS:
>   Stage: Stage-2
>     Spark
> #### A masked pattern was here ####
>       Vertices:
>         Map 4 
>             Map Operator Tree:
>                 TableScan
>                   alias: partitioned_table1
>                   Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: col (type: int), part_col (type: int)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                     Select Operator
>                       expressions: _col1 (type: int)
>                       outputColumnNames: _col0
>                       Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                       Group By Operator
>                         keys: _col0 (type: int)
>                         mode: hash
>                         outputColumnNames: _col0
>                         Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                         Spark Partition Pruning Sink Operator
>                           partition key expr: part_col
>                           Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                           target column name: part_col
>                           target work: Map 3
>   Stage: Stage-4
>     Spark
> #### A masked pattern was here ####
>       Vertices:
>         Map 2 
>             Map Operator Tree:
>                 TableScan
>                   alias: regular_table
>                   Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
>                   Filter Operator
>                     predicate: col is not null (type: boolean)
>                     Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
>                     Select Operator
>                       expressions: col (type: int)
>                       outputColumnNames: _col0
>                       Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
>                       Spark HashTable Sink Operator
>                         keys:
>                           0 _col0 (type: int)
>                           1 _col0 (type: int)
>                       Select Operator
>                         expressions: _col0 (type: int)
>                         outputColumnNames: _col0
>                         Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
>                         Group By Operator
>                           keys: _col0 (type: int)
>                           mode: hash
>                           outputColumnNames: _col0
>                           Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
>                           Spark Partition Pruning Sink Operator
>                             partition key expr: part_col
>                             Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
>                             target column name: part_col
>                             target work: Map 3
>             Local Work:
>               Map Reduce Local Work
>   Stage: Stage-5
>     Spark
> #### A masked pattern was here ####
>   Stage: Stage-3
>     Spark
> #### A masked pattern was here ####
>       Vertices:
>         Map 3 
>             Map Operator Tree:
>                 TableScan
>                   alias: partitioned_table2
>                   Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: part_col (type: int)
>                     outputColumnNames: _col0
>                     Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                     Map Join Operator
>                       condition map:
>                            Inner Join 0 to 1
>                       keys:
>                         0 _col0 (type: int)
>                         1 _col0 (type: int)
>                       outputColumnNames: _col0
>                       input vertices:
>                         0 Map 2
>                       Statistics: Num rows: 11 Data size: 12 Basic stats: COMPLETE Column stats: NONE
>                       Group By Operator
>                         keys: _col0 (type: int)
>                         mode: hash
>                         outputColumnNames: _col0
>                         Statistics: Num rows: 11 Data size: 12 Basic stats: COMPLETE Column stats: NONE
>                         Spark HashTable Sink Operator
>                           keys:
>                             0 _col1 (type: int)
>                             1 _col0 (type: int)
>             Local Work:
>               Map Reduce Local Work
>   Stage: Stage-1
>     Spark
> #### A masked pattern was here ####
>       Vertices:
>         Map 1 
>             Map Operator Tree:
>                 TableScan
>                   alias: partitioned_table1
>                   Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: col (type: int), part_col (type: int)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                     Map Join Operator
>                       condition map:
>                            Left Semi Join 0 to 1
>                       keys:
>                         0 _col1 (type: int)
>                         1 _col0 (type: int)
>                       outputColumnNames: _col0, _col1
>                       input vertices:
>                         1 Map 3
>                       Statistics: Num rows: 12 Data size: 13 Basic stats: COMPLETE Column stats: NONE
>                       File Output Operator
>                         compressed: false
>                         Statistics: Num rows: 12 Data size: 13 Basic stats: COMPLETE Column stats: NONE
>                         table:
>                             input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>                             output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>                             serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>             Local Work:
>               Map Reduce Local Work
>   Stage: Stage-0
>     Fetch Operator
>       limit: -1
>       Processor Tree:
>         ListSink
> {code}
> I see a couple of weird things in the above explain plan:
> * I don't think there should be a partitioned_table1 scan -> Spark Partition Pruning Sink
> * I'm not sure what is happening with Stage-5 of the explain plan
> For reference, here is the explain plan for the equivalent query in Hive-on-Tez:
> {code}
> STAGE DEPENDENCIES:
>   Stage-1 is a root stage
>   Stage-0 depends on stages: Stage-1
> STAGE PLANS:
>   Stage: Stage-1
>     Tez
> #### A masked pattern was here ####
>       Edges:
>         Map 1 <- Map 3 (BROADCAST_EDGE)
>         Map 3 <- Map 2 (BROADCAST_EDGE)
> #### A masked pattern was here ####
>       Vertices:
>         Map 1 
>             Map Operator Tree:
>                 TableScan
>                   alias: partitioned_table1
>                   Statistics: Num rows: 10 Data size: 51 Basic stats: COMPLETE Column stats: PARTIAL
>                   Select Operator
>                     expressions: col (type: int), part_col (type: int)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 10 Data size: 40 Basic stats: COMPLETE Column stats: PARTIAL
>                     Map Join Operator
>                       condition map:
>                            Left Semi Join 0 to 1
>                       keys:
>                         0 _col1 (type: int)
>                         1 _col0 (type: int)
>                       outputColumnNames: _col0, _col1
>                       input vertices:
>                         1 Map 3
>                       Statistics: Num rows: 12 Data size: 48 Basic stats: COMPLETE Column stats: NONE
>                       File Output Operator
>                         compressed: false
>                         Statistics: Num rows: 12 Data size: 48 Basic stats: COMPLETE Column stats: NONE
>                         table:
>                             input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>                             output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>                             serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>             Execution mode: llap
>             LLAP IO: no inputs
>         Map 2 
>             Map Operator Tree:
>                 TableScan
>                   alias: regular_table
>                   Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
>                   Filter Operator
>                     predicate: col is not null (type: boolean)
>                     Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
>                     Select Operator
>                       expressions: col (type: int)
>                       outputColumnNames: _col0
>                       Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
>                       Reduce Output Operator
>                         key expressions: _col0 (type: int)
>                         sort order: +
>                         Map-reduce partition columns: _col0 (type: int)
>                         Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
>                       Select Operator
>                         expressions: _col0 (type: int)
>                         outputColumnNames: _col0
>                         Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
>                         Group By Operator
>                           keys: _col0 (type: int)
>                           mode: hash
>                           outputColumnNames: _col0
>                           Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
>                           Dynamic Partitioning Event Operator
>                             Target column: part_col (int)
>                             Target Input: partitioned_table2
>                             Partition key expr: part_col
>                             Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
>                             Target Vertex: Map 3
>             Execution mode: llap
>             LLAP IO: no inputs
>         Map 3 
>             Map Operator Tree:
>                 TableScan
>                   alias: partitioned_table2
>                   Statistics: Num rows: 10 Data size: 51 Basic stats: COMPLETE Column stats: COMPLETE
>                   Select Operator
>                     expressions: part_col (type: int)
>                     outputColumnNames: _col0
>                     Statistics: Num rows: 10 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE
>                     Map Join Operator
>                       condition map:
>                            Inner Join 0 to 1
>                       keys:
>                         0 _col0 (type: int)
>                         1 _col0 (type: int)
>                       outputColumnNames: _col0
>                       input vertices:
>                         0 Map 2
>                       Statistics: Num rows: 11 Data size: 44 Basic stats: COMPLETE Column stats: NONE
>                       Group By Operator
>                         keys: _col0 (type: int)
>                         mode: hash
>                         outputColumnNames: _col0
>                         Statistics: Num rows: 11 Data size: 44 Basic stats: COMPLETE Column stats: NONE
>                         Reduce Output Operator
>                           key expressions: _col0 (type: int)
>                           sort order: +
>                           Map-reduce partition columns: _col0 (type: int)
>                           Statistics: Num rows: 11 Data size: 44 Basic stats: COMPLETE Column stats: NONE
>                         Select Operator
>                           expressions: _col0 (type: int)
>                           outputColumnNames: _col0
>                           Statistics: Num rows: 11 Data size: 44 Basic stats: COMPLETE Column stats: NONE
>                           Group By Operator
>                             keys: _col0 (type: int)
>                             mode: hash
>                             outputColumnNames: _col0
>                             Statistics: Num rows: 11 Data size: 44 Basic stats: COMPLETE Column stats: NONE
>                             Dynamic Partitioning Event Operator
>                               Target column: part_col (int)
>                               Target Input: partitioned_table1
>                               Partition key expr: part_col
>                               Statistics: Num rows: 11 Data size: 44 Basic stats: COMPLETE Column stats: NONE
>                               Target Vertex: Map 1
>             Execution mode: llap
>             LLAP IO: no inputs
>   Stage: Stage-0
>     Fetch Operator
>       limit: -1
>       Processor Tree:
>         ListSink
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
