drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ppadma <...@git.apache.org>
Subject [GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader
Date Thu, 06 Apr 2017 22:11:08 GMT
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/789#discussion_r110277141
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
---
    @@ -308,163 +232,50 @@ public FragmentContext getFragmentContext() {
       }
     
       /**
    -   * Returns data type length for a given {@see ColumnDescriptor} and its corresponding
    -   * {@see SchemaElement}. Neither is enough information alone as the max
    -   * repetition level (indicating if it is an array type) is in the ColumnDescriptor
and
    -   * the length of a fixed width field is stored at the schema level.
    -   *
    -   * @return the length if fixed width, else -1
    +   * Prepare the Parquet reader. First determine the set of columns to read (the schema
    +   * for this read.) Then, create a state object to track the read across calls to
    +   * the reader <tt>next()</tt> method. Finally, create one of three readers
to
    +   * read batches depending on whether this scan is for only fixed-width fields,
    +   * contains at least one variable-width field, or is a "mock" scan consisting
    +   * only of null fields (fields in the SELECT clause but not in the Parquet file.)
        */
    -  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) {
    -    if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
    -      if (column.getMaxRepetitionLevel() > 0) {
    -        return -1;
    -      }
    -      if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
    -        return se.getType_length() * 8;
    -      } else {
    -        return getTypeLengthInBits(column.getType());
    -      }
    -    } else {
    -      return -1;
    -    }
    -  }
     
    -  @SuppressWarnings({ "resource", "unchecked" })
       @Override
       public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException
{
         this.operatorContext = operatorContext;
    -    if (!isStarQuery()) {
    -      columnsFound = new boolean[getColumns().size()];
    -      nullFilledVectors = new ArrayList<>();
    -    }
    -    columnStatuses = new ArrayList<>();
    -    List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
    -    allFieldsFixedLength = true;
    -    ColumnDescriptor column;
    -    ColumnChunkMetaData columnChunkMetaData;
    -    int columnsToScan = 0;
    -    mockRecordsRead = 0;
    -
    -    MaterializedField field;
    +    schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, isStarQuery()
? null : getColumns());
     
         logger.debug("Reading row group({}) with {} records in file {}.", rowGroupIndex,
footer.getBlocks().get(rowGroupIndex).getRowCount(),
             hadoopPath.toUri().getPath());
    -    totalRecordsRead = 0;
    -
    -    // TODO - figure out how to deal with this better once we add nested reading, note
also look where this map is used below
    -    // store a map from column name to converted types if they are non-null
    -    Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
    -
    -    // loop to add up the length of the fixed width columns and build the schema
    -    for (int i = 0; i < columns.size(); ++i) {
    -      column = columns.get(i);
    -      SchemaElement se = schemaElements.get(column.getPath()[0]);
    -      MajorType mt = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
    -          getDataMode(column), se, fragmentContext.getOptions());
    -      field = MaterializedField.create(toFieldName(column.getPath()), mt);
    -      if ( ! fieldSelected(field)) {
    -        continue;
    -      }
    -      columnsToScan++;
    -      int dataTypeLength = getDataTypeLength(column, se);
    -      if (dataTypeLength == -1) {
    -        allFieldsFixedLength = false;
    -      } else {
    -        bitWidthAllFixedFields += dataTypeLength;
    -      }
    -    }
    -
    -    if (columnsToScan != 0  && allFieldsFixedLength) {
    -      recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
    -          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
    -    }
    -    else {
    -      recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
    -    }
     
         try {
    -      ValueVector vector;
    -      SchemaElement schemaElement;
    -      final ArrayList<VarLengthColumn<? extends ValueVector>> varLengthColumns
= new ArrayList<>();
    -      // initialize all of the column read status objects
    -      boolean fieldFixedLength;
    -      // the column chunk meta-data is not guaranteed to be in the same order as the
columns in the schema
    -      // a map is constructed for fast access to the correct columnChunkMetadata to correspond
    -      // to an element in the schema
    -      Map<String, Integer> columnChunkMetadataPositionsInList = new HashMap<>();
    -      BlockMetaData rowGroupMetadata = footer.getBlocks().get(rowGroupIndex);
    -
    -      int colChunkIndex = 0;
    -      for (ColumnChunkMetaData colChunk : rowGroupMetadata.getColumns()) {
    -        columnChunkMetadataPositionsInList.put(Arrays.toString(colChunk.getPath().toArray()),
colChunkIndex);
    -        colChunkIndex++;
    -      }
    -      for (int i = 0; i < columns.size(); ++i) {
    -        column = columns.get(i);
    -        columnChunkMetaData = rowGroupMetadata.getColumns().get(columnChunkMetadataPositionsInList.get(Arrays.toString(column.getPath())));
    -        schemaElement = schemaElements.get(column.getPath()[0]);
    -        MajorType type = ParquetToDrillTypeConverter.toMajorType(column.getType(), schemaElement.getType_length(),
    -            getDataMode(column), schemaElement, fragmentContext.getOptions());
    -        field = MaterializedField.create(toFieldName(column.getPath()), type);
    -        // the field was not requested to be read
    -        if ( ! fieldSelected(field)) {
    -          continue;
    -        }
    -
    -        fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
    -        vector = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(),
type.getMode()));
    -        if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
    -          if (column.getMaxRepetitionLevel() > 0) {
    -            final RepeatedValueVector repeatedVector = RepeatedValueVector.class.cast(vector);
    -            ColumnReader<?> dataReader = ColumnReaderFactory.createFixedColumnReader(this,
fieldFixedLength,
    -                column, columnChunkMetaData, recordsPerBatch,
    -                repeatedVector.getDataVector(), schemaElement);
    -            varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader,
    -                getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData,
false, repeatedVector, schemaElement));
    -          }
    -          else {
    -
    -           ColumnReader<?> cr = ColumnReaderFactory.createFixedColumnReader(this,
fieldFixedLength,
    -                column, columnChunkMetaData, recordsPerBatch, vector,
    -                schemaElement) ;
    -            columnStatuses.add(cr);
    -          }
    -        } else {
    -          // create a reader and add it to the appropriate list
    -          varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData,
false, vector, schemaElement));
    -        }
    -      }
    -      varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
    -
    -      if (!isStarQuery()) {
    -        List<SchemaPath> projectedColumns = Lists.newArrayList(getColumns());
    -        SchemaPath col;
    -        for (int i = 0; i < columnsFound.length; i++) {
    -          col = projectedColumns.get(i);
    -          assert col!=null;
    -          if ( ! columnsFound[i] && !col.equals(STAR_COLUMN)) {
    -            nullFilledVectors.add((NullableIntVector)output.addField(MaterializedField.create(col.getAsUnescapedPath(),
    -                    Types.optional(TypeProtos.MinorType.INT)),
    -                (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(TypeProtos.MinorType.INT,
DataMode.OPTIONAL)));
    -
    -          }
    -        }
    -      }
    +      schema.buildSchema(footer, batchSize);
    --- End diff --
    
    Maybe pass the footer in the constructor of ParquetSchema itself?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message