hive-dev mailing list archives

From "Mostafa Mokhtar (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (HIVE-9647) Discrepancy in CBO between partitioned and un-partitioned tables
Date Wed, 11 Feb 2015 00:01:12 GMT

     [ https://issues.apache.org/jira/browse/HIVE-9647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mostafa Mokhtar updated HIVE-9647:
----------------------------------
    Description: 
High-level summary
HiveRelMdSelectivity.computeInnerJoinSelectivity relies on the per-column number of distinct
values (NDV) to estimate join selectivity.
The way statistics are aggregated for partitioned tables produces different NDVs than for
un-partitioned tables, which results in different plans for partitioned and un-partitioned
schemas.

The table below summarizes the NDVs in computeInnerJoinSelectivity which are used to estimate
selectivity of joins.

||Column||Partitioned count distincts||Un-Partitioned count distincts||
|sr_customer_sk|71,245|1,415,625|
|sr_item_sk|38,846|62,562|
|sr_ticket_number|71,245|34,931,085|
|ss_customer_sk|88,476|1,415,625|
|ss_item_sk|38,846|62,562|
|ss_ticket_number|100,756|56,256,175|
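
To see why these NDV differences matter for join estimates, here is a minimal sketch using the common textbook estimate for an equi-join, in which each join key contributes a factor of 1 / max(NDV_left, NDV_right) and keys are assumed independent. This is an assumption for illustration only: it is not the formula in computeInnerJoinSelectivity and does not reproduce the row counts in the plans in this message. The function name textbook_join_selectivity is hypothetical.

```python
def textbook_join_selectivity(ndv_pairs):
    """Textbook selectivity of a conjunctive equi-join.

    ndv_pairs holds one (left_ndv, right_ndv) tuple per join key.
    Assumes independent keys; NOT Hive's actual implementation.
    """
    selectivity = 1.0
    for left_ndv, right_ndv in ndv_pairs:
        selectivity *= 1.0 / max(left_ndv, right_ndv)
    return selectivity


# NDVs from the table above, as (store_sales column, store_returns column)
# for the customer_sk, item_sk and ticket_number join keys.
partitioned = [(88_476, 71_245), (38_846, 38_846), (100_756, 71_245)]
unpartitioned = [(1_415_625, 1_415_625), (62_562, 62_562), (56_256_175, 34_931_085)]

sel_part = textbook_join_selectivity(partitioned)
sel_unpart = textbook_join_selectivity(unpartitioned)

# Smaller NDVs give a larger selectivity, hence a larger estimated join output.
print(f"partitioned / un-partitioned selectivity ratio: {sel_part / sel_unpart:.1f}")
```

Under this simplified model the partitioned NDVs yield a much larger selectivity, which matches the direction of the larger join row-count estimate seen for the partitioned schema.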

	
The discrepancy arises because the NDV calculation for a partitioned table assumes that the
full range of distinct values is contained within each partition, so it is calculated as
"select max(NUM_DISTINCTS) from PART_COL_STATS".
This is problematic for columns like ticket number, which naturally increase with the
partitioning date column ss_sold_date_sk.
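
The effect can be illustrated with a small sketch on made-up data (the partition contents and helper names below are hypothetical, not taken from TPC-DS). When a column's values are disjoint across partitions, the maximum per-partition NDV is far below the true NDV; when every partition draws from the same small domain, the two are close.

```python
def true_ndv(partitions):
    """Exact number of distinct values across all partitions."""
    return len(set().union(*partitions))


def max_partition_ndv(partitions):
    """Aggregate NDV the way max(NUM_DISTINCTS) does: largest single partition."""
    return max(len(set(p)) for p in partitions)


# Toy "ticket number" column: values grow with the partition date,
# so no value appears in more than one partition.
ticket_partitions = [range(0, 100), range(100, 200), range(200, 300), range(300, 400)]

# Toy "item key" column: every partition draws from the same small domain.
item_partitions = [range(0, 50), range(0, 50), range(10, 50), range(0, 40)]

for name, parts in [("ticket", ticket_partitions), ("item", item_partitions)]:
    print(name, "true NDV:", true_ndv(parts), "max per-partition NDV:", max_partition_ndv(parts))
```

For the toy ticket column the max-based aggregate underestimates the true NDV by a factor equal to the number of partitions, while for the toy item column it is exact.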
Suggestions
Use HyperLogLog, as suggested by Gopal; there is an HLL implementation for HBase coprocessors
which we can use as a reference here.
Using the global stats from TAB_COL_STATS and the per-partition stats from PART_COL_STATS,
extrapolate the NDV for the qualified partitions as in:
Max((NUM_DISTINCTS from TAB_COL_STATS) x (Number of qualified partitions) / (Number of partitions),
max(NUM_DISTINCTS) from PART_COL_STATS)
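
The extrapolation formula above can be sketched as follows. The function name extrapolated_ndv and its parameter names are hypothetical, and the partition counts in the usage example are made up for illustration; only the two NDV values come from the table in this message.

```python
def extrapolated_ndv(table_ndv, qualified_partitions, total_partitions, max_partition_ndv):
    """Extrapolate the NDV for the qualified partitions.

    Scales the table-level NDV (TAB_COL_STATS) by the fraction of partitions
    that qualify, and never returns less than the largest per-partition NDV
    (PART_COL_STATS).
    """
    if total_partitions <= 0:
        raise ValueError("total_partitions must be positive")
    scaled = table_ndv * qualified_partitions / total_partitions
    return max(scaled, max_partition_ndv)


# ss_ticket_number: table-level NDV and max per-partition NDV from the table above.
# The partition counts (100 qualified out of 2000) are illustrative assumptions.
estimate = extrapolated_ndv(
    table_ndv=56_256_175,
    qualified_partitions=100,
    total_partitions=2_000,
    max_partition_ndv=100_756,
)
print(estimate)
```

The outer max keeps the estimate from dropping below what a single partition is already known to contain, which matters when only a few partitions qualify.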
More details
While doing TPC-DS partitioned vs. un-partitioned runs, I noticed that many of the plans are
different. I then dumped the CBO logical plan and found that the join estimates are drastically
different.

Unpartitioned schema :
{code}
2015-02-10 11:33:27,624 DEBUG [main]: parse.SemanticAnalyzer (SemanticAnalyzer.java:apply(12624))
- Plan After Join Reordering:
HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], store_sales_quantitystdev=[$2],
store_sales_quantitycov=[/($2, $1)], as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4],
as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): rowcount = 1.0,
cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2956
  HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], agg#2=[stddev_samp($0)],
agg#3=[count($1)], agg#4=[avg($1)], agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost
= {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2954
    HiveProjectRel($f0=[$4], $f1=[$8]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8
rows, 0.0 cpu, 0.0 io}, id = 2952
      HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$1], ss_customer_sk=[$2], ss_ticket_number=[$3],
ss_quantity=[$4], sr_item_sk=[$5], sr_customer_sk=[$6], sr_ticket_number=[$7], sr_return_quantity=[$8],
d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8
rows, 0.0 cpu, 0.0 io}, id = 2982
        HiveJoinRel(condition=[=($9, $0)], joinType=[inner]): rowcount = 40.05611776795562,
cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2980
          HiveJoinRel(condition=[AND(AND(=($2, $6), =($1, $5)), =($3, $7))], joinType=[inner]):
rowcount = 28880.460910696, cumulative cost = {6.05654559E8 rows, 0.0 cpu, 0.0 io}, id = 2964
            HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$2], ss_customer_sk=[$3], ss_ticket_number=[$9],
ss_quantity=[$10]): rowcount = 5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io},
id = 2920
              HiveTableScanRel(table=[[tpcds_bin_orc_200.store_sales]]): rowcount = 5.50076554E8,
cumulative cost = {0}, id = 2822
            HiveProjectRel(sr_item_sk=[$2], sr_customer_sk=[$3], sr_ticket_number=[$9], sr_return_quantity=[$10]):
rowcount = 5.5578005E7, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2923
              HiveTableScanRel(table=[[tpcds_bin_orc_200.store_returns]]): rowcount = 5.5578005E7,
cumulative cost = {0}, id = 2823
          HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 101.31622746185853,
cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2948
            HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 101.31622746185853, cumulative
cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2946
              HiveTableScanRel(table=[[tpcds_bin_orc_200.date_dim]]): rowcount = 73049.0,
cumulative cost = {0}, id = 2821
{code}

Partitioned schema :
{code}
2015-02-10 11:32:16,880 DEBUG [main]: parse.SemanticAnalyzer (SemanticAnalyzer.java:apply(12624))
- Plan After Join Reordering:
HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], store_sales_quantitystdev=[$2],
store_sales_quantitycov=[/($2, $1)], as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4],
as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): rowcount = 1.0,
cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2791
  HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], agg#2=[stddev_samp($0)],
agg#3=[count($1)], agg#4=[avg($1)], agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost
= {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2789
    HiveProjectRel($f0=[$3], $f1=[$8]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8
rows, 0.0 cpu, 0.0 io}, id = 2787
      HiveProjectRel(ss_item_sk=[$4], ss_customer_sk=[$5], ss_ticket_number=[$6], ss_quantity=[$7],
ss_sold_date_sk=[$8], sr_item_sk=[$0], sr_customer_sk=[$1], sr_ticket_number=[$2], sr_return_quantity=[$3],
d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8
rows, 0.0 cpu, 0.0 io}, id = 2817
        HiveJoinRel(condition=[AND(AND(=($5, $1), =($4, $0)), =($6, $2))], joinType=[inner]):
rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io},
id = 2815
          HiveProjectRel(sr_item_sk=[$1], sr_customer_sk=[$2], sr_ticket_number=[$8], sr_return_quantity=[$9]):
rowcount = 5.5578005E7, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2758
            HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_returns]]):
rowcount = 5.5578005E7, cumulative cost = {0}, id = 2658
          HiveJoinRel(condition=[=($5, $4)], joinType=[inner]): rowcount = 762935.5811373093,
cumulative cost = {5.500766553162274E8 rows, 0.0 cpu, 0.0 io}, id = 2801
            HiveProjectRel(ss_item_sk=[$1], ss_customer_sk=[$2], ss_ticket_number=[$8], ss_quantity=[$9],
ss_sold_date_sk=[$22]): rowcount = 5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0
io}, id = 2755
              HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_sales]]):
rowcount = 5.50076554E8, cumulative cost = {0}, id = 2657
            HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 101.31622746185853,
cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2783
              HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 101.31622746185853,
cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2781
                HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.date_dim]]): rowcount
= 73049.0, cumulative cost = {0}, id = 2656
{code}

This was puzzling, given that the stats for both tables are “identical” in TAB_COL_STATS.

Column statistics from TAB_COL_STATS; note that the column statistics are identical in both
cases.
||DB_NAME||COLUMN_NAME||COLUMN_TYPE||NUM_NULLS||LONG_HIGH_VALUE||LONG_LOW_VALUE||MAX_COL_LEN||NUM_DISTINCTS||
|tpcds_bin_orc_200|d_date_sk|int|0|2,488,070|2,415,022|NULL|65,332|
|tpcds_bin_partitioned_orc_200|d_date_sk|int|0|2,488,070|2,415,022|NULL|65,332|
|tpcds_bin_orc_200|d_quarter_name|string|0|NULL|NULL|6|721|
|tpcds_bin_partitioned_orc_200|d_quarter_name|string|0|NULL|NULL|6|721|
|tpcds_bin_orc_200|sr_customer_sk|int|1,009,571|1,600,000|1|NULL|1,415,625|
|tpcds_bin_partitioned_orc_200|sr_customer_sk|int|1,009,571|1,600,000|1|NULL|1,415,625|
|tpcds_bin_orc_200|sr_item_sk|int|0|48,000|1|NULL|62,562|
|tpcds_bin_partitioned_orc_200|sr_item_sk|int|0|48,000|1|NULL|62,562|
|tpcds_bin_orc_200|sr_ticket_number|int|0|48,000,000|1|NULL|34,931,085|
|tpcds_bin_partitioned_orc_200|sr_ticket_number|int|0|48,000,000|1|NULL|34,931,085|
|tpcds_bin_orc_200|ss_customer_sk|int|12,960,424|1,600,000|1|NULL|1,415,625|
|tpcds_bin_partitioned_orc_200|ss_customer_sk|int|12,960,424|1,600,000|1|NULL|1,415,625|
|tpcds_bin_orc_200|ss_item_sk|int|0|48,000|1|NULL|62,562|
|tpcds_bin_partitioned_orc_200|ss_item_sk|int|0|48,000|1|NULL|62,562|
|tpcds_bin_orc_200|ss_sold_date_sk|int|0|2,452,642|2,450,816|NULL|2,226|
|tpcds_bin_partitioned_orc_200|ss_sold_date_sk|int|0|2,452,642|2,450,816|NULL|2,226|
|tpcds_bin_orc_200|ss_ticket_number|int|0|48,000,000|1|NULL|56,256,175|
|tpcds_bin_partitioned_orc_200|ss_ticket_number|int|0|48,000,000|1|NULL|56,256,175|


For partitioned tables, we get the statistics using get_aggr_stats_for, which eventually issues
the query below:

{code}
select 
    COLUMN_NAME,
    COLUMN_TYPE,
    …
    max(NUM_DISTINCTS),
    …
from
    PART_COL_STATS
where
    DB_NAME = 
        and TABLE_NAME = 
        and COLUMN_NAME in 
        and PARTITION_NAME in (1 … N)
group by COLUMN_NAME , COLUMN_TYPE;
{code}
 …


> Discrepancy in CBO between partitioned and un-partitioned tables 
> -----------------------------------------------------------------
>
>                 Key: HIVE-9647
>                 URL: https://issues.apache.org/jira/browse/HIVE-9647
>             Project: Hive
>          Issue Type: Bug
>          Components: CBO
>    Affects Versions: 0.14.0
>            Reporter: Mostafa Mokhtar
>            Assignee: Gunther Hagleitner
>             Fix For: 1.2.0
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
