drill-issues mailing list archives

From "Paul Rogers (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-5266) Parquet Reader produces "low density" record batches
Date Thu, 16 Feb 2017 01:01:38 GMT

    [ https://issues.apache.org/jira/browse/DRILL-5266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868909#comment-15868909 ]

Paul Rogers commented on DRILL-5266:
------------------------------------

The code that determines how many variable-length records fit into a record batch is seriously
flawed. Consider the execution flow:

{code}
public class ParquetRecordReader extends AbstractRecordReader {
  ...
  public int next() {
    ...
      recordsToRead = Math.min(recordsToRead, numRecordsToRead);

      if (allFieldsFixedLength) {
        readAllFixedFields(recordsToRead);
      } else { // variable length columns
        long fixedRecordsToRead = varLengthReader.readFields(recordsToRead, firstColumnStatus);
        readAllFixedFields(fixedRecordsToRead);
      }
{code}

This suggests that we read all the variable-length fields first, then the fixed-length ones. We
tell the variable-length reader how many records to read, and it returns the number it actually
read.

Then:

{code}
public class VarLenBinaryReader {
  ...
  public long readFields(long recordsToReadInThisPass, ColumnReader<?> firstColumnStatus) throws IOException {
    long recordsReadInCurrentPass = 0;
    ...
    recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass);
    if(useAsyncTasks){
      readRecordsParallel(recordsReadInCurrentPass);
    }else{
      readRecordsSerial(recordsReadInCurrentPass);
    }
{code}

Here, we determine how many records to read, then read them. OK.

{code}
  private long determineSizesSerial(long recordsToReadInThisPass) throws IOException {
    ...
    int recordsReadInCurrentPass = 0;
    top: do {
      for (VarLengthColumn<?> columnReader : columns) {
        // Return status is "done reading", meaning stop if true.
        if (columnReader.determineSize(recordsReadInCurrentPass, 0 /* unused */ ))
          break top;
      ...
{code}

This says that we determine the size of each column, one record at a time, stopping when
the batch is full. This is NOT what happens. Let's follow down:

{code}
  public boolean determineSize(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord) throws IOException {
   ...
    if (processPageData((int) recordsReadInCurrentPass)) {
      return true;
    }

public abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader<V> {
  ...
  protected boolean processPageData(int recordsToReadInThisPass) throws IOException {
    return readAndStoreValueSizeInformation();
  }

public abstract class NullableVarLengthValuesColumn<V extends ValueVector> extends VarLengthValuesColumn<V> {
  ...
    return ! setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageData,
        (int) pageReader.readyToReadPosInBytes + 4, dataTypeLengthInBits);

  public static class NullableVarCharColumn extends NullableVarLengthValuesColumn<NullableVarCharVector> {
    ...
    public boolean setSafe(int index, DrillBuf value, int start, int length) {
      if (index >= vector.getValueCapacity()) {
        return false;
      }

      if (usingDictionary) {
        ByteBuffer buf = currDictValToWrite.toByteBuffer();
        mutator.setSafe(index, buf, buf.position(), currDictValToWrite.length());
      } else {
        mutator.setSafe(index, 1, start, start + length, value);
      }
      return true;
    }
{code}

The above appears to ask: at what point would the data to be read exceed the allocated vector
length? But that's not what it does. It actually checks whether the *number of values* exceeds
the vector's value capacity. The data vector itself is grown automatically as needed. We don't
need to check the value count here; we do that elsewhere. The whole idea of this code chain is
to check data capacity -- which is why we have to go down this path rather than simply doing a
count check earlier in the stack.
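
To make the problem concrete, here is a toy model of the check quoted above. It is a sketch
only: `ToyVarLenVector`, `fillUntilRejected` and the sizes used are made up for illustration
and are not Drill classes or Drill defaults. The only thing it shares with the real code is the
shape of the test (index against value capacity) and the assumption that the data buffer grows
on demand.

```java
import java.util.Arrays;

// Toy model only: none of these classes are Drill's. They mimic the shape of the
// setSafe() check quoted above to show why it never stops on data size.
class CountOnlyCheckDemo {

  /** Hypothetical stand-in for a variable-length vector. */
  static class ToyVarLenVector {
    private final int valueCapacity; // fixed number of value slots
    private byte[] data;             // data buffer, grown on demand
    private int dataUsed = 0;

    ToyVarLenVector(int valueCapacity, int initialDataBytes) {
      this.valueCapacity = valueCapacity;
      this.data = new byte[initialDataBytes];
    }

    int getValueCapacity() { return valueCapacity; }
    int getDataCapacity() { return data.length; }
    int getDataUsed() { return dataUsed; }

    /** Appends a value, doubling the data buffer whenever it is too small. */
    void append(byte[] value) {
      while (dataUsed + value.length > data.length) {
        data = Arrays.copyOf(data, Math.max(1, data.length * 2));
      }
      System.arraycopy(value, 0, data, dataUsed, value.length);
      dataUsed += value.length;
    }
  }

  /** Same shape as the quoted check: only the index is tested, never the bytes. */
  static boolean setSafe(ToyVarLenVector vector, int index, byte[] value) {
    if (index >= vector.getValueCapacity()) {
      return false;
    }
    vector.append(value);
    return true;
  }

  /** Keeps adding values of the given size until setSafe() refuses one. */
  static int fillUntilRejected(ToyVarLenVector vector, int valueSize) {
    byte[] value = new byte[valueSize];
    int index = 0;
    while (setSafe(vector, index, value)) {
      index++;
    }
    return index;
  }

  public static void main(String[] args) {
    // 4095 value slots, 32 KB of initial data space, 1000-byte values.
    ToyVarLenVector vector = new ToyVarLenVector(4095, 32768);
    int accepted = fillUntilRejected(vector, 1000);
    System.out.println("values accepted: " + accepted);
    System.out.println("data bytes used: " + vector.getDataUsed());
    System.out.println("data capacity:   " + vector.getDataCapacity());
  }
}
```

In this toy, the loop stops only when the value slots run out; the data buffer has by then grown
far past its initial size, which is the behavior described above.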

But, since we always extend capacity, we will never stop reading variable-length data based
on the amount of data, only the number of values. Crazy...
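
If the intent is to stop on data size, the check needs a byte budget as well as a value count.
A rough sketch of that idea follows. It is not Drill's fix and not a proposal for where the
check should live; `valuesThatFit`, `maxValues` and `maxDataBytes` are hypothetical names used
only to show the two limits side by side.

```java
import java.util.Arrays;

// Illustration only: a limit on both value count and data bytes.
class ByteBudgetDemo {

  /**
   * Returns how many leading values fit without exceeding either the
   * value-count limit or the data-byte limit.
   */
  static int valuesThatFit(int[] valueLengths, int maxValues, long maxDataBytes) {
    long bytesUsed = 0;
    int count = 0;
    for (int length : valueLengths) {
      // Stop as soon as either limit would be exceeded by the next value.
      if (count >= maxValues || bytesUsed + length > maxDataBytes) {
        break;
      }
      bytesUsed += length;
      count++;
    }
    return count;
  }

  public static void main(String[] args) {
    int[] lengths = new int[4095];
    Arrays.fill(lengths, 1000); // 4095 values of 1000 bytes each

    // With a 32 KB data budget, the byte limit is what stops the read.
    System.out.println(valuesThatFit(lengths, 4095, 32768));
    // With an effectively unlimited budget, only the count limit applies.
    System.out.println(valuesThatFit(lengths, 4095, Long.MAX_VALUE));
  }
}
```

With 1000-byte values and a 32768-byte budget, the first call stops at 32 values; the second
stops at 4095, which is the count-only behavior the current code exhibits.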

> Parquet Reader produces "low density" record batches
> ----------------------------------------------------
>
>                 Key: DRILL-5266
>                 URL: https://issues.apache.org/jira/browse/DRILL-5266
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Storage - Parquet
>    Affects Versions: 1.10
>            Reporter: Paul Rogers
>
> Testing with the managed sort revealed that, for at least one file, Parquet produces "low-density" batches: batches in which only 5% of each value vector contains actual data, with the rest being unused space. When fed into the sort, we end up buffering 95% of wasted space, using only 5% of available memory to hold actual query data. The result is poor performance of the sort as it must spill far more frequently than expected.
> The managed sort analyzes incoming batches to prepare good memory use estimates. The following is the output from the Parquet file in question:
> {code}
> Actual batch schema & sizes {
>   T1¦¦cs_sold_date_sk(std col. size: 4, actual col. size: 4, total size: 196608, vector size: 131072, data size: 4516, row capacity: 32768, density: 4)
>   T1¦¦cs_sold_time_sk(std col. size: 4, actual col. size: 4, total size: 196608, vector size: 131072, data size: 4516, row capacity: 32768, density: 4)
>   T1¦¦cs_ship_date_sk(std col. size: 4, actual col. size: 4, total size: 196608, vector size: 131072, data size: 4516, row capacity: 32768, density: 4)
> ...
>   c_email_address(std col. size: 54, actual col. size: 27, total size: 53248, vector size: 49152, data size: 30327, row capacity: 4095, density: 62)
>   Records: 1129, Total size: 32006144, Row width:28350, Density:5}
> {code}
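
The per-column density figures in the log look like data size as a percentage of vector size.
That is an inference from the numbers, not something checked against the sort code, and
`densityPercent` is a hypothetical helper. Rounding up is assumed because it reproduces both
per-column values shown (plain rounding would give 3 for the first column). Note that the
4516-byte data size is just 1129 records times 4 bytes.

```java
// Illustration only: one plausible reading of the "density" figure in the log.
class DensityDemo {

  /** Data size as a percentage of vector size, rounded up (assumed rounding). */
  static int densityPercent(long dataSize, long vectorSize) {
    if (vectorSize <= 0) {
      return 0;
    }
    // Integer ceiling of dataSize * 100 / vectorSize.
    return (int) ((dataSize * 100 + vectorSize - 1) / vectorSize);
  }

  public static void main(String[] args) {
    // cs_sold_date_sk: data size 4516, vector size 131072
    System.out.println(densityPercent(4516, 131072));
    // c_email_address: data size 30327, vector size 49152
    System.out.println(densityPercent(30327, 49152));
  }
}
```

Under that assumption the two calls give 4 and 62, matching the log lines for
`cs_sold_date_sk` and `c_email_address`.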



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)