drill-dev mailing list archives

From paul-rogers <...@git.apache.org>
Subject [GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader
Date Thu, 25 May 2017 21:41:23 GMT
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/789#discussion_r118591297
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
---
    @@ -0,0 +1,262 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet.columnreaders;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.common.types.Types;
    +import org.apache.drill.common.types.TypeProtos.DataMode;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.expr.TypeHelper;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.record.MaterializedField;
    +import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
    +import org.apache.drill.exec.vector.NullableIntVector;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.format.SchemaElement;
    +import org.apache.parquet.hadoop.metadata.BlockMetaData;
    +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    +import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    +
    +import com.google.common.collect.Lists;
    +
    +/**
    + * Maps the schema of the Parquet file to the schema that Drill and the
    + * Parquet record reader use.
    + */
    +
    +public class ParquetSchema {
    +  /**
    +   * Set of columns specified in the SELECT clause. Will be null for
    +   * a SELECT * query.
    +   */
    +  private final Collection<SchemaPath> selectedCols;
    +  /**
    +   * Parallel to the selected columns list above; used to determine the subset
    +   * of the projection pushdown columns that do not appear in this file.
    +   */
    +  private final boolean[] columnsFound;
    +  private final OptionManager options;
    +  private final int rowGroupIndex;
    +  private ParquetMetadata footer;
    +  /**
    +   * List of metadata for selected columns. This list does two things.
    +   * First, it identifies the Parquet columns we wish to select. Second, it
    +   * provides metadata for those columns. Note that null columns (columns
    +   * in the SELECT clause but not in the file) appear elsewhere.
    +   */
    +  private List<ParquetColumnMetadata> selectedColumnMetadata = new ArrayList<>();
    +  private int bitWidthAllFixedFields;
    +  private boolean allFieldsFixedLength;
    +  private long groupRecordCount;
    +  private int recordsPerBatch;
    +
    +  /**
    +   * Build the Parquet schema. The schema can be based on a "SELECT *",
    +   * meaning we want all columns defined in the Parquet file. In this case,
    +   * the list of selected columns is null. Or, the query can be based on
    +   * an explicit list of selected columns. In this case, the
    +   * columns need not exist in the Parquet file. If a column does not exist,
    +   * the reader returns null for that column. If no selected column exists
    +   * in the file, then we return "mock" records: records with only null
    +   * values, but repeated for the number of rows in the Parquet file.
    +   *
    +   * @param options session options
    +   * @param rowGroupIndex row group to read
    +   * @param selectedCols columns specified in the SELECT clause, or null if
    +   * this is a SELECT * query
    +   */
    +
    +  public ParquetSchema(OptionManager options, int rowGroupIndex, Collection<SchemaPath> selectedCols) {
    +    this.options = options;
    +    this.rowGroupIndex = rowGroupIndex;
    +    this.selectedCols = selectedCols;
    +    if (selectedCols == null) {
    +      columnsFound = null;
    +    } else {
    +      columnsFound = new boolean[selectedCols.size()];
    +    }
    +  }
    +
    +  /**
    +   * Build the schema for this read as a combination of the schema specified in
    +   * the Parquet footer and the list of columns selected in the query.
    +   *
    +   * @param footer Parquet metadata
    +   * @param batchSize target size of the batch, in bits
    +   * @throws Exception if anything goes wrong
    +   */
    +
    +  public void buildSchema(ParquetMetadata footer, long batchSize) throws Exception {
    +    this.footer = footer;
    +    groupRecordCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
    +    loadParquetSchema();
    +    computeFixedPart();
    +
    +    if (! selectedColumnMetadata.isEmpty() && allFieldsFixedLength) {
    +      recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
    +          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
    +    } else {
    +      recordsPerBatch = ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
    +    }
    +  }
    +
    +  /**
    +   * Scan the Parquet footer, then map each Parquet column to the list of columns
    +   * we want to read. Track those to be read.
    +   */
    +
    +  private void loadParquetSchema() {
    +    // TODO - figure out how to deal with this better once we add nested reading;
    +    // note also where this map is used below.
    +    // store a map from column name to converted types if they are non-null
    +    Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
    +
    +    // loop to add up the length of the fixed width columns and build the schema
    +    for (ColumnDescriptor column : footer.getFileMetaData().getSchema().getColumns()) {
    +      ParquetColumnMetadata columnMetadata = new ParquetColumnMetadata(column);
    +      columnMetadata.resolveDrillType(schemaElements, options);
    +      if (! fieldSelected(columnMetadata.field)) {
    +        continue;
    +      }
    +      selectedColumnMetadata.add(columnMetadata);
    +    }
    +  }
    +
    +  /**
    +   * Fixed-width fields are the easiest to plan. We know the size of each column,
    +   * making it easy to determine the total length of each vector, once we know
    +   * the target record count. A special reader is used in the fortunate case
    +   * that all fields are fixed width.
    +   */
    +
    +  private void computeFixedPart() {
    +    allFieldsFixedLength = true;
    +    for (ParquetColumnMetadata colMd : selectedColumnMetadata) {
    +      if (colMd.isFixedLength()) {
    +        bitWidthAllFixedFields += colMd.length;
    +      } else {
    +        allFieldsFixedLength = false;
    +      }
    +    }
    +  }
    +
    +  public boolean isStarQuery() { return selectedCols == null; }
    +  public ParquetMetadata footer() { return footer; }
    +  public int getBitWidthAllFixedFields() { return bitWidthAllFixedFields; }
    +  public int getRecordsPerBatch() { return recordsPerBatch; }
    +  public boolean allFieldsFixedLength() { return allFieldsFixedLength; }
    +  public List<ParquetColumnMetadata> getColumnMetadata() { return selectedColumnMetadata; }
    +
    +  /**
    +   * Return the row count for the Parquet row group being read.
    +   *
    +   * @return number of records in the Parquet row group
    +   */
    +
    +  public long getGroupRecordCount() { return groupRecordCount; }
    +
    +  public BlockMetaData getRowGroupMetadata() {
    +    return footer.getBlocks().get(rowGroupIndex);
    +  }
    +
    +  /**
    +   * Determine if a Parquet field is selected for the query. It is selected
    +   * either if this is a star query (we want all columns), or if the column
    +   * appears in the select list.
    +   *
    +   * @param field the Parquet column expressed as a Drill field
    +   * @return true if the column is to be included in the scan, false
    +   * if not
    +   */
    +
    +  private boolean fieldSelected(MaterializedField field) {
    +    // TODO - not sure if this is how we want to represent this
    +    // for now it makes the existing tests pass, simply selecting
    +    // all available data if no columns are provided
    +    if (isStarQuery()) {
    +      return true;
    +    }
    +
    +    int i = 0;
    +    for (SchemaPath expr : selectedCols) {
    +      if (field.getPath().equalsIgnoreCase(expr.getAsUnescapedPath())) {
    +        columnsFound[i] = true;
    +        return true;
    +      }
    +      i++;
    +    }
    +    return false;
    +  }
    +  /**
    --- End diff --
    
    Fixed.
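For readers skimming the archive, here is a minimal sketch of how a caller might drive this class, using only the constructor and methods visible in the diff above. The `options`, `columns`, `footer`, and `batchSize` values are assumed to be supplied by the enclosing reader; the sketch is illustrative, not the actual wiring in the patch:

    // Hypothetical caller; options, columns, footer, and batchSize come from
    // the surrounding reader and are not shown here.
    ParquetSchema schema = new ParquetSchema(options, 0 /* rowGroupIndex */, columns);
    schema.buildSchema(footer, batchSize);

    if (schema.isStarQuery()) {
      // SELECT *: every column found in the file was added to the metadata list.
    }
    for (ParquetColumnMetadata colMd : schema.getColumnMetadata()) {
      // Create a value vector / column reader for each selected column.
    }
    int rowsPerBatch = schema.getRecordsPerBatch();   // sizing decided in buildSchema()
    long rowsInGroup = schema.getGroupRecordCount();  // rows in the selected row group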
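The nested Math.min in buildSchema() is easier to read with concrete numbers. A worked example with made-up values (the two DEFAULT_* constants live in ParquetRecordReader per the diff; their actual values are not shown in this hunk):

    // All values below are illustrative.
    long batchSize = 2_097_152;        // hypothetical batch budget, in bits
    int bitWidthAllFixedFields = 96;   // e.g. one INT32 (32 bits) + one BIGINT (64 bits)
    long valueCount = 50_000;          // rows in the first column chunk
    long fixedWidthCap = 65_536;       // stand-in for DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH

    long records = Math.min(Math.min(batchSize / bitWidthAllFixedFields, valueCount),
        fixedWidthCap);
    // 2_097_152 / 96 = 21_845, below both caps, so records == 21_845.

So the batch is capped three ways: by the bit budget divided by the per-row bit width, by the number of rows actually present, and by a fixed ceiling.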
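Note also that fieldSelected() compares the full unescaped path case-insensitively, so a query column EMP_ID matches a file column emp_id. In miniature:

    // Illustration of the comparison fieldSelected() performs; values are made up.
    String filePath  = "emp_id";   // field.getPath()
    String queryPath = "EMP_ID";   // expr.getAsUnescapedPath()
    boolean selected = filePath.equalsIgnoreCase(queryPath);  // true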


