Date: Mon, 22 May 2017 10:50:04 +0000 (UTC)
From: "chenerlu (JIRA)"
To: issues@carbondata.apache.org
Subject: [jira] [Updated] (CARBONDATA-1076) Join Issue caused by dictionary and shuffle exchange

     [ https://issues.apache.org/jira/browse/CARBONDATA-1076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

chenerlu updated CARBONDATA-1076:
---------------------------------

Description:

We can reproduce this issue with the following steps:

Step 1: create a carbon table

carbon.sql("CREATE TABLE IF NOT EXISTS carbon_table (col1 int, col2 int, col3 int) STORED BY 'carbondata' TBLPROPERTIES('DICTIONARY_INCLUDE'='col1,col2,col3','TABLE_BLOCKSIZE'='4')")

Step 2: load data

carbon.sql("LOAD DATA LOCAL INPATH '/opt/carbon_table' INTO TABLE carbon_table")

The data in the file carbon_table is as follows:

col1,col2,col3
1,2,3
4,5,6
7,8,9
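As a quick sanity check (not part of the original steps), the table definition can be inspected before running the query to confirm that the dictionary setting took effect; DESCRIBE FORMATTED is plain Spark SQL:

// The DICTIONARY_INCLUDE property set above should appear in the output.
carbon.sql("DESCRIBE FORMATTED carbon_table").show(100, false)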
Step 3: run the query

carbon.sql("SELECT c1.col1,c2.col1,c2.col3 FROM (SELECT col1,col2 FROM carbon_table GROUP BY col1,col2) c1 FULL JOIN (SELECT col1,count(col2) as col3 FROM carbon_table GROUP BY col1) c2 ON c1.col1 = c2.col1").show()

[expected] Hive tables and Parquet tables both return the result below, which should be correct:

|col1|col1|col3|
|   1|null|null|
|null|   4|   1|
|   4|null|null|
|null|   7|   1|
|   7|null|null|
|null|   1|   1|

[actually] Carbon returns a different result because rows are matched incorrectly:

|col1|col1|col3|
|   1|   1|   1|
|   4|   4|   1|
|   7|   7|   1|

Root cause analysis:

This query contains two subqueries, and one subquery performs the dictionary decode after its exchange while the other performs the decode before its exchange. The two sides of the full join are therefore partitioned on different representations of the join key, which may lead to wrong matches when the full join executes.

My idea: can we move the decode before the exchange? Because I am not very familiar with the Carbon query layer, any ideas about this are welcome.
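To make the suspected mechanism concrete, here is a toy sketch with made-up numbers: the surrogate key below is hypothetical, and the real SQL exchange uses Spark's internal Murmur3 hash rather than the RDD HashPartitioner, but the routing mismatch is the same in kind. If one side of the join is hash-partitioned on the dictionary surrogate key and the other on the decoded value, the same logical key lands in two different partitions, and the SortMergeJoin then pairs up the wrong rows.

import org.apache.spark.HashPartitioner

// 200 matches the shuffle partition count shown in the plan below.
val partitioner = new HashPartitioner(200)

val decodedValue = 4  // the real col1 value
val surrogateKey = 2  // hypothetical dictionary id assigned to the value 4

// The same logical key is routed to two different partitions depending on
// whether the decode happened before or after the exchange:
println(partitioner.getPartition(decodedValue))  // 4
println(partitioner.getPartition(surrogateKey))  // 2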
Plan as follows:

== Physical Plan ==
SortMergeJoin [col1#3445], [col1#3460], FullOuter
:- Sort [col1#3445 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(col1#3445, 200)
:     +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 -> col1#3445, col2#3446 -> col2#3446, col3#3447 -> col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], IncludeProfile(ArrayBuffer(col1#3445)), CarbonAliasDecoderRelation(), org.apache.spark.sql.CarbonSession@69e87cbe
:        +- HashAggregate(keys=[col1#3445, col2#3446], functions=[], output=[col1#3445])
:           +- Exchange hashpartitioning(col1#3445, col2#3446, 200)
:              +- HashAggregate(keys=[col1#3445, col2#3446], functions=[], output=[col1#3445, col2#3446])
:                 +- Scan CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ] tempdev.carbon_table[col1#3445,col2#3446]
+- Sort [col1#3460 ASC NULLS FIRST], false, 0
   +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 -> col1#3445, col2#3446 -> col2#3446, col3#3447 -> col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], IncludeProfile(ArrayBuffer(col1#3460)), CarbonAliasDecoderRelation(), org.apache.spark.sql.CarbonSession@69e87cbe
      +- HashAggregate(keys=[col1#3460], functions=[count(col2#3461)], output=[col1#3460, col3#3436L])
         +- Exchange hashpartitioning(col1#3460, 200)
            +- HashAggregate(keys=[col1#3460], functions=[partial_count(col2#3461)], output=[col1#3460, count#3472L])
               +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 -> col1#3445, col2#3446 -> col2#3446, col3#3447 -> col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], IncludeProfile(ArrayBuffer(col2#3461)), CarbonAliasDecoderRelation(), org.apache.spark.sql.CarbonSession@69e87cbe
                  +- Scan CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ] tempdev.carbon_table[col1#3460,col2#3461]
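For comparison, the reference result can be reproduced against plain Parquet, which has no dictionary decode step. This is a minimal sketch, assuming the same session; the table name parquet_table is made up:

import carbon.implicits._

// Write the same three rows as a plain parquet table (parquet is the
// default data source) and run the identical query against it.
Seq((1, 2, 3), (4, 5, 6), (7, 8, 9)).toDF("col1", "col2", "col3")
  .write.saveAsTable("parquet_table")

carbon.sql("SELECT c1.col1,c2.col1,c2.col3 FROM (SELECT col1,col2 FROM parquet_table GROUP BY col1,col2) c1 FULL JOIN (SELECT col1,count(col2) as col3 FROM parquet_table GROUP BY col1) c2 ON c1.col1 = c2.col1").show()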
> Join Issue caused by dictionary and shuffle exchange
> ----------------------------------------------------
>
>                 Key: CARBONDATA-1076
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-1076
>             Project: CarbonData
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.1.1-incubating, 1.1.0
>        Environment: Carbon + spark 2.1
>           Reporter: chenerlu
>           Assignee: Ravindra Pesala
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)