drill-commits mailing list archives

From: j..@apache.org
Subject: [24/27] drill git commit: DRILL-5266: Parquet returns low-density batches
Date: Thu, 02 Mar 2017 20:59:51 GMT
DRILL-5266: Parquet returns low-density batches

Fixes one glaring problem related to bit/byte confusion.

Includes a few clean-up items found along the way.

Additional fixes from code review comments.

More code clean-up from code review.

close #749
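
As a rough illustration of the bit/byte confusion involved (a simplified sketch, not the actual Drill code): the column readers track value width in a field named dataTypeLengthInBits, but for FIXED_LEN_BYTE_ARRAY columns Parquet reports the type length in bytes, so the two units are easy to mix and any size estimate built on that field can end up off by a factor of 8, leaving batches far less full than intended.

    // Hypothetical sketch only; names loosely mirror the patched code.
    int widthInBits(ColumnDescriptor col) {
      if (col.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
        return col.getTypeLength() * 8;   // getTypeLength() is in bytes here
      }
      return ParquetRecordReader.getTypeLengthInBits(col.getType());  // already bits
    }
    // Dropping (or double-applying) the "* 8" skews every record-count
    // estimate derived from this width by 8x.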


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8cded5ae
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8cded5ae
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8cded5ae

Branch: refs/heads/master
Commit: 8cded5ae124db86f42e1274762305ca04b373a51
Parents: 33fc25c
Author: Paul Rogers <progers@maprtech.com>
Authored: Wed Feb 15 20:51:17 2017 -0800
Committer: Jinfeng Ni <jni@apache.org>
Committed: Wed Mar 1 23:46:25 2017 -0800

----------------------------------------------------------------------
 .../parquet/columnreaders/ColumnReader.java     | 42 ++++++---------
 .../columnreaders/FixedWidthRepeatedReader.java |  1 +
 .../NullableVarLengthValuesColumn.java          |  6 +--
 .../columnreaders/ParquetRecordReader.java      | 40 ++++++--------
 .../columnreaders/VarLenBinaryReader.java       | 56 +++++++++++---------
 5 files changed, 66 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8cded5ae/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index c45642b..5eaf286 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -101,8 +101,10 @@ public abstract class ColumnReader<V extends ValueVector> {
     }
     if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
       if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+        // Here "bits" means "bytes"
         dataTypeLengthInBits = columnDescriptor.getTypeLength() * 8;
       } else {
+        // While here, "bits" means "bits"
         dataTypeLengthInBits = ParquetRecordReader.getTypeLengthInBits(columnDescriptor.getType());
       }
     }
@@ -124,7 +126,7 @@ public abstract class ColumnReader<V extends ValueVector> {
     reset();
     if(recordsToReadInThisPass>0) {
       do {
-        determineSize(recordsToReadInThisPass, 0);
+        determineSize(recordsToReadInThisPass);
 
       } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.hasPage());
     }
@@ -153,9 +155,7 @@ public abstract class ColumnReader<V extends ValueVector> {
           .pushContext("File: ", this.parentReader.getHadoopPath().toString() )
           .build(logger);
       throw ex;
-
     }
-
   }
 
   protected abstract void readField(long recordsToRead);
@@ -170,27 +170,17 @@ public abstract class ColumnReader<V extends ValueVector> {
    * @return - true if we should stop reading
    * @throws IOException
    */
-  public boolean determineSize(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord) throws IOException {
-
-    boolean doneReading = readPage();
-    if (doneReading) {
-      return true;
-    }
+  public boolean determineSize(long recordsReadInCurrentPass) throws IOException {
 
-    doneReading = processPageData((int) recordsReadInCurrentPass);
-    if (doneReading) {
+    if (readPage()) {
       return true;
     }
 
-    // Never used in this code path. Hard to remove because the method is overidden by subclasses
-    lengthVarFieldsInCurrentRecord = -1;
-
-    doneReading = checkVectorCapacityReached();
-    if (doneReading) {
+    if (processPageData((int) recordsReadInCurrentPass)) {
       return true;
     }
 
-    return false;
+    return checkVectorCapacityReached();
   }
 
   protected Future<Integer> readRecordsAsync(int recordsToRead){
@@ -264,17 +254,20 @@ public abstract class ColumnReader<V extends ValueVector> {
   protected void hitRowGroupEnd() {}
 
   protected boolean checkVectorCapacityReached() {
+    // Here "bits" means "bytes"
+    // But, inside "capacity", "bits" sometimes means "bits".
+    // Note that bytesReadInCurrentPass is never updated, so this next
+    // line is a no-op.
     if (bytesReadInCurrentPass + dataTypeLengthInBits > capacity()) {
       logger.debug("Reached the capacity of the data vector in a variable length value vector.");
       return true;
     }
-    else if (valuesReadInCurrentPass > valueVec.getValueCapacity()) {
-      return true;
-    }
-    return false;
+    // No op: already checked this earlier and would not be here if this
+    // condition is true.
+    return valuesReadInCurrentPass > valueVec.getValueCapacity();
   }
 
-  // copied out of parquet library, didn't want to deal with the uneeded throws statement they had declared
+  // copied out of Parquet library, didn't want to deal with the uneeded throws statement they had declared
   public static int readIntLittleEndian(DrillBuf in, int offset) {
     int ch4 = in.getByte(offset) & 0xff;
     int ch3 = in.getByte(offset + 1) & 0xff;
@@ -285,7 +278,7 @@ public abstract class ColumnReader<V extends ValueVector> {
 
   private class ColumnReaderProcessPagesTask implements Callable<Long> {
 
-    private final ColumnReader parent = ColumnReader.this;
+    private final ColumnReader<V> parent = ColumnReader.this;
     private final long recordsToReadInThisPass;
 
     public ColumnReaderProcessPagesTask(long recordsToReadInThisPass){
@@ -305,12 +298,11 @@ public abstract class ColumnReader<V extends ValueVector> {
         Thread.currentThread().setName(oldname);
       }
     }
-
   }
 
   private class ColumnReaderReadRecordsTask implements Callable<Integer> {
 
-    private final ColumnReader parent = ColumnReader.this;
+    private final ColumnReader<V> parent = ColumnReader.this;
     private final int recordsToRead;
 
     public ColumnReaderReadRecordsTask(int recordsToRead){
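
The readIntLittleEndian helper that appears near the end of the hunks above is only partially visible in the diff context. For reference, a self-contained sketch of the same little-endian 32-bit read over a plain byte[] (a simplified stand-in for DrillBuf, illustrative only):

    static int readIntLittleEndian(byte[] in, int offset) {
      int ch4 = in[offset]     & 0xff;  // least-significant byte comes first
      int ch3 = in[offset + 1] & 0xff;
      int ch2 = in[offset + 2] & 0xff;
      int ch1 = in[offset + 3] & 0xff;
      return (ch1 << 24) | (ch2 << 16) | (ch3 << 8) | ch4;
    }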

http://git-wip-us.apache.org/repos/asf/drill/blob/8cded5ae/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
index f70c8d5..6db7110 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -140,6 +140,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn<RepeatedValueVecto
     }
   }
 
+  @SuppressWarnings("resource")
   @Override
   protected boolean readAndStoreValueSizeInformation() {
     int numLeftoverVals = 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/8cded5ae/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
index c96064b..3a7a54b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
@@ -91,12 +91,8 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten
       dataTypeLengthInBits = pageReader.pageData.getInt((int) pageReader.readyToReadPosInBytes);
     }
     // I think this also needs to happen if it is null for the random access
-    boolean success = setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageData,
+    return ! setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageData,
         (int) pageReader.readyToReadPosInBytes + 4, dataTypeLengthInBits);
-    if ( ! success ) {
-      return true;
-    }
-    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/8cded5ae/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 79901ed..93c1214 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -26,8 +26,6 @@ import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -53,16 +51,15 @@ import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.format.FileMetaData;
 import org.apache.parquet.format.SchemaElement;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.PrimitiveType;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 public class ParquetRecordReader extends AbstractRecordReader {
@@ -72,7 +69,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
   private static final int NUMBER_OF_VECTORS = 1;
   private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
   private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb
-  private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
+  private static final char DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH = 32*1024; // 32K
+  private static final int DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH = 64*1024 - 1; // 64K - 1, max SV2 can address
   private static final int NUM_RECORDS_TO_READ_NOT_SPECIFIED = -1;
 
  // When no column is required by the downstrea operator, ask SCAN to return a DEFAULT column. If such column does not exist,
@@ -91,12 +89,10 @@ public class ParquetRecordReader extends AbstractRecordReader {
   private boolean allFieldsFixedLength;
   private int recordsPerBatch;
   private OperatorContext operatorContext;
-//  private long totalRecords;
-//  private long rowGroupOffset;
 
   private List<ColumnReader<?>> columnStatuses;
   private FileSystem fileSystem;
-  private long batchSize;
+  private final long batchSize;
   private long numRecordsToRead; // number of records to read
 
   Path hadoopPath;
@@ -128,6 +124,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
   public boolean enforceTotalSize;
   public long readQueueSize;
 
+  @SuppressWarnings("unused")
   private String name;
 
 
@@ -333,6 +330,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
     }
   }
 
+  @SuppressWarnings({ "resource", "unchecked" })
   @Override
  public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
     this.operatorContext = operatorContext;
@@ -341,7 +339,6 @@ public class ParquetRecordReader extends AbstractRecordReader {
       nullFilledVectors = new ArrayList<>();
     }
     columnStatuses = new ArrayList<>();
-//    totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
     List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
     allFieldsFixedLength = true;
     ColumnDescriptor column;
@@ -350,8 +347,6 @@ public class ParquetRecordReader extends AbstractRecordReader {
     mockRecordsRead = 0;
 
     MaterializedField field;
-//    ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
-    FileMetaData fileMetaData;
 
     logger.debug("Reading row group({}) with {} records in file {}.", rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
         hadoopPath.toUri().getPath());
@@ -374,19 +369,18 @@ public class ParquetRecordReader extends AbstractRecordReader {
       columnsToScan++;
       int dataTypeLength = getDataTypeLength(column, se);
       if (dataTypeLength == -1) {
-          allFieldsFixedLength = false;
-        } else {
+        allFieldsFixedLength = false;
+      } else {
         bitWidthAllFixedFields += dataTypeLength;
-        }
       }
-//    rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
+    }
 
     if (columnsToScan != 0  && allFieldsFixedLength) {
       recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
-          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535);
+          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
     }
     else {
-      recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
+      recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
     }
 
     try {
@@ -526,7 +520,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
       futures.add(f);
     }
     Exception exception = null;
-    for(Future f: futures){
+    for(Future<Long> f: futures){
       if(exception != null) {
         f.cancel(true);
       } else {
@@ -567,7 +561,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
           parquetReaderStats.timeProcess.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
           return 0;
         }
-        recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
+        recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
 
        // Pick the minimum of recordsToRead calculated above and numRecordsToRead (based on rowCount and limit).
         recordsToRead = Math.min(recordsToRead, numRecordsToRead);
@@ -585,7 +579,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
       if (allFieldsFixedLength) {
        recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
       } else {
-        recordsToRead = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
+        recordsToRead = DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
 
       }
 
@@ -595,7 +589,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
       if (allFieldsFixedLength) {
         readAllFixedFields(recordsToRead);
       } else { // variable length columns
-        long fixedRecordsToRead = varLengthReader.readFields(recordsToRead, firstColumnStatus);
+        long fixedRecordsToRead = varLengthReader.readFields(recordsToRead);
         readAllFixedFields(fixedRecordsToRead);
       }
 
@@ -644,7 +638,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
     codecFactory.release();
 
     if (varLengthReader != null) {
-      for (final VarLengthColumn r : varLengthReader.columns) {
+      for (final VarLengthColumn<?> r : varLengthReader.columns) {
         r.clear();
       }
       varLengthReader.columns.clear();
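
To see what the new DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH cap (64K - 1, the most records a two-byte selection vector can address) does in the recordsPerBatch calculation above, here is the same arithmetic with invented values, assuming batchSize and bitWidthAllFixedFields are both expressed in bits as the surrounding constants suggest:

    // Illustrative arithmetic only; all values are made up.
    long batchSize = 256 * 1024 * 8;        // cf. DEFAULT_BATCH_LENGTH_IN_BITS
    int bitWidthAllFixedFields = 4 * 32;    // e.g. four INT32 columns per record
    long valueCount = 1_000_000;            // values in the first column chunk
    int maxSv2Records = 64 * 1024 - 1;      // DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH
    int recordsPerBatch = (int) Math.min(
        Math.min(batchSize / bitWidthAllFixedFields, valueCount), maxSv2Records);
    // min(min(2097152 / 128, 1000000), 65535) = 16384 records per batch;
    // with very narrow records the first term can exceed 65535, and the cap
    // keeps the batch addressable by an SV2.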

http://git-wip-us.apache.org/repos/asf/drill/blob/8cded5ae/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index 9bfc3aa..b598ac8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -33,24 +33,34 @@ public class VarLenBinaryReader {
   ParquetRecordReader parentReader;
   final List<VarLengthColumn<? extends ValueVector>> columns;
   final boolean useAsyncTasks;
+  private final long targetRecordCount;
 
  public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn<? extends ValueVector>> columns) {
     this.parentReader = parentReader;
     this.columns = columns;
     useAsyncTasks = parentReader.useAsyncColReader;
+
+    // Can't read any more records than fixed width fields will fit.
+    // Note: this calculation is very likely wrong; it is a simplified
+    // version of earlier code, but probably needs even more attention.
+
+    int totalFixedFieldWidth = parentReader.getBitWidthAllFixedFields() / 8;
+    if (totalFixedFieldWidth == 0) {
+      targetRecordCount = 0;
+    } else {
+      targetRecordCount = parentReader.getBatchSize() / totalFixedFieldWidth;
+    }
   }
 
   /**
    * Reads as many variable length values as possible.
    *
   * @param recordsToReadInThisPass - the number of records recommended for reading form the reader
-   * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metatdata from
+   * @param firstColumnStatus - a reference to the first column status in the Parquet file to grab metatdata from
    * @return - the number of fixed length fields that will fit in the batch
    * @throws IOException
    */
-  public long readFields(long recordsToReadInThisPass, ColumnReader<?> firstColumnStatus) throws IOException {
-
-    long recordsReadInCurrentPass = 0;
+  public long readFields(long recordsToReadInThisPass) throws IOException {
 
     // write the first 0 offset
     for (VarLengthColumn<?> columnReader : columns) {
@@ -58,10 +68,16 @@ public class VarLenBinaryReader {
     }
     Stopwatch timer = Stopwatch.createStarted();
 
-    recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass);
-    if(useAsyncTasks){
+    // Can't read any more records than fixed width fields will fit.
+
+    if (targetRecordCount > 0) {
+      recordsToReadInThisPass = Math.min(recordsToReadInThisPass, targetRecordCount);
+    }
+    long recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass);
+
+    if(useAsyncTasks) {
       readRecordsParallel(recordsReadInCurrentPass);
-    }else{
+    } else {
       readRecordsSerial(recordsReadInCurrentPass);
     }
 
@@ -70,33 +86,21 @@ public class VarLenBinaryReader {
     return recordsReadInCurrentPass;
   }
 
-
   private long determineSizesSerial(long recordsToReadInThisPass) throws IOException {
-    int lengthVarFieldsInCurrentRecord = 0;
-    boolean exitLengthDeterminingLoop = false;
-    long totalVariableLengthData = 0;
-    long recordsReadInCurrentPass = 0;
-    do {
+
+    int recordsReadInCurrentPass = 0;
+    top: do {
       for (VarLengthColumn<?> columnReader : columns) {
-        if (!exitLengthDeterminingLoop) {
-          exitLengthDeterminingLoop =
-              columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
-        } else {
-          break;
+        // Return status is "done reading", meaning stop if true.
+        if (columnReader.determineSize(recordsReadInCurrentPass)) {
+          break top;
         }
       }
-      // check that the next record will fit in the batch
-      if (exitLengthDeterminingLoop ||
-          (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields()
-              + totalVariableLengthData + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) {
-        break;
-      }
       for (VarLengthColumn<?> columnReader : columns) {
         columnReader.updateReadyToReadPosition();
         columnReader.currDefLevel = -1;
       }
       recordsReadInCurrentPass++;
-      totalVariableLengthData += lengthVarFieldsInCurrentRecord;
     } while (recordsReadInCurrentPass < recordsToReadInThisPass);
 
     return recordsReadInCurrentPass;
@@ -118,7 +122,7 @@ public class VarLenBinaryReader {
       futures.add(f);
     }
     Exception exception = null;
-    for(Future f: futures){
+    for(Future<Integer> f: futures){
       if(exception != null) {
         f.cancel(true);
       } else {
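
The targetRecordCount added in the VarLenBinaryReader constructor caps how many records a variable-width pass will attempt, based on how many fixed-width records would fit in the batch. A worked example with invented numbers (the patch itself notes the formula probably needs further attention), assuming getBatchSize() is in bytes for the purpose of this sketch:

    // Illustrative only; mirrors the constructor logic with made-up inputs.
    long batchSize = 256 * 1024;                            // bytes, assumed
    int bitWidthAllFixedFields = 4 * 32;                    // 128 bits of fixed data per record
    int totalFixedFieldWidth = bitWidthAllFixedFields / 8;  // 16 bytes per record
    long targetRecordCount =
        (totalFixedFieldWidth == 0) ? 0 : batchSize / totalFixedFieldWidth;
    // 262144 / 16 = 16384; readFields() then clamps recordsToReadInThisPass
    // to this value whenever it is non-zero.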

