impala-issues mailing list archives

From "Alexander Behm (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (IMPALA-5036) Improve COUNT(*) performance of Parquet scans.
Date Fri, 17 Mar 2017 20:42:41 GMT

     [ https://issues.apache.org/jira/browse/IMPALA-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Behm updated IMPALA-5036:
-----------------------------------
    Description: 
{code}
select count(*) from parquet_table
{code}

Impala already has a special code path for fast Parquet scans when no columns are scanned
and materialized, but the performance can be significantly improved with a plan+execution
change, as follows:

Execution change:
Instead of returning empty batches until num_rows have been returned, the Parquet scanner
can populate a single slot with the num_rows from the Parquet row groups.

Plan change:
The count(*) local aggregation needs to be changed to a sum(num_rows_slot) aggregation.
The final distributed plan will be:
scan -> local agg with sum(num_rows_slot) -> merge agg sum(sum(num_rows_slot))
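
A minimal sketch of the proposed plan, to make the data flow concrete. It is not Impala code: RowGroupMeta, ParquetFile, ScanNumRowsSlot, LocalAggSum and MergeAggSum are hypothetical names, and emitting one value per row group is an assumption about the granularity at which the scanner would fill the slot.

```cpp
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

// Hypothetical stand-ins for illustration; none of these are Impala APIs.
struct RowGroupMeta {
  int64_t num_rows;  // row count recorded in the Parquet metadata
};

using ParquetFile = std::vector<RowGroupMeta>;

// Execution change: emit one value per row group (the "num_rows_slot")
// instead of num_rows empty rows.
std::vector<int64_t> ScanNumRowsSlot(const ParquetFile& file) {
  std::vector<int64_t> slots;
  slots.reserve(file.size());
  for (const RowGroupMeta& rg : file) {
    assert(rg.num_rows >= 0);
    slots.push_back(rg.num_rows);
  }
  return slots;
}

// Plan change, local step: sum(num_rows_slot) over everything one node scanned.
int64_t LocalAggSum(const std::vector<ParquetFile>& files_on_node) {
  int64_t sum = 0;
  for (const ParquetFile& file : files_on_node) {
    for (int64_t slot : ScanNumRowsSlot(file)) sum += slot;
  }
  return sum;
}

// Plan change, merge step: sum(sum(num_rows_slot)) over the per-node partials.
int64_t MergeAggSum(const std::vector<int64_t>& partial_sums) {
  return std::accumulate(partial_sums.begin(), partial_sums.end(), int64_t{0});
}
```

In this model the work done by the scan and the local aggregation grows with the number of row groups rather than with the number of rows, which is the point of the proposal.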



  was:
Impala already has a special code path for fast Parquet scans when no columns are scanned
and materialized. Such cases include COUNT(*), or queries where only partition columns are
referenced. However, the existing code can be further improved in the special case where no
columns at all, not even partition columns, need to be returned.

See this code from HdfsParquetScanner::GetNextInternal():
{code}
  if (scan_node_->IsZeroSlotTableScan()) {
    // There are no materialized slots, e.g. count(*) over the table.  We can serve
    // this query from just the file metadata. We don't need to read the column data.
    if (row_group_rows_read_ == file_metadata_.num_rows) {
      eos_ = true;
      return Status::OK();
    }
    assemble_rows_timer_.Start();
    DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
    int rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
    int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
    TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
    Status status = CommitRows(row_batch, num_to_commit);
    assemble_rows_timer_.Stop();
    RETURN_IF_ERROR(status);
    row_group_rows_read_ += num_to_commit;
    COUNTER_ADD(scan_node_->rows_read_counter(), row_group_rows_read_);
    return Status::OK();
  }
{code}

We still return one batch at a time, limited by the batch capacity.

I did a simple experiment with a synthetic file that has 20438625324 as the num_rows in the
Parquet file metadata. I ran the experiments on a release build:

{code}
set num_scanner_threads=1;
set num_nodes=1;
select count(*) from huge;
Fetched 1 row(s) in 22.12s
{code}

Increase the batch size by 10x:
{code}
set num_scanner_threads=1;
set num_nodes=1;
set batch_size=10240;
select count(*) from huge;
Fetched 1 row(s) in 9.36s
{code}

There is even a benefit when running with MT_DOP=1.
{code}
set mt_dop=1;
set num_nodes=1;
select count(*) from huge;
Fetched 1 row(s) in 11.60s
{code}

Increase the batch size by 10x:
{code}
set mt_dop=1;
set num_nodes=1;
set batch_size=10240;
select count(*) from huge;
Fetched 1 row(s) in 6.99s
{code}
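
The sensitivity to batch_size in the runs above can be estimated with a rough count of how many batches the zero-slot path has to return. This is a sketch under assumptions: NumBatchesNeeded is a hypothetical helper, the batch capacity is taken to equal batch_size, and the default batch_size is taken to be 1024 (consistent with 10240 being described as a 10x increase).

```cpp
#include <cassert>
#include <cstdint>

// Number of scanner calls needed to hand back `num_rows` empty rows when each
// call commits at most `batch_capacity` rows (ceiling division).
// Hypothetical helper for illustration; not part of Impala.
int64_t NumBatchesNeeded(int64_t num_rows, int64_t batch_capacity) {
  assert(num_rows >= 0 && batch_capacity > 0);
  return (num_rows + batch_capacity - 1) / batch_capacity;
}
```

For the synthetic file with 20438625324 rows, this gives 19959596 batches at a capacity of 1024 and 1995960 batches at 10240. The measured times drop by less than 10x, so per-batch overhead is evidently not the only cost, but the batch count is what the proposed num_rows slot would remove.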


> Improve COUNT(*) performance of Parquet scans.
> ----------------------------------------------
>
>                 Key: IMPALA-5036
>                 URL: https://issues.apache.org/jira/browse/IMPALA-5036
>             Project: IMPALA
>          Issue Type: Sub-task
>          Components: Backend
>    Affects Versions: Impala 2.5.0, Impala 2.6.0, Impala 2.7.0, Impala 2.8.0
>            Reporter: Alexander Behm
>              Labels: parquet, performance



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
