spark-issues mailing list archives

From "Ruslan Dautkhanov (JIRA)" <>
Subject [jira] [Commented] (SPARK-25164) Parquet reader builds entire list of columns once for each column
Date Thu, 13 Sep 2018 20:20:00 GMT


Ruslan Dautkhanov commented on SPARK-25164:

Hi [~bersprockets]


Thanks a lot for the detailed response.

I totally see what you're saying.

It's interesting that Spark realizes all columns of every row even though the WHERE filter has a predicate
on just one column.

I wonder whether it would be feasible to lazily realize the list of columns in the SELECT clause only after
filtering is complete.

It seems this could be a huge performance improvement for wide tables like this one.

In other words, Spark would first realize the columns specified in the WHERE clause and, only after
filtering, realize the rest of the columns needed for the SELECT clause.
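To make the two-phase idea concrete, here is a minimal sketch of late materialization. It assumes a hypothetical in-memory layout where each column is a `double[]`, and the class and method names are made up. This is not Spark's Parquet reader or any Spark API. In a real reader the savings would come from not decoding the non-filter columns for rows that fail the predicate. Here the columns are already in memory, so the sketch only shows the order of operations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.DoublePredicate;

/**
 * Hypothetical illustration of late materialization over in-memory columns.
 * Not Spark's reader; all names here are invented for the example.
 */
public class LateMaterializationSketch {

    /** Phase 1: evaluate the WHERE predicate on the filter column only. */
    static int[] selectRows(double[] filterColumn, DoublePredicate predicate) {
        int[] selected = new int[filterColumn.length];
        int count = 0;
        for (int row = 0; row < filterColumn.length; row++) {
            if (predicate.test(filterColumn[row])) {
                selected[count++] = row;
            }
        }
        return Arrays.copyOf(selected, count);
    }

    /** Phase 2: materialize the SELECT columns only for the surviving rows. */
    static List<double[]> materialize(double[][] columns, int[] projected, int[] rows) {
        List<double[]> result = new ArrayList<>(projected.length);
        for (int col : projected) {
            double[] out = new double[rows.length];
            for (int i = 0; i < rows.length; i++) {
                out[i] = columns[col][rows[i]];
            }
            result.add(out);
        }
        return result;
    }

    public static void main(String[] args) {
        // Column 0 plays the role of id1; columns 1 and 2 are other SELECT columns.
        double[][] columns = {
            {1, 2, 1, 3},
            {10, 20, 30, 40},
            {0.5, 0.6, 0.7, 0.8}
        };
        int[] rows = selectRows(columns[0], v -> v == 1);  // rows 0 and 2 survive
        List<double[]> out = materialize(columns, new int[] {0, 1, 2}, rows);
        System.out.println(Arrays.toString(out.get(1)));   // prints [10.0, 30.0]
    }
}
```

The work in phase 2 scales with the number of surviving rows rather than the total row count. That is where a selective filter like `id1 = 1` on a wide table would pay off.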


Thank you!


> Parquet reader builds entire list of columns once for each column
> -----------------------------------------------------------------
>                 Key: SPARK-25164
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Bruce Robbins
>            Assignee: Bruce Robbins
>            Priority: Minor
>             Fix For: 2.2.3, 2.3.2, 2.4.0
> {{VectorizedParquetRecordReader.initializeInternal}} loops through each column, and for each column it calls
> {noformat}
> requestedSchema.getColumns().get(i)
> {noformat}
> However, {{MessageType.getColumns}} will build the entire column list from getPaths(0).
> {noformat}
>   public List<ColumnDescriptor> getColumns() {
>     List<String[]> paths = this.getPaths(0);
>     List<ColumnDescriptor> columns = new ArrayList<ColumnDescriptor>(paths.size());
>     for (String[] path : paths) {
>       // TODO: optimize this                                                        
>       PrimitiveType primitiveType = getType(path).asPrimitiveType();
>       columns.add(new ColumnDescriptor(
>                       path,
>                       primitiveType,
>                       getMaxRepetitionLevel(path),
>                       getMaxDefinitionLevel(path)));
>     }
>     return columns;
>   }
> {noformat}
> This means that, for each parquet file, this routine indirectly iterates colCount*colCount times.
> This is actually not particularly noticeable unless you have:
>  - many parquet files
>  - many columns
> To verify that this is an issue, I created a 1-million-record parquet table with 6000 columns of type double and 67 files (so {{initializeInternal}} is called 67 times). I ran the following query:
> {noformat}
> sql("select * from 6000_1m_double where id1 = 1").collect
> {noformat}
> I used Spark from the master branch. I had 8 executor threads. The filter returns only a few thousand records. The query ran (on average) for 6.4 minutes.
> Then I cached the column list at the top of {{initializeInternal}} as follows:
> {noformat}
> List<ColumnDescriptor> columnCache = requestedSchema.getColumns();
> {noformat}
> Then I changed {{initializeInternal}} to use {{columnCache}} rather than {{requestedSchema.getColumns()}}.
> With the column cache variable, the same query runs in 5 minutes. So with my simple query, you save 22% of the time by not rebuilding the column list for each column.
> You get additional savings with a paths cache variable, now saving 34% in total on the above query.
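The quadratic cost described above can be reproduced with a small self-contained model. This is a minimal sketch, assuming a hypothetical `ToySchema` whose `getColumns()` rebuilds the whole list on every call, as the `MessageType.getColumns()` excerpt does. It is not the Parquet class, and strings stand in for `ColumnDescriptor` objects. The sketch counts descriptor builds for the original per-column pattern and for the `columnCache` pattern.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a schema whose getColumns() rebuilds the full
 * descriptor list on every call. Not the Parquet API; it only counts builds.
 */
public class ColumnListCostSketch {

    static final class ToySchema {
        private final int columnCount;
        long descriptorsBuilt = 0;

        ToySchema(int columnCount) {
            this.columnCount = columnCount;
        }

        /** Builds a fresh list of all columns, like the getColumns() excerpt. */
        List<String> getColumns() {
            List<String> columns = new ArrayList<>(columnCount);
            for (int i = 0; i < columnCount; i++) {
                columns.add("col" + i);  // stands in for new ColumnDescriptor(...)
                descriptorsBuilt++;
            }
            return columns;
        }
    }

    /** Original pattern: one full rebuild per column, so n * n builds. */
    static long perColumnLookup(int n) {
        ToySchema schema = new ToySchema(n);
        for (int i = 0; i < n; i++) {
            schema.getColumns().get(i);  // element discarded; only build cost matters
        }
        return schema.descriptorsBuilt;
    }

    /** Cached pattern: build the list once, then index into it (n builds). */
    static long cachedLookup(int n) {
        ToySchema schema = new ToySchema(n);
        List<String> columnCache = schema.getColumns();
        for (int i = 0; i < n; i++) {
            columnCache.get(i);
        }
        return schema.descriptorsBuilt;
    }

    public static void main(String[] args) {
        System.out.println(perColumnLookup(1000));  // prints 1000000
        System.out.println(cachedLookup(1000));     // prints 1000
    }
}
```

With roughly 6000 requested columns, as in the `select *` query above, the per-column pattern builds about 6000 × 6000 = 36,000,000 descriptors per file, versus about 6000 with the cache. The real `getColumns()` also does per-path work such as `getType(path)`, so the actual constant factors differ from this model.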

This message was sent by Atlassian JIRA

