hive-user mailing list archives

From Appan Thirumaligai <athirumali...@ngmoco.com>
Subject Re: partitioned column join does not work as expected
Date Wed, 19 Jan 2011 19:06:39 GMT

EXPLAIN select t1.some_string, t2.some_string, sum(t1.total_count), sum(t2.total_count)
from table_a t1 join table_b t2
  on t1.part_col = t2.part_col and t1.common_id = t2.common_id
where t1.part_col >= 'mypart' and t2.part_col >= 'mypart'
group by t1.some_string, t2.some_string;

OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF table_a t1) (TOK_TABREF table_b t2) (and (= (.
(TOK_TABLE_OR_COL t1) part_col) (. (TOK_TABLE_OR_COL t2) part_col)) (= (. (TOK_TABLE_OR_COL
t1) common_id) (. (TOK_TABLE_OR_COL t2) common_id))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR
TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL t1) some_string)) (TOK_SELEXPR
(. (TOK_TABLE_OR_COL t2) some_string)) (TOK_SELEXPR (TOK_FUNCTION sum (. (TOK_TABLE_OR_COL
t1) total_count))) (TOK_SELEXPR (TOK_FUNCTION sum (. (TOK_TABLE_OR_COL t2) total_count))))
(TOK_WHERE (and (>= (. (TOK_TABLE_OR_COL t1) part_col) 'mypart') (>= (. (TOK_TABLE_OR_COL
t2) part_col) 'mypart'))) (TOK_GROUPBY (. (TOK_TABLE_OR_COL t1) some_string) (. (TOK_TABLE_OR_COL
t2) some_string))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        t1 
          TableScan
            alias: t1
            Filter Operator
              predicate:
                  expr: (part_col >= 'mypart')
                  type: boolean
              Reduce Output Operator
                key expressions:
                      expr: part_col
                      type: string
                      expr: common_id
                      type: bigint
                sort order: ++
                Map-reduce partition columns:
                      expr: part_col
                      type: string
                      expr: common_id
                      type: bigint
                tag: 0
                value expressions:
                      expr: some_string
                      type: string
                      expr: total_count
                      type: bigint
                      expr: part_col
                      type: string
        t2 
          TableScan
            alias: t2
            Filter Operator
              predicate:
                  expr: (part_col >= 'mypart')
                  type: boolean
              Reduce Output Operator
                key expressions:
                      expr: part_col
                      type: string
                      expr: common_id
                      type: bigint
                sort order: ++
                Map-reduce partition columns:
                      expr: part_col
                      type: string
                      expr: common_id
                      type: bigint
                tag: 1
                value expressions:
                      expr: some_string
                      type: string
                      expr: total_count
                      type: bigint
                      expr: part_col
                      type: string
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          condition expressions:
            0 {VALUE._col2} {VALUE._col3} {VALUE._col4}
            1 {VALUE._col2} {VALUE._col3} {VALUE._col4}
          handleSkewJoin: false
          outputColumnNames: _col2, _col3, _col4, _col9, _col10, _col11
          Filter Operator
            predicate:
                expr: ((_col4 >= 'mypart') and (_col11 >= 'mypart'))
                type: boolean
            Select Operator
              expressions:
                    expr: _col2
                    type: string
                    expr: _col9
                    type: string
                    expr: _col3
                    type: bigint
                    expr: _col10
                    type: bigint
              outputColumnNames: _col2, _col9, _col3, _col10
              Group By Operator
                aggregations:
                      expr: sum(_col3)
                      expr: sum(_col10)
                bucketGroup: false
                keys:
                      expr: _col2
                      type: string
                      expr: _col9
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1, _col2, _col3
                File Output Operator
                  compressed: false
                  GlobalTableId: 0
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs://localhost:8022/tmp/hive-training/hive_2011-01-19_11-05-34_526_453408324928472657/-mr-10002

            Reduce Output Operator
              key expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: string
              sort order: ++
              Map-reduce partition columns:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: string
              tag: -1
              value expressions:
                    expr: _col2
                    type: bigint
                    expr: _col3
                    type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: sum(VALUE._col0)
                expr: sum(VALUE._col1)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
                expr: KEY._col1
                type: string
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2, _col3
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: string
                  expr: _col2
                  type: bigint
                  expr: _col3
                  type: bigint
            outputColumnNames: _col0, _col1, _col2, _col3
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1
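
A note on the plan above: in Stage-1 both tables emit part_col and common_id as the reduce key, so each reducer only pairs rows that agree on both columns. If the join is keyed on part_col alone and a whole partition shares one value, every row of table_a is paired with every row of table_b in that partition, which may be why a partition-only join appears never to finish its reduce step. The Python sketch below only illustrates that row-count effect and is not Hive's implementation; the hash_join helper and the 1,000-row tables are made up for the example.

```python
from collections import defaultdict


def hash_join(left, right, keys):
    """Inner-join two lists of dicts on the given key columns (hypothetical helper)."""
    buckets = defaultdict(list)
    for row in right:
        buckets[tuple(row[k] for k in keys)].append(row)
    # Pair each left row with every right row that has the same key tuple.
    return [(l, r) for l in left for r in buckets.get(tuple(l[k] for k in keys), [])]


# Made-up data: every row lives in the single partition 'mypart'.
table_a = [{"part_col": "mypart", "common_id": i, "total_count": 1} for i in range(1000)]
table_b = [{"part_col": "mypart", "common_id": i, "total_count": 1} for i in range(1000)]

both_keys = hash_join(table_a, table_b, ["part_col", "common_id"])
part_only = hash_join(table_a, table_b, ["part_col"])

print(len(both_keys))  # 1000: one match per common_id
print(len(part_only))  # 1000000: full cross product within the partition
```

With real tables of millions of rows per partition, the second case grows quadratically, and any sum() computed over the joined rows is inflated by the same factor.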

On Jan 19, 2011, at 10:47 AM, Viral Bajaria wrote:

> Thanks Appan for verifying. I will do some more tests on my side too and let you know
> the results.
>
> I tried a different version of the query where I joined two sub-queries for the same
> partitions, and the data comes out correct.
>
> I will see if I can post the real-world example to the list, since that would be a
> more practical example.
>
> If you still have your example(s), do you mind sending me your query plan for
>
> select t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count)
> from table_a t1 join table_b t2
> on t1.part_col = t2.part_col and t1.common_id = t2.common_id
> where t1.part_col >= 'mypart' and t2.part_col >= 'mypart'
> group by t1.some_string,t2.some_string;
>  
> -Viral
> On Wed, Jan 19, 2011 at 10:36 AM, Appan Thirumaligai <athirumaligai@ngmoco.com> wrote:
> Viral,
> 
> I tried the queries below (similar to yours) and I get the expected results when I do
> the join. I ran my queries after building hive from the latest source and hadoop 0.20+.
> create table table_a(a_id bigint, common_id bigint, some_string string,total_count bigint)
> partitioned by (part_col string)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
> LINES TERMINATED BY '\n' STORED AS TEXTFILE;
> create table table_b(b_id bigint, common_id bigint, some_string string,total_count bigint)
> partitioned by (part_col string)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
> LINES TERMINATED BY '\n' STORED AS TEXTFILE;
> dfs -mkdir /user/data/table_a;
> dfs -mkdir /user/data/table_b;
> dfs -put /home/training/hiveug/table_a.csv /user/data/table_a;
> dfs -put /home/training/hiveug/table_b.csv /user/data/table_b;
> alter table table_a add partition (part_col = 'mypart') location '/user/data/table_a';
> alter table table_b add partition (part_col = 'mypart') location '/user/data/table_b';
> select * from table_a t1 join table_b t2 on t1.part_col == t2.part_col; 
> -->> Returns expected result
> select t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count)
> from table_a t1 join table_b t2 on t1.part_col = t2.part_col
> where t1.part_col >= 'mypart' and t2.part_col >= 'mypart'
> group by t1.some_string,t2.some_string;
> --->>Works fine.
> select t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from table_a
t1 join table_b t2 on t1.part_col = t2.part_col where t1.part_col >= 'mypart' and t2.part_col
>= 'mypart' group by t1.some_string,t2.some_st* from table_a t1 join table_b t2 on t1.part_col
= t2.part_col where t1.part_col >= 'mypart' and t2.part_col >= 'mypart';
> --->Works fine.
> 
> I created the two files with sample data in them and copied them to HDFS.
> 
> I'll try later on Hive 0.5.0, the version you are using, but it looks like there might
> be something wrong in your query.
> 
> On Jan 18, 2011, at 8:40 PM, Ajo Fod wrote:
> 
>> Can you try this with a dummy table with very few rows ... to see if
>> the reason the script doesn't finish is a computational issue?
>> 
>> One other thing is to try with a combined partition, to see if it is a
>> problem with the partitioning.
>> 
>> Also, take a look at  the results of an EXPLAIN statement, see if
>> there are any hints there.
>> 
>> NOTE: I'm new to hive too.
>> 
>> -Ajo
>> 
>> 
>> On Tue, Jan 18, 2011 at 8:08 PM, Viral Bajaria <viral.bajaria@gmail.com> wrote:
>>> I haven't heard back from anyone on the list and am still struggling to join
>>> two tables on a partitioned column.
>>> 
>>> Has anyone ever tried joining two tables on a partitioned column and found
>>> that the results are not as expected?
>>> On Tue, Jan 18, 2011 at 2:04 AM, Viral Bajaria <viral.bajaria@gmail.com>
>>> wrote:
>>>> 
>>>> I am facing issues with a query where I am joining two fairly large tables
>>>> on the partitioned column along with other common columns. The output is
>>>> not in line with what I expect. Since the query is very complex, I will
>>>> simplify it so that people can provide input if they have faced similar
>>>> issues or if I am doing something totally wrong.
>>>> TABLE A:
>>>> a_id bigint
>>>> common_id bigint
>>>> some_string string
>>>> total_count bigint
>>>> part_col string <---- this is the partitioned column
>>>> TABLE B:
>>>> b_int bigint
>>>> common_id bigint
>>>> some_string string
>>>> total_sum bigint
>>>> part_col string <---- this is the partitioned column
>>>> now the query is as follows:
>>>> SELECT /*+ STREAMTABLE(A,B) */ A.some_string, B.some_string,
>>>> sum(A.total_count), sum(B.total_sum) from A JOIN B ON (t1.part_col =
>>>> t2.part_col AND t1.common_id = t2.common_id) WHERE t1.part_col >= 'val1'
>>>> AND t2.part_col >= 'val1' GROUP BY A.some_string, B.some_string
>>>> Does Hive not like joins on partitioned columns? When I create a join on
>>>> just the partitioned column, the reduce step never finishes.
>>>> I am using Hive 0.5.0.
>>>> Thanks,
>>>> Viral
>>> 
> 
> Appan Thirumaligai
> appan@ngmoco.com
> Ph:1-818-472-8427
> ngmoco:)
> 
> 

Appan Thirumaligai
appan@ngmoco.com
Ph:1-818-472-8427
ngmoco:)

