hive-issues mailing list archives

From "Jesus Camacho Rodriguez (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (HIVE-15474) Extend limit propagation for chain of RS-GB-RS operators
Date Tue, 20 Dec 2016 20:35:58 GMT

    [ https://issues.apache.org/jira/browse/HIVE-15474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15765176#comment-15765176 ]

Jesus Camacho Rodriguez edited comment on HIVE-15474 at 12/20/16 8:35 PM:
--------------------------------------------------------------------------

[~xuefuz], thanks for the feedback. It is indeed true for MR and Tez. I am not so familiar
with Spark as the backend. If it does not return ordered groups, are you sure the sequence
of operators is still RS-GB-RS? That would mean that other optimizations, such as ReduceSinkDeDuplication,
do not work properly (?). In any case, I would argue that RS is a physical operator with
certain semantics that should be respected, regardless of the backend. However, if the sequence
of operators is still that one, I can simply disable the optimization for Hive on Spark.
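
For what it is worth, one quick way to check that on the Spark backend would be to run the same EXPLAIN under it; a minimal sketch, assuming that switching {{hive.execution.engine}} per session is enough for the comparison:
{code:sql}
-- Illustrative only: compare the operator tree produced under the Spark engine
-- with the MR plans below to see whether the RS-GB-RS chain is still generated.
set hive.execution.engine=spark;

explain
select key, value, count(key + 1) as agg1 from src
group by key, value
order by key, value, agg1 limit 20;
{code}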

--

Given this query:
{code:sql}
explain
select key, value, count(key + 1) as agg1 from src 
group by key, value
order by key, value, agg1 limit 20;
{code}

Explain plan without patch:
{code}
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: src
            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: key (type: string), value (type: string), (UDFToDouble(key) + 1.0) (type: double)
              outputColumnNames: _col0, _col1, _col2
              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count(_col2)
                keys: _col0 (type: string), _col1 (type: string)
                mode: hash
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string), _col1 (type: string)
                  sort order: ++
                  Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col2 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: string), KEY._col1 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: bigint)
              sort order: +++
              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
              TopN Hash Memory Usage: 0.3
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey2 (type: bigint)
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
          Limit
            Number of rows: 20
            Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: 20
      Processor Tree:
        ListSink
{code}

Explain plan with patch:
{code}
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: src
            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: key (type: string), value (type: string), (UDFToDouble(key) + 1.0) (type: double)
              outputColumnNames: _col0, _col1, _col2
              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count(_col2)
                keys: _col0 (type: string), _col1 (type: string)
                mode: hash
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string), _col1 (type: string)
                  sort order: ++
                  Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                  TopN Hash Memory Usage: 0.3
                  value expressions: _col2 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: string), KEY._col1 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: bigint)
              sort order: +++
              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
              TopN Hash Memory Usage: 0.3
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey2 (type: bigint)
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
          Limit
            Number of rows: 20
            Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: 20
      Processor Tree:
        ListSink
{code}

Observe that the only difference is the TopN Hash Memory Usage annotation on the map-side ReduceSink of the first stage, which prevents shuffling all the data: after the group-by each (key, value) pair produces exactly one row, and the final ordering starts with those same columns, so only the rows belonging to the 20 smallest (key, value) keys need to survive the first shuffle.
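
As a side note, the 0.3 in the TopN annotation is the limit-pushdown memory fraction; a minimal sketch of the knob, assuming it is governed by {{hive.limit.pushdown.memory.usage}} (and assuming a non-positive value falls back to shuffling everything, which could also serve as a per-backend workaround):
{code:sql}
-- Fraction of available memory the ReduceSink TopN hash may use for limit pushdown
-- (matches the "TopN Hash Memory Usage: 0.3" annotation above).
set hive.limit.pushdown.memory.usage=0.3;

-- Assumed workaround: a non-positive fraction disables the TopN shortcut,
-- so all rows are shuffled before the limit is applied.
set hive.limit.pushdown.memory.usage=-1;
{code}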



> Extend limit propagation for chain of RS-GB-RS operators
> --------------------------------------------------------
>
>                 Key: HIVE-15474
>                 URL: https://issues.apache.org/jira/browse/HIVE-15474
>             Project: Hive
>          Issue Type: Bug
>          Components: Physical Optimizer
>    Affects Versions: 2.2.0
>            Reporter: Jesus Camacho Rodriguez
>            Assignee: Jesus Camacho Rodriguez
>         Attachments: HIVE-15474.patch
>
>
> The goal is to extend the work started in HIVE-14002.
> For instance, given the following query:
> {code:sql}
> explain
> select key, value, count(key + 1) as agg1 from src 
> group by key, value
> order by key, value, agg1 limit 20;
> {code}
> We can push the limit down to the GBy operator; however, we currently do not do so.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
