From: "Mostafa Mokhtar (JIRA)"
To: hive-dev@hadoop.apache.org
Date: Wed, 11 Feb 2015 00:01:12 +0000 (UTC)
Subject: [jira] [Updated] (HIVE-9647) Discrepancy in CBO between partitioned and un-partitioned tables

     [ https://issues.apache.org/jira/browse/HIVE-9647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mostafa Mokhtar updated HIVE-9647:
----------------------------------
    Description: 
High-level summary

HiveRelMdSelectivity.computeInnerJoinSelectivity relies on the per-column number of distinct values (NDV) to estimate join selectivity.

The way statistics are aggregated for partitioned tables results in a discrepancy in the number of distinct values, which in turn produces different plans for partitioned and un-partitioned schemas.
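The underestimate can be reproduced with a small sketch. The numbers below are hypothetical (not from the TPC-DS runs), and extrapolated_ndv is an illustrative helper, not Hive code; it implements the extrapolation proposed later in this description:

```python
# Why aggregating per-partition NDVs with max() underestimates the global NDV
# for a column that increases monotonically with the partition key (e.g. a
# ticket number that grows with ss_sold_date_sk).

# 10 partitions; each partition holds 1,000 ticket numbers disjoint from the
# others, so the true global NDV is 10,000.
partitions = [set(range(p * 1000, (p + 1) * 1000)) for p in range(10)]

true_ndv = len(set().union(*partitions))           # global NDV: 10,000
per_partition_ndvs = [len(p) for p in partitions]  # 1,000 per partition

# Current behaviour: select max(NUM_DISTINCTS) from PART_COL_STATS
aggregated_ndv = max(per_partition_ndvs)           # 1,000 -> 10x too low

# Proposed extrapolation (hypothetical helper):
# Max(global NDV x qualified partitions / total partitions,
#     max of the per-partition NDVs)
def extrapolated_ndv(global_ndv, qualified, total, part_ndvs):
    return max(global_ndv * qualified // total, max(part_ndvs))

# Suppose 4 of the 10 partitions survive partition pruning:
estimate = extrapolated_ndv(true_ndv, 4, 10, per_partition_ndvs[:4])
print(true_ndv, aggregated_ndv, estimate)  # 10000 1000 4000
```

With disjoint per-partition value ranges the max-based aggregate is off by the partition count, while the extrapolation scales the global NDV by the pruned fraction and never drops below the best per-partition figure.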
The table below summarizes the NDVs in computeInnerJoinSelectivity which are used to estimate the selectivity of joins.

||Column||Partitioned count distincts||Un-Partitioned count distincts||
|sr_customer_sk|71,245|1,415,625|
|sr_item_sk|38,846|62,562|
|sr_ticket_number|71,245|34,931,085|
|ss_customer_sk|88,476|1,415,625|
|ss_item_sk|38,846|62,562|
|ss_ticket_number|100,756|56,256,175|

The discrepancy arises because the NDV calculation for a partitioned table assumes that the NDV range is contained within each partition, and is calculated as "select max(NUM_DISTINCTS) from PART_COL_STATS".

This is problematic for columns like ticket number, which increase naturally with the partition date column ss_sold_date_sk.

Suggestions
# Use HyperLogLog, as suggested by Gopal; there is an HLL implementation for HBase coprocessors which we can use as a reference here.
# Using the global stats from TAB_COL_STATS and the per-partition stats from PART_COL_STATS, extrapolate the NDV for the qualified partitions as:
Max((NUM_DISTINCTS from TAB_COL_STATS) x (Number of qualified partitions) / (Number of Partitions), max(NUM_DISTINCTS) from PART_COL_STATS)

More details

While doing TPC-DS Partitioned vs.
Un-Partitioned runs I noticed that many of the plans were different, so I dumped the CBO logical plan and found that the join estimates are drastically different.

Unpartitioned schema:
{code}
2015-02-10 11:33:27,624 DEBUG [main]: parse.SemanticAnalyzer (SemanticAnalyzer.java:apply(12624)) - Plan After Join Reordering:
HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], store_sales_quantitystdev=[$2], store_sales_quantitycov=[/($2, $1)], as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4], as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): rowcount = 1.0, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2956
  HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], agg#2=[stddev_samp($0)], agg#3=[count($1)], agg#4=[avg($1)], agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2954
    HiveProjectRel($f0=[$4], $f1=[$8]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2952
      HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$1], ss_customer_sk=[$2], ss_ticket_number=[$3], ss_quantity=[$4], sr_item_sk=[$5], sr_customer_sk=[$6], sr_ticket_number=[$7], sr_return_quantity=[$8], d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2982
        HiveJoinRel(condition=[=($9, $0)], joinType=[inner]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2980
          HiveJoinRel(condition=[AND(AND(=($2, $6), =($1, $5)), =($3, $7))], joinType=[inner]): rowcount = 28880.460910696, cumulative cost = {6.05654559E8 rows, 0.0 cpu, 0.0 io}, id = 2964
            HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$2], ss_customer_sk=[$3], ss_ticket_number=[$9], ss_quantity=[$10]): rowcount = 5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2920
              HiveTableScanRel(table=[[tpcds_bin_orc_200.store_sales]]): rowcount = 5.50076554E8, cumulative cost = {0}, id = 2822
            HiveProjectRel(sr_item_sk=[$2], sr_customer_sk=[$3], sr_ticket_number=[$9], sr_return_quantity=[$10]): rowcount = 5.5578005E7, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2923
              HiveTableScanRel(table=[[tpcds_bin_orc_200.store_returns]]): rowcount = 5.5578005E7, cumulative cost = {0}, id = 2823
          HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2948
            HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2946
              HiveTableScanRel(table=[[tpcds_bin_orc_200.date_dim]]): rowcount = 73049.0, cumulative cost = {0}, id = 2821
{code}

Partitioned schema:
{code}
2015-02-10 11:32:16,880 DEBUG [main]: parse.SemanticAnalyzer (SemanticAnalyzer.java:apply(12624)) - Plan After Join Reordering:
HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], store_sales_quantitystdev=[$2], store_sales_quantitycov=[/($2, $1)], as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4], as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): rowcount = 1.0, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2791
  HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], agg#2=[stddev_samp($0)], agg#3=[count($1)], agg#4=[avg($1)], agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2789
    HiveProjectRel($f0=[$3], $f1=[$8]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2787
      HiveProjectRel(ss_item_sk=[$4], ss_customer_sk=[$5], ss_ticket_number=[$6], ss_quantity=[$7], ss_sold_date_sk=[$8], sr_item_sk=[$0], sr_customer_sk=[$1], sr_ticket_number=[$2], sr_return_quantity=[$3], d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2817
        HiveJoinRel(condition=[AND(AND(=($5, $1), =($4, $0)), =($6, $2))], joinType=[inner]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2815
          HiveProjectRel(sr_item_sk=[$1], sr_customer_sk=[$2], sr_ticket_number=[$8], sr_return_quantity=[$9]): rowcount = 5.5578005E7, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2758
            HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_returns]]): rowcount = 5.5578005E7, cumulative cost = {0}, id = 2658
          HiveJoinRel(condition=[=($5, $4)], joinType=[inner]): rowcount = 762935.5811373093, cumulative cost = {5.500766553162274E8 rows, 0.0 cpu, 0.0 io}, id = 2801
            HiveProjectRel(ss_item_sk=[$1], ss_customer_sk=[$2], ss_ticket_number=[$8], ss_quantity=[$9], ss_sold_date_sk=[$22]): rowcount = 5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2755
              HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_sales]]): rowcount = 5.50076554E8, cumulative cost = {0}, id = 2657
            HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2783
              HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2781
                HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.date_dim]]): rowcount = 73049.0, cumulative cost = {0}, id = 2656
{code}

This was puzzling, given that the stats for both tables are "identical" in TAB_COL_STATS.

Column statistics from TAB_COL_STATS; notice how the column statistics are identical in both cases.

||DB_NAME||COLUMN_NAME||COLUMN_TYPE||NUM_NULLS||LONG_HIGH_VALUE||LONG_LOW_VALUE||MAX_COL_LEN||NUM_DISTINCTS||
|tpcds_bin_orc_200|d_date_sk|int|0|2,488,070|2,415,022|NULL|65,332|
|tpcds_bin_partitioned_orc_200|d_date_sk|int|0|2,488,070|2,415,022|NULL|65,332|
|tpcds_bin_orc_200|d_quarter_name|string|0|NULL|NULL|6|721|
|tpcds_bin_partitioned_orc_200|d_quarter_name|string|0|NULL|NULL|6|721|
|tpcds_bin_orc_200|sr_customer_sk|int|1,009,571|1,600,000|1|NULL|1,415,625|
|tpcds_bin_partitioned_orc_200|sr_customer_sk|int|1,009,571|1,600,000|1|NULL|1,415,625|
|tpcds_bin_orc_200|sr_item_sk|int|0|48,000|1|NULL|62,562|
|tpcds_bin_partitioned_orc_200|sr_item_sk|int|0|48,000|1|NULL|62,562|
|tpcds_bin_orc_200|sr_ticket_number|int|0|48,000,000|1|NULL|34,931,085|
|tpcds_bin_partitioned_orc_200|sr_ticket_number|int|0|48,000,000|1|NULL|34,931,085|
|tpcds_bin_orc_200|ss_customer_sk|int|12,960,424|1,600,000|1|NULL|1,415,625|
|tpcds_bin_partitioned_orc_200|ss_customer_sk|int|12,960,424|1,600,000|1|NULL|1,415,625|
|tpcds_bin_orc_200|ss_item_sk|int|0|48,000|1|NULL|62,562|
|tpcds_bin_partitioned_orc_200|ss_item_sk|int|0|48,000|1|NULL|62,562|
|tpcds_bin_orc_200|ss_sold_date_sk|int|0|2,452,642|2,450,816|NULL|2,226|
|tpcds_bin_partitioned_orc_200|ss_sold_date_sk|int|0|2,452,642|2,450,816|NULL|2,226|
|tpcds_bin_orc_200|ss_ticket_number|int|0|48,000,000|1|NULL|56,256,175|
|tpcds_bin_partitioned_orc_200|ss_ticket_number|int|0|48,000,000|1|NULL|56,256,175|

For partitioned tables we get the statistics using get_aggr_stats_for, which eventually issues the query below:

{code}
select 
    COLUMN_NAME,
    COLUMN_TYPE,
    …
    max(NUM_DISTINCTS),
    …
from PART_COL_STATS
where DB_NAME = 
    and TABLE_NAME = 
    and COLUMN_NAME in 
    and PARTITION_NAME in (1 … N)
group by COLUMN_NAME, COLUMN_TYPE;
{code}
…


  was: (the previous description, which repeats the updated description above essentially verbatim)


> Discrepancy in CBO between partitioned and un-partitioned tables
> -----------------------------------------------------------------
>
>                 Key: HIVE-9647
>                 URL: https://issues.apache.org/jira/browse/HIVE-9647
>             Project: Hive
>          Issue Type: Bug
>          Components: CBO
>    Affects Versions: 0.14.0
>            Reporter: Mostafa Mokhtar
>            Assignee: Gunther Hagleitner
>             Fix For: 1.2.0
>
> (The quoted issue body repeats the updated description above verbatim.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)